背景
之前写了一篇关于线程挂起超时导致的 Native Crash
的文章,并提供了一种基于 inline hook
技术规避超时导致的 abort
信号引发崩溃的解决方案,有兴趣可以看下 - Android 线程挂起超时问题
不过我们当时没有详细聊一聊线程挂起的流程,这次我们简单捋一捋。文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
前置问题
线程在修改线程名称或者dump 堆栈为何要先挂起?
1. 数据一致性文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
当我们修改线程名称或 dump 线程堆栈时,线程可能正在运行和修改它的状态。如果在运行过程中修改其名称或读取其堆栈,可能会导致数据不一致或读取到不完整的信息。文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
2. 避免竞争条件文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
线程在运行时可能会与其他线程竞争资源或数据。如果不挂起线程直接修改其名称或读取堆栈,可能会引入竞争条件,导致不可预测的行为或崩溃。文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
3. 保证操作的原子性文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
挂起线程可以确保在进行修改或读取操作时,该操作是原子的。即在修改线程名称或读取堆栈的整个过程中,不会有其他操作干扰,从而保证操作的完整性。文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
挂起前置流程
挂起入口代码
arduino
Thread* ThreadList::SuspendThreadByPeer(jobject peer,
SuspendReason reason,
bool* timed_out) {
bool request_suspension = true; // 是否请求挂起的标志
const uint64_t start_time = NanoTime(); // 开始时间
useconds_t sleep_us = kThreadSuspendInitialSleepUs; // 初始睡眠时间
*timed_out = false; // 是否超时标志初始化为 false
Thread* const self = Thread::Current(); // 当前线程
Thread* suspended_thread = nullptr; // 被挂起的线程
VLOG(threads) << "SuspendThreadByPeer starting";
while (true) {
Thread* thread;
{
ScopedObjectAccess soa(self); // 确保当前线程有权访问托管对象
MutexLock thread_list_mu(self, *Locks::thread_list_lock_); // 锁住线程列表
thread = Thread::FromManagedThread(soa, peer); // 从托管线程获取线程对象
if (thread == nullptr) {
// 如果线程不存在,记录警告并返回 nullptr
ThreadSuspendByPeerWarning(soa, ::android::base::WARNING, "No such thread for suspend", peer);
return nullptr;
}
if (!Contains(thread)) {
// 如果线程不在列表中,记录日志并返回 nullptr
VLOG(threads) << "SuspendThreadByPeer failed for unattached thread: " << reinterpret_cast<void*>(thread);
return nullptr;
}
VLOG(threads) << "SuspendThreadByPeer found thread: " << *thread;
{
MutexLock suspend_count_mu(self, *Locks::thread_suspend_count_lock_); // 锁住挂起计数
if (request_suspension) {
// 如果请求挂起
CHECK(suspended_thread == nullptr); // 确认没有其他挂起的线程
suspended_thread = thread; // 设置当前线程为被挂起的线程
// 修改挂起计数
bool updated = suspended_thread->ModifySuspendCount(self, +1, nullptr, reason);
DCHECK(updated);
request_suspension = false; // 请求挂起完成
} else {
// 确保线程挂起计数大于0
CHECK_GT(thread->GetSuspendCount(), 0);
}
// 确保当前线程不是试图挂起自己
CHECK_NE(thread, self) << "Attempt to suspend the current thread for the debugger";
if (thread->IsSuspended()) {
// 如果线程已经挂起,返回该线程
VLOG(threads) << "SuspendThreadByPeer thread suspended: " << *thread;
return thread;
}
const uint64_t total_delay = NanoTime() - start_time; // 计算总延迟时间
if (total_delay >= thread_suspend_timeout_ns_) {
// 如果超时,记录错误并中止
LOG(WARNING) << "Suspended thread state_and_flags: " << suspended_thread->StateAndFlagsAsHexString();
ThreadSuspendByPeerWarning(soa, ::android::base::FATAL, "Thread suspension timed out", peer);
UNREACHABLE();
} else if (sleep_us == 0 && total_delay > static_cast<uint64_t>(kThreadSuspendMaxYieldUs) * 1000) {
// 切换到睡眠以防止过度使用 CPU
sleep_us = kThreadSuspendMaxYieldUs / 2;
}
}
}
VLOG(threads) << "SuspendThreadByPeer waiting to allow thread chance to suspend";
ThreadSuspendSleep(sleep_us); // 睡眠一段时间等待线程挂起
// 增加睡眠时间以避免忙等
sleep_us = std::min(sleep_us * 2, kThreadSuspendMaxSleepUs);
}
}
主要需要关注这行代码:文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
bool updated = suspended_thread->ModifySuspendCount(self, +1, nullptr, reason);
文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
我们看下ModifySuspendCount
函数的具体实现文章源自灵鲨社区-https://www.0s52.com/bcjc/cyyjc/15361.html
php
inline bool Thread::ModifySuspendCount(Thread* self,
int delta,
AtomicInteger* suspend_barrier,
SuspendReason reason) {
// 如果 delta 大于 0,表示请求挂起线程
// 并且 (gUseUserfaultfd 或 gUseReadBarrier) 为 true 且目标线程不是当前线程
// 或者 suspend_barrier 非空时,需要循环尝试修改挂起计数,这里从我们上个方法传进来是 null
if (delta > 0 && (((gUseUserfaultfd || gUseReadBarrier) && this != self) || suspend_barrier != nullptr)) {
// 当请求挂起时,ModifySuspendCountInternal() 可能会失败
while (true) {
// 尝试修改挂起计数,如果成功,则返回 true
if (LIKELY(ModifySuspendCountInternal(self, delta, suspend_barrier, reason))) {
return true;
} else {
// 如果失败,可能是 active_suspend_barriers 已满或正在执行线程翻转
// 需要释放 thread_suspend_count_lock_ 以避免死锁
// 并等待目标线程执行或线程翻转函数执行完成
Locks::thread_suspend_count_lock_->ExclusiveUnlock(self); // 释放锁
NanoSleep(100000); // 睡眠 100 毫秒
Locks::thread_suspend_count_lock_->ExclusiveLock(self); // 重新获取锁
}
}
} else {
// 如果 delta 小于等于 0,直接调用 ModifySuspendCountInternal() 修改挂起计数
return ModifySuspendCountInternal(self, delta, suspend_barrier, reason);
}
}
暂且不谈后续流程,如果主线程去修改子线程名称,走到这个方法,如果发生了线程翻转(正处于挂起请求或者挂起中),势必要死循环+睡眠 100 毫秒去请求挂起,这样看可能阻塞主线程,不过概率不大。
之后的流程会直接调用ModifySuspendCountInternal
函数,我们再看一下。
arduino
bool Thread::ModifySuspendCountInternal(Thread* self,
int delta,
AtomicInteger* suspend_barrier,
SuspendReason reason) {
if (kIsDebugBuild) {
// 确保 delta 为 -1 或 +1
DCHECK(delta == -1 || delta == +1) << reason << " " << delta << " " << this;
// 确保当前线程持有 thread_suspend_count_lock_ 锁
Locks::thread_suspend_count_lock_->AssertHeld(self);
// 如果目标线程不是当前线程并且目标线程未挂起,确保当前线程持有 thread_list_lock_ 锁
if (this != self && !IsSuspended()) {
Locks::thread_list_lock_->AssertHeld(self);
}
}
// 用户代码挂起需要进行更严格的检查,因为它们来源于运行时控制之外的代码
if (UNLIKELY(reason == SuspendReason::kForUserCode)) {
// 确保当前线程持有 user_code_suspension_lock_ 锁
Locks::user_code_suspension_lock_->AssertHeld(self);
// 确保用户代码挂起计数不会变为负数
if (UNLIKELY(delta + tls32_.user_code_suspend_count < 0)) {
LOG(ERROR) << "attempting to modify suspend count in an illegal way.";
return false;
}
}
// 检查挂起计数的合法性,防止挂起计数变为负值
if (UNLIKELY(delta < 0 && tls32_.suspend_count <= 0)) {
UnsafeLogFatalForSuspendCount(self, this);
return false;
}
// 检查线程翻转,如果目标线程不是当前线程且正在进行线程翻转,返回 false 以避免死锁
if (delta > 0 && this != self && tlsPtr_.flip_function != nullptr) {
return false;
}
// 重点标记位,记住这里,后续围绕这个展开。
uint32_t flags = enum_cast<uint32_t>(ThreadFlag::kSuspendRequest);
// 设置挂起屏障
if (delta > 0 && suspend_barrier != nullptr) {
uint32_t available_barrier = kMaxSuspendBarriers;
// 查找可用的挂起屏障位置
for (uint32_t i = 0; i < kMaxSuspendBarriers; ++i) {
if (tlsPtr_.active_suspend_barriers[i] == nullptr) {
available_barrier = i;
break;
}
}
// 如果没有可用的挂起屏障位置,返回 false
if (available_barrier == kMaxSuspendBarriers) {
return false;
}
// 设置挂起屏障
tlsPtr_.active_suspend_barriers[available_barrier] = suspend_barrier;
flags |= enum_cast<uint32_t>(ThreadFlag::kActiveSuspendBarrier);
}
// 修改挂起计数
tls32_.suspend_count += delta;
switch (reason) {
case SuspendReason::kForUserCode:
tls32_.user_code_suspend_count += delta; // 修改用户代码挂起计数
break;
case SuspendReason::kInternal:
break;
}
// 更新线程状态和触发挂起
if (tls32_.suspend_count == 0) {
// 如果挂起计数为 0,清除挂起请求标志
AtomicClearFlag(ThreadFlag::kSuspendRequest);
} else {
// 设置挂起请求标志
tls32_.state_and_flags.fetch_or(flags, std::memory_order_seq_cst);
TriggerSuspend(); // 触发挂起操作
}
return true;
}
上述代码很清晰,是将Native
侧的线程中 tls32_
这个结构体内部的state_and_flags
标记成ThreadFlag::kSuspendRequest
,类似 Handler
内存屏障的逻辑,插入一个flag
然后等待某个时机读取这个flag
,择机挂起。
-
这个
tls32_
结构体是什么?
tls32_
前边的命名是 Thread Local Storage
,线程本地存储,也就是说通常用来表示每个线程独有的一块内存区域。这个类定义在art/runtime/thread.h
中,然后Native 侧的 thread.class 对象(面向对象)都定义了一个tls32_
的成员结构体对象,对象类型是tls_32bit_sized_values
。
c
struct PACKED(4) tls_32bit_sized_values {
using bool32_t = uint32_t;
explicit tls_32bit_sized_values(bool is_daemon)
: state_and_flags(0u),
suspend_count(0),
thin_lock_thread_id(0),
tid(0),
daemon(is_daemon),
throwing_OutOfMemoryError(false),
no_thread_suspension(0),
thread_exit_check_count(0),
is_transitioning_to_runnable(false),
is_gc_marking(false),
is_deopt_check_required(false),
weak_ref_access_enabled(WeakRefAccessState::kVisiblyEnabled),
disable_thread_flip_count(0),
user_code_suspend_count(0),
force_interpreter_count(0),
make_visibly_initialized_counter(0),
define_class_counter(0),
num_name_readers(0),
shared_method_hotness(kSharedMethodHotnessThreshold)
{}
Atomic<uint32_t> state_and_flags;
static_assert(sizeof(state_and_flags) == sizeof(uint32_t), "Size of state_and_flags and uint32 are different");
// 非零值用于告诉当前线程在下一个轮询点进入安全点。
int suspend_count GUARDED_BY(Locks::thread_suspend_count_lock_);
uint32_t thin_lock_thread_id;
// 系统线程 ID。
uint32_t tid;
// 线程是否为守护线程?
const bool32_t daemon;
// 一个布尔值,是否递归抛出 OOM。
bool32_t throwing_OutOfMemoryError;
// 正值表示我们在一个不应挂起线程的区域。
uint32_t no_thread_suspension;
uint32_t thread_exit_check_count;
bool32_t is_transitioning_to_runnable;
bool32_t is_gc_marking;
bool32_t is_deopt_check_required;
// 线程的 "interrupted" 状态
Atomic<bool32_t> interrupted;
AtomicInteger park_state_;
mutable std::atomic<WeakRefAccessState> weak_ref_access_enabled;
uint32_t disable_thread_flip_count;
int user_code_suspend_count GUARDED_BY(Locks::thread_suspend_count_lock_);
uint32_t force_interpreter_count;
uint32_t make_visibly_initialized_counter;
uint32_t define_class_counter;
mutable std::atomic<uint32_t> num_name_readers;
static_assert(std::atomic<uint32_t>::is_always_lock_free);
uint32_t shared_method_hotness;
} tls32_;
上边的代码块是 tls32_ 的结构体细节,有些字段我也没有完全搞清楚用途,不过对本文的分析影响不大,我们只需要了解 state_and_flags
这个Atomic<uint32_t>
原子类型的 uint32_t
是代表线程状态的值就 ok 了。 到这里,我们可以顺藤摸瓜一下,这个state_and_flags
都有哪些含义。
ini
enum class ThreadFlag : uint32_t {
// 如果设置,表示 suspend_count_ > 0,并且线程应该进入安全点处理程序。
kSuspendRequest = 1u << 0,
// 请求线程执行一些检查点工作然后继续。
kCheckpointRequest = 1u << 1,
// 请求线程执行空检查点然后继续。
kEmptyCheckpointRequest = 1u << 2,
kActiveSuspendBarrier = 1u << 3,
// 标记该线程需要执行一个“翻转函数”。
kPendingFlipFunction = 1u << 4,
kRunningFlipFunction = 1u << 5,
kWaitingForFlipFunction = 1u << 6,
kMonitorJniEntryExit = 1u << 7,
kLastFlag = kMonitorJniEntryExit
};
ThreadFlag
是一个枚举值,内部定义了一系列对线程操作的标记,但只是标记不是立即执行。我们此文核心是kSuspendRequest
代表挂起请求,其他的我也没有深究,有兴趣可以后边看看。
-
小知识
在 Java 中,线程有多种状态,例如 NEW
、RUNNABLE
、BLOCKED
、WAITING
、TIMED_WAITING
和 TERMINATED
。JVM
被设计成可以运行在多种操作系统上,因此需要一种机制将这些 Java 线程状态映射到操作系统支持的线程状态上。 所以在 Android Art
中也需要对线程在 native
中实现(适配)Java
侧的线程状态。 在 thread_state.h 中可以看到具体的定义状态的细节。这里我创建了一个表格,来讲 Java
中的 Thread
状态与 native thread
状态的映射关系梳理了一下。
Java Thread.State | ART ThreadState | 描述 |
---|---|---|
NEW | kStarting | 本地线程已启动,但尚未准备好运行托管代码 |
RUNNABLE | kRunnable | 可运行状态 |
kNative | 在 JNI 本地方法中运行 | |
kSuspended | 被 GC 或调试器挂起 | |
BLOCKED | kBlocked | 在监视器上阻塞 |
WAITING | kWaiting | 在 Object.wait() 中 |
kWaitingForLockInflation | 阻塞在膨胀一个 thin-lock | |
kWaitingForTaskProcessor | 阻塞等待 taskProcessor | |
kWaitingForGcToComplete | 阻塞等待 GC 完成 | |
kWaitingForCheckPointsToRun | GC 等待检查点运行 | |
kWaitingPerformingGc | 执行 GC 中 | |
kWaitingForDebuggerSend | 阻塞等待事件发送 | |
kWaitingForDebuggerToAttach | 阻塞等待调试器附加 | |
kWaitingInMainDebuggerLoop | 阻塞/读取/处理调试器事件 | |
kWaitingForDebuggerSuspension | 等待调试器挂起所有线程 | |
kWaitingForJniOnLoad | 等待执行 dlopen 和 JNI on load 代码 | |
kWaitingForSignalCatcherOutput | 等待信号捕捉器 IO 完成 | |
kWaitingInMainSignalCatcherLoop | 阻塞/读取/处理信号 | |
kWaitingForDeoptimization | 等待去优化挂起所有线程 | |
kWaitingForMethodTracingStart | 等待方法跟踪开始 | |
kWaitingForVisitObjects | 等待访问对象 | |
kWaitingForGetObjectsAllocated | 等待获取已分配对象的数量 | |
kWaitingWeakGcRootRead | 等待 GC 读取弱引用根 | |
kWaitingForGcThreadFlip | 等待 GC 线程翻转(CC 收集器)完成 | |
kNativeForAbort | 检查其他线程在中止时未运行 | |
TIMED_WAITING | kTimedWaiting | 在 Object.wait() 中有超时 |
kSleeping | 在 Thread.sleep() 中 | |
TERMINATED | kTerminated | Thread.run 已返回,但 Thread* 仍然存在 |
ok,到这里基本已经分析了50%
,回到 ModifySuspendCountInternal()
这个函数,最后调用了一个函数
TriggerSuspend(); // 触发挂起操作
最初我看到这,以为整个挂起流程已经完结了,顾名思义一看到这个函数名称,以为已经触发了挂起。等我点进去找到这个类的实现,发现没有那么简单。
arduino
// Trigger a suspend check by making the suspend_trigger_ TLS value an invalid pointer.
// The next time a suspend check is done, it will load from the value at this address
// and trigger a SIGSEGV.
// Only needed if Runtime::implicit_suspend_checks_ is true and fully implemented. It currently
// is always false. Client code currently just looks at the thread flags directly to determine
// whether we should suspend, so this call is currently unnecessary.
void TriggerSuspend() {
tlsPtr_.suspend_trigger = nullptr;
}
在 Android 14
的版本中仅仅只是对tlsPtr_.suspend_trigger
设置了 nullptr
,通过注释来看可以这么理解
注释中提到,只有在 Runtime::implicit_suspend_checks_ 为 true 且完全实现时,才需要这个函数。当前的客户端代码是通过检查线程标志来确定是否需要挂起,因此这个函数在当前实现中是多余的。
所以这个函数没什么作用,在研究如何修复线程挂起超时的时候,原本想如果发生了超时时,我们阻止了 abort 信号发送,然后设想主调这个,但实际上TriggerSuspend()
函数强制挂起不可行。
到这里前置的挂起流程已结束。
挂起后置流程
探寻执行挂起点
根据前置流程的梳理,我们知道整个挂起的机制就是死循环添加挂起屏障,给tls32_
的线程本地结构体对象中的state_and_flags
设置为 ThreadFlag::kSuspendRequest
。
那么什么时候去获取这个state_and_flags
,然后读取值判断是不是ThreadFlag::kSuspendRequest
,之后真正执行挂起流程呢?
顺藤摸瓜即可,我们找一下 ThreadFlag::kSuspendRequest
都用在了哪里。
在 AOSP
官网这边只能切到main分支
才可以实现这个代码调用查找的功能,不过影响不大和 Android
其他版本基本是一致的。
第一个调用的地方就是名叫 CheckSuspend()
函数,后边我们会找一下哪里调用了这个函数。
分析一下这个函数,如果标记了ThreadFlag::kSuspendRequest
会执行 FillSuspendCheck(implicit)
这个函数,见名知意跟挂起有关系。
scss
void Thread::FullSuspendCheck(bool implicit) {
ScopedTrace trace(__FUNCTION__);
DCHECK(!ReadFlag(ThreadFlag::kSuspensionImmune));
DCHECK(this == Thread::Current());
VLOG(threads) << this << " self-suspending";
// Make thread appear suspended to other threads, release mutator_lock_.
// Transition to suspended and back to runnable, re-acquire share on mutator_lock_.
ScopedThreadSuspension(this, ThreadState::kSuspended); // NOLINT
if (implicit) {
// For implicit suspend check we want to `madvise()` away
// the alternate signal stack to avoid wasting memory.
MadviseAwayAlternateSignalStack();
}
VLOG(threads) << this << " self-reviving";
}
DCHECK(!ReadFlag(ThreadFlag::kSuspensionImmune)); 这个函数的含义是线程 A 触发挂起线程 B 的请求,线程 B 已经执行挂起,此时 A 也收到了挂起请求,此时若 B 需要唤醒,但是 A 挂起了没有机会唤醒 B。所以该标志可以防止线程在即将挂起所有其他线程时接收到挂起请求,并随后挂起自身,导致死锁。
然后调用了ScopedThreadSuspension(this, ThreadState::kSuspended); // NOLINT
scss
inline ScopedThreadSuspension::ScopedThreadSuspension(Thread* self, ThreadState suspended_state)
: self_(self), suspended_state_(suspended_state) {
DCHECK(self_ != nullptr);
self_->TransitionFromRunnableToSuspended(suspended_state);
}
inline ScopedThreadSuspension::~ScopedThreadSuspension() {
DCHECK_EQ(self_->GetState(), suspended_state_);
self_->TransitionFromSuspendedToRunnable();
}
inline
关键字作用是,当函数被声明为 inline 后,编译器会尝试在每个调用点直接插入该函数的代码,而不是进行正常的函数调用。这种方式可以避免函数调用开销,包括参数传递、返回地址保存、栈帧创建和销毁等。当inline
标记给一个 C++ 类时,会将构造函数整体的代码逻辑直接插入到调用这个构造方法的某一个函数,从而提高性能。
析构函数
是一个特殊的成员函数,在对象的生命周期结束时被自动调用,用于执行对象清理操作,如释放资源、关闭文件或网络连接、释放内存等。在 C++ 中,析构函数的名称与类名相同,但前面带有一个波浪号(~)。析构函数没有返回类型,也不接受参数。有点像Java 中的 finalized 函数用于释放资源的,析构一定会执行罢了。
ScopedThreadSuspension类
- 对象创建
触发TransitionFromRunnableToSuspended(suspended_state)
- 对象销毁
触发TransitionFromSuspendedToRunnable();
TransitionFromRunnableToSuspended
C++
inline void Thread::TransitionFromRunnableToSuspended(ThreadState new_state) {
// Note: JNI stubs inline a fast path of this method that transitions to suspended if
// there are no flags set and then clears the `held_mutexes[kMutatorLock]` (this comes
// from a specialized `BaseMutex::RegisterAsLockedImpl(., kMutatorLock)` inlined from
// the `GetMutatorLock()->TransitionFromRunnableToSuspended(this)` below).
// Therefore any code added here (other than debug build assertions) should be gated
// on some flag being set, so that the JNI stub can take the slow path to get here.
// 确保当前线程可以被挂起。这个检查确保线程在被允许的情况下才能进行挂起操作。
AssertThreadSuspensionIsAllowable();
PoisonObjectPointersIfDebug();
// 确认当前线程就是正在调用该方法的线程。
DCHECK_EQ(this, Thread::Current());
// 将线程状态更改为非可运行状态,使其在系统中看起来是挂起的。该方法还会运行检查点操作,
// 确保在挂起过程中进行必要的检查和操作。
TransitionToSuspendedAndRunCheckpoints(new_state);
// 标记释放变异锁(mutator lock)的共享部分。变异锁用于保护线程在执行某些操作时不会被其他线程干扰。
GetMutatorLock()->TransitionFromRunnableToSuspended(this);
// 一旦线程被挂起,检查是否有活动的挂起屏障(suspend barrier)。挂起屏障用于同步线程的挂起操作,
// 确保所有线程都在预期状态下进行协调。
CheckActiveSuspendBarriers();
}
总的来说就是先改变一下线程状态为挂起态。注意此时还未真正的挂起。
scss
inline void Thread::TransitionToSuspendedAndRunCheckpoints(ThreadState new_state) {
// 确保新状态不是可运行状态(kRunnable)
DCHECK_NE(new_state, ThreadState::kRunnable);
while (true) {
StateAndFlags old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
// 确保当前状态是可运行状态(kRunnable)
DCHECK_EQ(old_state_and_flags.GetState(), ThreadState::kRunnable);
// 如果设置了检查点请求标志,则运行检查点函数并继续循环
if (UNLIKELY(old_state_and_flags.IsFlagSet(ThreadFlag::kCheckpointRequest))) {
IncrementStatsCounter(&checkpoint_count_);
RunCheckpointFunction(); // 1
continue;
}
// 如果设置了空检查点请求标志,则运行空检查点并继续循环
if (UNLIKELY(old_state_and_flags.IsFlagSet(ThreadFlag::kEmptyCheckpointRequest))) {
RunEmptyCheckpoint();
continue;
}
// 确保检查点请求标志和空检查点请求标志未设置
DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kCheckpointRequest));
DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kEmptyCheckpointRequest));
// 创建新的状态和标志对象,设置新的状态,但保留当前标志
StateAndFlags new_state_and_flags = old_state_and_flags.WithState(new_state);
// 使用比较并交换(CAS)操作更新状态和标志,确保之前的内存操作对观察到挂起的任何线程都是可见的
bool done = tls32_.state_and_flags.CompareAndSetWeakRelease(
old_state_and_flags.GetValue(), new_state_and_flags.GetValue());
// 如果CAS操作成功,增加挂起计数器并退出循环
if (LIKELY(done)) {
IncrementStatsCounter(&suspended_count_);
break;
}
}
}
还记得我们整个流程是由CheckSuspend()
函数所引发的, 代码 1 处的RunCheckpointFunction();
最终就会执行CheckSuspend()
后边我们再看,这里再说一下,不然容易晕。 所以上述代码块核心职责就是通过 CAS 修改 state_and_flags
的值为 ThreadState::kSuspended
。
TransitionFromSuspendedToRunnable
好戏开场了,修改线程的状态以后,ScopedThreadSuspension类
即将销毁,使用 inline + 析构
既保证了性能又保证了事件的一致性(先改状态再挂起)。
scss
inline ThreadState Thread::TransitionFromSuspendedToRunnable(bool fail_on_suspend_req) {
// 确保当前线程是正在执行该方法的线程。
DCHECK(this == Current());
// 获取当前的状态和标志
StateAndFlags old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
ThreadState old_state = old_state_and_flags.GetState();
// 确保当前状态不是可运行状态。
DCHECK_NE(old_state, ThreadState::kRunnable);
while (true) {
// 确保当前线程没有设置kSuspensionImmune标志。
DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kSuspensionImmune));
// 确保当前线程未持有mutator锁,否则可能导致GC阻塞。
GetMutatorLock()->AssertNotHeld(this);
// ???
constexpr uint32_t kCheckedFlags =
SuspendOrCheckpointRequestFlags() |
enum_cast<uint32_t>(ThreadFlag::kActiveSuspendBarrier) |
FlipFunctionFlags();
// 如果没有挂起请求或检查点请求,则将状态原子地从挂起改为可运行。
if (LIKELY(!old_state_and_flags.IsAnyOfFlagsSet(kCheckedFlags))) {
StateAndFlags new_state_and_flags = old_state_and_flags.WithState(ThreadState::kRunnable);
if (LIKELY(tls32_.state_and_flags.CompareAndSetWeakAcquire(
old_state_and_flags.GetValue(), new_state_and_flags.GetValue()))) {
// 1
GetMutatorLock()->TransitionFromSuspendedToRunnable(this);
break;
}
}
// 如果设置了kActiveSuspendBarrier标志,则通过挂起屏障。
else if (old_state_and_flags.IsFlagSet(ThreadFlag::kActiveSuspendBarrier)) {
PassActiveSuspendBarriers();
}
// 检查是否设置了检查点请求标志或空检查点请求标志。
else if (UNLIKELY(old_state_and_flags.IsFlagSet(ThreadFlag::kCheckpointRequest) ||
old_state_and_flags.IsFlagSet(ThreadFlag::kEmptyCheckpointRequest))) {
// 如果在挂起状态时设置了检查点标志,日志记录错误信息并终止程序。
// 2
LOG(FATAL) << "Transitioning to Runnable with checkpoint flag,"
<< " flags=" << old_state_and_flags.WithState(ThreadState::kRunnable).GetValue()
<< " state=" << old_state_and_flags.GetState();
}
// 如果设置了kSuspendRequest标志,则处理挂起请求。
else if (old_state_and_flags.IsFlagSet(ThreadFlag::kSuspendRequest)) {
auto fake_mutator_locker = []() SHARED_LOCK_FUNCTION(Locks::mutator_lock_)
NO_THREAD_SAFETY_ANALYSIS {};
if (fail_on_suspend_req) {
fake_mutator_locker();
return ThreadState::kInvalidState;
}
Thread* thread_to_pass = nullptr;
if (kIsDebugBuild && !IsDaemon()) {
thread_to_pass = this;
}
MutexLock mu(thread_to_pass, *Locks::thread_suspend_count_lock_);
old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
DCHECK_EQ(old_state, old_state_and_flags.GetState());
while (old_state_and_flags.IsFlagSet(ThreadFlag::kSuspendRequest)) {
// 真正挂起的触发
// 3
Thread::resume_cond_->Wait(thread_to_pass);
old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
DCHECK_EQ(old_state, old_state_and_flags.GetState());
}
DCHECK_EQ(GetSuspendCount(), 0);
}
else if (UNLIKELY(old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction))) {
DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction));
WaitForFlipFunction(this);
}
else if (old_state_and_flags.IsFlagSet(ThreadFlag::kPendingFlipFunction)) {
DCHECK(!old_state_and_flags.IsFlagSet(ThreadFlag::kRunningFlipFunction));
if (EnsureFlipFunctionStarted(this, this, old_state_and_flags)) {
break;
}
}
old_state_and_flags = GetStateAndFlags(std::memory_order_relaxed);
DCHECK_EQ(old_state, old_state_and_flags.GetState());
}
DCHECK_EQ(this->GetState(), ThreadState::kRunnable);
return static_cast<ThreadState>(old_state);
}
- 代码1 处,如果没有挂起等改变线程状态的请求,则直接改回 Running 状态。
- 代码2 处,如果在挂起状态时设置了检查点标志 这个是错误的一种状态也会发射 abort 信号导致应用终止。
- 代码 3处,调用了
Wait()
函数,此Wait()
非彼Java wait()
。
scss
void ConditionVariable::Wait(Thread* self) {
guard_.CheckSafeToWait(self);
// 在持有锁的情况下等待。
WaitHoldingLocks(self);
}
void ConditionVariable::WaitHoldingLocks(Thread* self) {
// 确保当前线程是正在执行该方法的线程,或者当前线程为 nullptr。
DCHECK(self == nullptr || self == Thread::Current());
// 确保当前线程持有锁。
guard_.AssertExclusiveHeld(self);
// 保存当前锁的递归计数。
unsigned int old_recursion_count = guard_.recursion_count_;
#if ART_USE_FUTEXES
// 增加等待线程的计数。
num_waiters_++;
// 确保互斥锁被争用,以便重新排队的线程被唤醒。
guard_.increment_contenders();
// 将递归计数设置为1,以便其他线程可以安全地获取锁。
guard_.recursion_count_ = 1;
// 获取当前的序列值。
int32_t cur_sequence = sequence_.load(std::memory_order_relaxed);
// 释放互斥锁,使其他线程可以继续执行。
guard_.ExclusiveUnlock(self);
// 在 Futex 上等待,直到被唤醒或超时。
if (futex(sequence_.Address(), FUTEX_WAIT_PRIVATE, cur_sequence, nullptr, nullptr, 0) != 0) {
// 如果 Futex 失败,检查是否是预期的错误。
// EAGAIN 等于 EWOULDBLK,表示调用者应该再次尝试。
// EINTR 表示该线程收到一个信号。
if ((errno != EINTR) && (errno != EAGAIN)) {
PLOG(FATAL) << "futex wait failed for " << name_;
}
}
// 如果运行时已删除,则让线程进入睡眠状态。
SleepIfRuntimeDeleted(self);
// 重新获取互斥锁。
guard_.ExclusiveLock(self);
// 确保等待线程的计数大于0。
CHECK_GT(num_waiters_, 0);
num_waiters_--;
// 确保争用者的计数大于0,并减少争用者计数。
CHECK_GT(guard_.get_contenders(), 0);
guard_.decrement_contenders();
#else
// 保存互斥锁的旧所有者线程ID。
pid_t old_owner = guard_.GetExclusiveOwnerTid();
// 清空互斥锁的所有者线程ID。
guard_.exclusive_owner_.store(0 /* pid */, std::memory_order_relaxed);
// 将递归计数重置为0。
guard_.recursion_count_ = 0;
// 使用 pthread 条件变量等待,直到被唤醒。
CHECK_MUTEX_CALL(pthread_cond_wait, (&cond_, &guard_.mutex_));
// 恢复互斥锁的所有者线程ID。
guard_.exclusive_owner_.store(old_owner, std::memory_order_relaxed);
#endif
// 恢复之前保存的递归计数。
guard_.recursion_count_ = old_recursion_count;
}
针对挂起核心方式存在两个,是否使用Linux 的 futex(),如果是则用,否则使用pthread 条件变量。
Linux#futex()
第一次看到这个 futex(),不知道什么东西,只能从先用入手。 先看一个小示例,注意这段代码只能在 Linux 系统中执行。
c
#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#define futex_wait(uaddr, val) syscall(SYS_futex, uaddr, FUTEX_WAIT, val, NULL, NULL, 0)
#define futex_wake(uaddr, val) syscall(SYS_futex, uaddr, FUTEX_WAKE, val, NULL, NULL, 0)
typedef struct {
uint32_t futex;
int counter;
} shared_counter_t;
shared_counter_t *counter;
// 计数器递增函数
void* increment_counter(void* arg) {
for (int i = 0; i < 1000; i++) {
while (1) {
// 尝试获取锁
if (__sync_bool_compare_and_swap(&counter->futex, 0, 1)) {
// 成功获取锁后,更新计数器
counter->counter++;
// 释放锁
counter->futex = 0;
// 唤醒等待的线程
futex_wake(&counter->futex, 1);
break;
} else {
// 等待锁可用
futex_wait(&counter->futex, 1);
}
}
}
return NULL;
}
int main() {
counter = malloc(sizeof(shared_counter_t));
counter->futex = 0;
counter->counter = 0;
pthread_t threads[10];
// 创建10个线程
for (int i = 0; i < 10; i++) {
pthread_create(&threads[i], NULL, increment_counter, NULL);
}
// 等待所有线程完成
for (int i = 0; i < 10; i++) {
pthread_join(threads[i], NULL);
}
// 输出计数器的最终值
printf("Final counter value: %d\n", counter->counter);
free(counter);
return 0;
}
上述代码实践了一下,基于futex
实现的锁线程达到线程安全计数的逻辑,不过还没有深入探究原理。
不过在我们 Art 中的唤醒流程中并不是直接显示调用的futex_wake
。 我们看一下唤醒的逻辑。 我们知道Thread.setNativeName()
函数会先挂起然后再改名字,之后再唤醒线程,我们直接看下它怎么唤醒的。
scss
static void Thread_setNativeName(JNIEnv* env, jobject peer, jstring java_name) {
ScopedUtfChars name(env, java_name);
{
ScopedObjectAccess soa(env);
if (soa.Decode<mirror::Object>(peer) == soa.Self()->GetPeer()) {
soa.Self()->SetThreadName(name.c_str());
return;
}
}
// Suspend thread to avoid it from killing itself while we set its name. We don't just hold the
// thread list lock to avoid this, as setting the thread name causes mutator to lock/unlock
// in the DDMS send code.
ThreadList* thread_list = Runtime::Current()->GetThreadList();
// Take suspend thread lock to avoid races with threads trying to suspend this one.
Thread* thread = thread_list->SuspendThreadByPeer(peer, SuspendReason::kInternal);
if (thread != nullptr) {
{
ScopedObjectAccess soa(env);
thread->SetThreadName(name.c_str());
}
bool resumed = thread_list->Resume(thread, SuspendReason::kInternal);
DCHECK(resumed);
}
}
可以看到调用Resume()函数。
rust
bool ThreadList::Resume(Thread* thread, SuspendReason reason) {
ATraceEnd();
Thread* self = Thread::Current();
DCHECK_NE(thread, self);
{
MutexLock mu(self, *Locks::thread_list_lock_);
MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
if (UNLIKELY(!thread->IsSuspended())) {
return false;
}
if (!Contains(thread)) {
return false;
}
thread->DecrementSuspendCount(self, /*for_user_code=*/(reason == SuspendReason::kForUserCode));
Thread::resume_cond_->Broadcast(self);
}
VLOG(threads) << "Resume(" << reinterpret_cast<void*>(thread) << ") finished waking others";
return true;
}
可以看到调用Broadcast()函数。
scss
void ConditionVariable::Broadcast(Thread* self) {
DCHECK(self == nullptr || self == Thread::Current());
// TODO: enable below, there's a race in thread creation that causes false failures currently.
// guard_.AssertExclusiveHeld(self);
DCHECK_EQ(guard_.GetExclusiveOwnerTid(), SafeGetTid(self));
#if ART_USE_FUTEXES
RequeueWaiters(std::numeric_limits<int32_t>::max());
#else
CHECK_MUTEX_CALL(pthread_cond_broadcast, (&cond_));
#endif
}
#if ART_USE_FUTEXES
void ConditionVariable::RequeueWaiters(int32_t count) {
if (num_waiters_ > 0) {
sequence_++; // Indicate a signal occurred.
// Move waiters from the condition variable's futex to the guard's futex,
// so that they will be woken up when the mutex is released.
bool done = futex(sequence_.Address(),
FUTEX_REQUEUE_PRIVATE,
/* Threads to wake */ 0,
/* Threads to requeue*/ reinterpret_cast<const timespec*>(count),
guard_.state_and_contenders_.Address(),
0) != -1;
if (!done && errno != EAGAIN && errno != EINTR) {
PLOG(FATAL) << "futex requeue failed for " << name_;
}
}
}
#endif
可见 RequeueWaiters
最终调用了futex(FUTEX_REQUEUE_PRIVATE)
来唤醒线程的。但是一直不是很理解为什么不使用FUTEX_WAKE
进行唤醒。
Futex又是一个知识点哈哈,学习总是这样的,知识点琐碎,后续再出一个关于 Futex 原理的文章,这里暂时认为FUTEX_REQUEUE_PRIVATE
就是唤醒线程。
总结一下
通过前置与后置的流程,我们知道,前置流程仅仅是给需要挂起的线程内部的 tls 设置一个标记,等后续的 CheckSuspend
函数执行的时候,知道这个函数需要被挂起,之后的流程就会走到FillSuspendCheck()
的函数,之后就有一个被 inline
标记的 ScopedThreadSuspension 类构造方法
执行了对线程状态从 Running
-> 标记为挂起态
,之后执行了析构函数,析构函数中最终会调用一个Wait()
函数,在这个函数中最终会调用Linux#futex(FUTEX_WAIT_PRIVATE)
去挂起线程。
Art 检查点的流程
在 art/runtime/thread.h
中定义了如下的方法
csharp
// 处理挂起的线程挂起请求并处理挂起的情况
void CheckSuspend(bool implicit = false) REQUIRES_SHARED(Locks::mutator_lock_);
我们知道后置流程中的 CheckSuspend()
是挂起线程的起源,那么 CheckSuspend()
函数是什么时候被调用的呢?先别急,先看下面的例子。
我们总说触发了 GC
就会 stop the world
,让所有的线程挂起,虽然说 art 虚拟机采用了并发式 GC ,但是总的来说逻辑都差不多,然后 GC线程
去遍历GCRoot,释放掉一些不用的对象。
在上述的过程中,如果开始 GC 一定会触发线程挂起吗?肯定不是,而是要等待线程都到了安全点才会执行挂起。
同样在Android 发生了 ANR
的时候,系统会发SigQuic
信号,最后也会触发 dump 线程。
rust
void ThreadList::DumpForSigQuit(std::ostream& os) {
// ignore some code...
bool dump_native_stack = Runtime::Current()->GetDumpNativeStackOnSigQuit();
// 触发 dump
Dump(os, dump_native_stack);
DumpUnattachedThreads(os, dump_native_stack && kDumpUnattachedThreadNativeStackForSigQuit);
}
void ThreadList::Dump(std::ostream& os, bool dump_native_stack) {
// 1. 获取当前线程并锁定线程列表。
Thread* self = Thread::Current();
{
MutexLock mu(self, *Locks::thread_list_lock_);
os << "DALVIK THREADS (" << list_.size() << "):\n";
}
// 2. 检查当前线程是否有效。
if (self != nullptr) {
// 3. 创建并运行检查点,确保所有线程都在一个一致的状态。
DumpCheckpoint checkpoint(dump_native_stack);
size_t threads_running_checkpoint;
{
ScopedObjectAccess soa(self);
threads_running_checkpoint = RunCheckpoint(&checkpoint);
}
// 4. 等待所有线程运行完检查点。
if (threads_running_checkpoint != 0) {
checkpoint.WaitForThreadsToRunThroughCheckpoint(threads_running_checkpoint);
}
// 5. 转储检查点信息到输出流。
checkpoint.Dump(self, os);
} else {
// 6. 如果当前线程无效,则转储未附加的线程信息。
DumpUnattachedThreads(os, dump_native_stack);
}
}
size_t ThreadList::RunCheckpoint(Closure* checkpoint_function,
Closure* callback,
bool allow_lock_checking) {
// 获取当前线程,并确保当前线程没有持有某些锁以避免死锁。
Thread* self = Thread::Current();
Locks::mutator_lock_->AssertNotExclusiveHeld(self);
Locks::thread_list_lock_->AssertNotHeld(self);
Locks::thread_suspend_count_lock_->AssertNotHeld(self);
// 初始化变量
std::vector<Thread*> suspended_count_modified_threads;
size_t count = 0;
{
// 锁定线程列表和线程挂起计数锁。
MutexLock mu(self, *Locks::thread_list_lock_);
if (kIsDebugBuild && allow_lock_checking) {
self->DisallowPreMonitorMutexes();
}
MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
count = list_.size();
// 遍历所有线程,对每个线程(除了当前线程)执行以下操作。
for (const auto& thread : list_) {
if (thread != self) {
bool requested_suspend = false;
while (true) {
// 请求线程执行检查点函数。如果成功,且之前请求了挂起,则减小挂起计数。
if (thread->RequestCheckpoint(checkpoint_function)) {
if (requested_suspend) {
thread->DecrementSuspendCount(self);
Thread::resume_cond_->Broadcast(self);
requested_suspend = false;
}
break;
} else {
// 如果检查点请求失败,且未请求挂起,则增加挂起计数。
if (!requested_suspend) {
thread->IncrementSuspendCount(self);
requested_suspend = true;
}
if (thread->IsSuspended()) {
break;
}
}
// 如果线程进入了kRunnable状态,立即重试。
}
// 记录挂起计数被修改的线程。
if (requested_suspend) {
DCHECK(thread->IsSuspended());
suspended_count_modified_threads.push_back(thread);
}
}
}
// 如果提供了回调函数,运行回调函数。
if (callback != nullptr) {
callback->Run(self);
}
}
// 在当前线程上运行检查点函数。
checkpoint_function->Run(self);
// 处理挂起的线程。
bool mutator_lock_held = Locks::mutator_lock_->IsSharedHeld(self);
bool repeat = true;
while (repeat) {
repeat = false;
for (auto& thread : suspended_count_modified_threads) {
if (thread != nullptr) {
// 确保线程处于挂起状态。
DCHECK(thread->IsSuspended());
if (mutator_lock_held) {
Thread::EnsureFlipFunctionStarted(self, thread);
if (thread->GetStateAndFlags(std::memory_order_acquire)
.IsAnyOfFlagsSet(Thread::FlipFunctionFlags())) {
repeat = true;
continue;
}
}
// 运行检查点函数。
checkpoint_function->Run(thread);
{
MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
thread->DecrementSuspendCount(self);
}
// 将线程设置为nullptr,跳过已处理的线程。
thread = nullptr;
}
}
}
DCHECK(std::all_of(suspended_count_modified_threads.cbegin(),
suspended_count_modified_threads.cend(),
[](Thread* thread) { return thread == nullptr; }));
// 恢复所有可能被挂起的线程。
{
MutexLock mu2(self, *Locks::thread_suspend_count_lock_);
Thread::resume_cond_->Broadcast(self);
}
// 恢复锁检查。
if (kIsDebugBuild && allow_lock_checking) {
self->AllowPreMonitorMutexes();
}
return count;
}
在触发 dump 的过程中,要确保每个线程都能在一个一致的状态下运行检查点函数,并在必要时运行回调函数。在多线程环境中,它通过增加和减少线程的挂起计数,以及使用多个锁来确保操作的安全和一致性。
checkpoint_function->Run(thread);
这个是重点,在 GC or ANR dump
中 都会执行这个运行检查点函数。因为这个函数会间接触发 CheckSuspend
最终接应我们上文的后置挂起流程。
php
// Run a checkpoint on all threads. Return the total number of threads for which the checkpoint
// function has been or will be called.
// Running threads are not suspended but run the checkpoint inside of the suspend check. The
// return value includes already suspended threads for b/24191051. Runs or requests the
// callback, if non-null, inside the thread_list_lock critical section after determining the
// runnable/suspended states of the threads. Does not wait for completion of the checkpoint
// function in running threads. If the caller holds the mutator lock, then all instances of the
// checkpoint function are run with the mutator lock. If the caller does not hold the mutator
// lock (see mutator_gc_coord.md) then, since the checkpoint code may not acquire or release the
// mutator lock, the checkpoint will have no way to access Java data.
// TODO: Is it possible to just require the mutator lock here?
EXPORT size_t RunCheckpoint(Closure* checkpoint_function,
Closure* callback = nullptr,
bool allow_lock_checking = true)
REQUIRES(!Locks::thread_list_lock_, !Locks::thread_suspend_count_lock_);
Closure*
这个指针是什么?
在Android ART中,Closure是一种用于表示可调用对象的抽象类型(纯虚函数),类似于函数对象或回调。它封装了一段可执行的代码,可以在适当的时候调用。
arduino
class Closure {
public:
virtual ~Closure() { }
virtual void Run(Thread* self) = 0;
};
但是在GC or ANR dump
的流程中是显示的触发了检查点请求,在我们的线程挂起的分析中我们只是修改了线程标记,然后等触发 CheckSuspend()
方法时才会挂起,那么是什么时机触发的?
我直接找 CheckSuspend()
都调用在哪个位置,发现
有一个叫 AllowThreadSuspension()
的函数调用了CheckSuspend()
,我们再看下这个 AllowThreadSuspension()
在哪儿调用的。
这四个方法是存在于 art/runtime/interpreter/interpreter_switch_impl-inl.h
中,而interpreter
这个主要用来做解释器解释执行的,用于将源代码逐行翻译成可执行的机器指令,所以说在这个4个情况下,会将 CheckSuspend()
插入。
见异思迁:
art/runtime/interpreter/interpreter.cc 中还存在一段代码,关于检查 ThrowStackOverflowError的流程,也有探索价值。
函数名称 | 检查点位置 |
---|---|
HandlePendingException() | 抛出异常并处理异常的地方 |
HandleReturn(result) | 方法结束并返回结果的时候 |
HandleBranch(offset) | 处理条件语句分支时 |
RETURN_OBJECT() | 方法结束并返回对象的时候 |
所以说当上述这四个情况任意一个条件被满足,最终才会真正的挂起线程。
总结
经过上述分享我们大概对线程挂起整体的流程串在一起了,留下一个知识点是Linux#futex()
和pthread 条件变量
的挂起原理,后续再深入探究一下。挂起流程的整体细节非常多,涉及面比较广泛,如果上述有写错的地方,请及时提出我会及时思考与更正,谢谢!
评论