CEF线程模型与初始化过程详解
文章目录
上一篇说了CEF框架的消息传递,实际上这个消息传递指的是进程之间的消息传递,也就是IPC的方式在Browser,Render进程之间传递消息。
这一篇就接上篇,说说CEF的线程模型,因为在CEF框架的main entry中,主要的三个函数CefInitialize、CefRunMessageLoop和CefShutdown都和这个模型有关。
官方文档主要参考两篇:
CEF进程 & 线程模型
CEF框架是基于Chromium的代码框架来实现的,所以CEF的进程与线程模型就都使用的是Chromuim的模型框架。所以,除了CEF项目的代码,还需要参考CEF LIB和chromium的代码:
网上有一些使用翻译软件直接翻译的文章,不太建议看,实在是太难理解,不如直接看google的英文资料+代码,理解起来比较快。
我针对资料和代码的理解,画了一张大致的图:
几个大的框就是CEF框架拥有的几个子进程:
- Browser进程,一般来说就是主进程。
- Render进程,用于渲染界面
- GPU加速进程
- 其他进程等
- 进程之间使用IPC通讯,就是上一篇提到的内容。
每个进程中包含的内容基本上是一样的,我图中就是以Browser进程为例。
Chromium中的线程概念
在Chromium中,有一个重要的概念:Sequence,官方也给出了一个非常好的说明文档:[Chromium Sequence]
多线程中的数据竞争,也就是线程安全问题
一个进程中的多个线程,一般来说都是共享同样的内存空间,所以需要解决线程并行执行期间,数据的准确性问题:
一般的解决办法有两类:
- 共享内存,关键数据加锁: mutex。Must use mutexes, condvars, etc. to ensure safety
- 线程之间通过信号量等方式通讯,相当于是线程间通讯。Must use message-passing between threads
- chromium采用的是综合方式,但是主要还是线程间的通讯方式。Send data and tasks between threads, instead of using locks to synchronize;Locks/condition variables exist, but are rarely needed.
chromium对这个通讯方式给出了更具体的实现方式:Sequence。
简单的说,就是多个任务的一个有序集合,这个有序集合在一个虚拟的线程上运行,这个虚拟的线程可以在实际运行过程中,由不同的实际线程(Physical thread)来承载。
至于这个承载方式,chromium也给出了选择方式:
也就是一个对多对的关系,类似线程池,在chromium框架中有一个SequenceManager类,就是用来做这个东西的调度用的。
调度过程大致如下:
其他线程概念
Google的官方文档都已经一一列举出来了:
-
Task: A unit of work to be processed. Effectively a function pointer with optionally associated state. In Chrome this is base::OnceCallback and base::RepeatingCallback created via base::BindOnce and base::BindRepeating, respectively. (documentation).
简单的说,一个task就可以对应一个任务,或者换到代码里的概念就是一个函数,这个任务或者函数需要被执行。void TaskA() {} void TaskB(int v) {} auto task_a = base::BindOnce(&TaskA); auto task_b = base::BindOnce(&TaskB, 42);
- 从上面的代码可以看出,task就是由一个函数指针得到的。再由BindOnce或者BindRepeating来封装一次,告诉chromium,这个任务是需要被执行一次,或者是被重复执行的(官方文档说明)。
-
Task queue: A queue of tasks to be processed. 这个就是上图中CurrentDefaultHandle queue,所有的task都需要被提交到这里来统一分发。
-
Physical thread: An operating system provided thread (e.g. pthread on POSIX or CreateThread() on Windows). The Chrome cross-platform abstraction is base::PlatformThread. You should pretty much never use this directly. 操作系统层面的线程(Chromium’s abstraction over physical threads)。
-
base::Thread: A physical thread forever processing messages from a dedicated task queue until Quit(). You should pretty much never be creating your own base::Thread’s. Chromium框架中对不同平台的thread的封装,Chromium中的Thread类,一般不直接使用。
-
Thread pool: A pool of physical threads with a shared task queue. In Chrome, this is base::ThreadPoolInstance. There’s exactly one instance per Chrome process, it serves tasks posted through base/task/thread_pool.h and as such you should rarely need to use the base::ThreadPoolInstance API directly (more on posting tasks later). 线程池,每个进程都有且仅有一个线程池实例,用于处理那些可以并行处理(parallel)的任务。
-
Sequence or Virtual thread: A chrome-managed thread of execution. Like a physical thread, only one task can run on a given sequence / virtual thread at any given moment and each task sees the side-effects of the preceding tasks. Tasks are executed sequentially but may hop physical threads between each one. 就是一个用于调度,为了做load-balance的一个容器。
-
Task runner: An interface through which tasks can be posted. In Chrome this is base::TaskRunner. 任务分发器,上图中有提到,一个任务(task)必须由某种runner来发布到任务队列。
class A { public: A() = default; void PostSomething() { task_runner_->PostTask(FROM_HERE, base::BindOnce(&A, &DoSomething)); } void DoSomething() { } private: scoped_refptr<base::TaskRunner> task_runner_ = base::ThreadPool::CreateTaskRunner({base::TaskPriority::USER_VISIBLE}); };
-
Sequenced task runner: A task runner which guarantees that tasks posted to it will run sequentially, in posted order. Each such task is guaranteed to see the side-effects of the task preceding it. Tasks posted to a sequenced task runner are typically processed by a single thread (virtual or physical). In Chrome this is base::SequencedTaskRunner which is-a base::TaskRunner. 上面任务分发器的一种,用于分发Sequenced任务,这种任务分发到任务队列后,chromium会自行进行调度执行(上图中的SequenceManager),调度器会保证这个任务会被序列执行(sequentially, in posted order)。
-
Single-thread task runner: A sequenced task runner which guarantees that all tasks will be processed by the same physical thread. In Chrome this is base::SingleThreadTaskRunner which is-a base::SequencedTaskRunner. We prefer sequences to threads whenever possible. 也是一种任务分发器,由这种分发器分发的任务,调度器会将这些任务串行的在一个固定的physical thread上执行。对应与上图中的Single Threads。
几种不同的Task Runner
-
parallel,也就是说这些线程不需要顺序执行,直接丢到线程池里运行就行了,这就对应了上图中的ThreadPool。
class A { public: A() = default; void PostSomething() { task_runner_->PostTask(FROM_HERE, base::BindOnce(&A, &DoSomething)); } void DoSomething() { } private: scoped_refptr<base::TaskRunner> task_runner_ = base::ThreadPool::CreateTaskRunner({base::TaskPriority::USER_VISIBLE}); };
-
Sequenced task runner。
- 发送任务到一个新的Sequence,也就是新建一个。
scoped_refptr<SequencedTaskRunner> sequenced_task_runner = base::ThreadPool::CreateSequencedTaskRunner(...); // TaskB runs after TaskA completes. sequenced_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskA)); sequenced_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskB));
- 发送到当前执行的Sequence
// The task will run on the current (virtual) thread's default task queue. base::SequencedTaskRunner::GetCurrentDefault()->PostTask( FROM_HERE, base::BindOnce(&Task);
- 发送任务到一个新的Sequence,也就是新建一个。
-
Single-thread task runner
base::PostTask(FROM_HERE, {content::BrowserThread::UI}, ...); base::CreateSingleThreadTaskRunner({content::BrowserThread::IO}) ->PostTask(FROM_HERE, ...);
-
使用Sequence Task代替锁:
下面代码中的两个宏定义,CEF中的DCHECK后面再说:
#define SEQUENCE_CHECKER(name) base::SequenceChecker name
#define DETACH_FROM_SEQUENCE(name) (name).DetachFromSequence()class A { public: A() { // Do not require accesses to be on the creation sequence. DETACH_FROM_SEQUENCE(sequence_checker_); } void AddValue(int v) { // Check that all accesses are on the same sequence. DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); values_.push_back(v); } private: SEQUENCE_CHECKER(sequence_checker_); // No lock required, because all accesses are on the // same sequence. std::vector<int> values_; }; A a; scoped_refptr<SequencedTaskRunner> task_runner_for_a = ...; task_runner_for_a->PostTask(FROM_HERE, base::BindOnce(&A::AddValue, base::Unretained(&a), 42)); task_runner_for_a->PostTask(FROM_HERE, base::BindOnce(&A::AddValue, base::Unretained(&a), 27)); // Access from a different sequence causes a DCHECK failure. scoped_refptr<SequencedTaskRunner> other_task_runner = ...; other_task_runner->PostTask(FROM_HERE, base::BindOnce(&A::AddValue, base::Unretained(&a), 1));
官方测试代码
做个记录
class MyTest : public testing::Test {
public:
// ...
protected:
base::test::TaskEnvironment task_environment_;
};
TEST_F(MyTest, FirstTest) {
base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, base::BindOnce(&A));
base::SequencedTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE,
base::BindOnce(&B));
base::SingleThreadTaskRunner::GetCurrentDefault()->PostDelayedTask(
FROM_HERE, base::BindOnce(&C), base::TimeDelta::Max());
// This runs the (SingleThread|Sequenced)TaskRunner::CurrentDefaultHandle queue until it is empty.
// Delayed tasks are not added to the queue until they are ripe for execution.
// Prefer explicit exit conditions to RunUntilIdle when possible:
// bit.ly/run-until-idle-with-care2.
base::RunLoop().RunUntilIdle();
// A and B have been executed. C is not ripe for execution yet.
base::RunLoop run_loop;
base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, base::BindOnce(&D));
base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, run_loop.QuitClosure());
base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, base::BindOnce(&E));
// This runs the (SingleThread|Sequenced)TaskRunner::CurrentDefaultHandle queue until QuitClosure is
// invoked.
run_loop.Run();
// D and run_loop.QuitClosure() have been executed. E is still in the queue.
// Tasks posted to thread pool run asynchronously as they are posted.
base::ThreadPool::PostTask(FROM_HERE, {}, base::BindOnce(&F));
auto task_runner =
base::ThreadPool::CreateSequencedTaskRunner({});
task_runner->PostTask(FROM_HERE, base::BindOnce(&G));
// To block until all tasks posted to thread pool are done running:
base::ThreadPoolInstance::Get()->FlushForTesting();
// F and G have been executed.
base::ThreadPool::PostTaskAndReplyWithResult(
FROM_HERE, {}, base::BindOnce(&H), base::BindOnce(&I));
// This runs the (SingleThread|Sequenced)TaskRunner::CurrentDefaultHandle queue until both the
// (SingleThread|Sequenced)TaskRunner::CurrentDefaultHandle queue and the ThreadPool queue are
// empty. Prefer explicit exit conditions to RunUntilIdle when possible:
// bit.ly/run-until-idle-with-care2.
task_environment_.RunUntilIdle();
// E, H, I have been executed.
}
CEF初始化过程
在CEF框架的入口代码里,主要就是三个函数:
CefInitialize(main_args, settings, app.get(), sandbox_info);
// Run the CEF message loop. This will block until CefQuitMessageLoop() is
// called.
CefRunMessageLoop();
// Shut down CEF.
CefShutdown();
这三个函数初始化了chrome浏览器的基本框架。把一个exe作为一个进程来看,这三行代码就初始化了某个chrome进程的线程模型及上下文(context)。
画了一个大致的流程图:
CefInitialize函数
CefInitialize函数的实现代码在CEF LIB的context.cc文件中:
首先是对外暴露的一个全局函数:
bool CefInitialize(const CefMainArgs& args,
const CefSettings& settings,
CefRefPtr<CefApp> application,
void* windows_sandbox_info) {
#if BUILDFLAG(IS_WIN)
InitInstallDetails();
InitCrashReporter();
#endif
// Return true if the global context already exists.
if (g_context) {
return true;
}
if (settings.size != sizeof(cef_settings_t)) {
DCHECK(false) << "invalid CefSettings structure size";
return false;
}
// Create the new global context object.
g_context = new CefContext();
// Initialize the global context.
if (!g_context->Initialize(args, settings, application,
windows_sandbox_info)) {
// Initialization failed. Delete the global context object.
delete g_context;
g_context = nullptr;
return false;
}
return true;
}
CefContext
在全局函数中使用CefContext类的initialize函数:
这个类就是CEF进程框架中的上下文了。
bool CefContext::Initialize(const CefMainArgs& args,
const CefSettings& settings,
CefRefPtr<CefApp> application,
void* windows_sandbox_info) {
init_thread_id_ = base::PlatformThread::CurrentId();
settings_ = settings;
application_ = application;
#if !(BUILDFLAG(IS_WIN) || BUILDFLAG(IS_LINUX))
if (settings.multi_threaded_message_loop) {
NOTIMPLEMENTED() << "multi_threaded_message_loop is not supported.";
return false;
}
#endif
#if BUILDFLAG(IS_WIN)
// Signal Chrome Elf that Chrome has begun to start.
SignalChromeElf();
#endif
const base::FilePath& root_cache_path =
NormalizePathAndSet(settings_.root_cache_path, "root_cache_path");
const base::FilePath& cache_path =
NormalizeCachePathAndSet(settings_.cache_path, root_cache_path);
if (root_cache_path.empty() && !cache_path.empty()) {
CefString(&settings_.root_cache_path) = cache_path.value();
}
// All other paths that need to be normalized.
NormalizePathAndSet(settings_.browser_subprocess_path,
"browser_subprocess_path");
NormalizePathAndSet(settings_.framework_dir_path, "framework_dir_path");
NormalizePathAndSet(settings_.main_bundle_path, "main_bundle_path");
NormalizePathAndSet(settings_.resources_dir_path, "resources_dir_path");
NormalizePathAndSet(settings_.locales_dir_path, "locales_dir_path");
browser_info_manager_.reset(new CefBrowserInfoManager);
main_runner_.reset(new CefMainRunner(settings_.multi_threaded_message_loop,
settings_.external_message_pump));
if (!main_runner_->Initialize(
&settings_, application, args, windows_sandbox_info, &initialized_,
base::BindOnce(&CefContext::OnContextInitialized,
base::Unretained(this)))) {
shutting_down_ = true;
FinalizeShutdown();
return false;
}
return true;
}
最后,还是使用的std::unique_ptr<CefMainRunner> main_runner_;
这样一个CefMainRunner指针来初始化。
int CefMainRunner::ContentMainRun(bool* initialized,
base::OnceClosure context_initialized) {
main_delegate_->BeforeMainThreadRun(multi_threaded_message_loop_);
int exit_code = -1;
if (multi_threaded_message_loop_) {
// Detach the CommandLine from the main thread so that it can be
// attached and modified from the UI thread going forward.
base::CommandLine::ForCurrentProcess()->DetachFromCurrentSequence();
base::WaitableEvent uithread_startup_event(
base::WaitableEvent::ResetPolicy::AUTOMATIC,
base::WaitableEvent::InitialState::NOT_SIGNALED);
if (!CreateUIThread(base::BindOnce(
[](CefMainRunner* runner, base::WaitableEvent* event,
int* exit_code) {
runner->main_delegate_->BeforeUIThreadInitialize();
*exit_code = content::ContentMainRun(runner->main_runner_.get());
event->Signal();
},
base::Unretained(this), base::Unretained(&uithread_startup_event),
base::Unretained(&exit_code)))) {
return exit_code;
}
*initialized = true;
// We need to wait until content::ContentMainRun has finished.
uithread_startup_event.Wait();
} else {
*initialized = true;
main_delegate_->BeforeUIThreadInitialize();
exit_code = content::ContentMainRun(main_runner_.get());
}
if (exit_code == content::RESULT_CODE_NORMAL_EXIT) {
// content::ContentMainRun was successful and we're not exiting early.
if (CEF_CURRENTLY_ON_UIT()) {
OnContextInitialized(std::move(context_initialized));
} else {
// Continue initialization on the UI thread.
CEF_POST_TASK(CEF_UIT,
base::BindOnce(&CefMainRunner::OnContextInitialized,
base::Unretained(this),
std::move(context_initialized)));
}
}
return exit_code;
}
- main_delegate_是线程创建后处理消息的两个runner之一:AlloyMainRunnerDelegate或者ChromeMainRunnerDelegate,通过MakeDelegate生成,管理线程的状态等:
std::unique_ptr<CefMainRunnerDelegate> MakeDelegate( RuntimeType type, CefMainRunnerHandler* runner, CefSettings* settings, CefRefPtr<CefApp> application) { if (type == RuntimeType::ALLOY) { g_runtime_type = RuntimeType::ALLOY; return std::make_unique<AlloyMainRunnerDelegate>(runner, settings, application); } else { g_runtime_type = RuntimeType::CHROME; return std::make_unique<ChromeMainRunnerDelegate>(runner, settings, application); } }
- 如果命令行中的multi_threaded_message_loop为true,则在这个主线程外通过CreateUIThread函数单独创建一个UI线程。
bool CefMainRunner::CreateUIThread(base::OnceClosure setup_callback) { DCHECK(!ui_thread_); ui_thread_.reset(new CefUIThread(this, std::move(setup_callback))); ui_thread_->Start(); ui_thread_->WaitUntilThreadStarted(); if (external_message_pump_) { InitExternalMessagePumpFactoryForUI(); } return true; }
- 如果multi_threaded_message_loop为false,则不创建。两者都通过main_delegate_->BeforeUIThreadInitialize的方式来初始化一些内容。
- 线程创建成功后,判断下当前是不是就是UI线程(CEF_CURRENTLY_ON_UIT()),如果是,则通过Context传下来的一个回调函数(context_initialized),初始化Context上下文的内容。
- 如果不是UI线程,则通过CEF_POST_TASK,也就是上面提到的POST_TASK方法向UI发送一个一次性执行的task。
CefRunMessageLoop
CefRunMessageLoop的调用流程和CefInitialize是一样的,最终是在CefMainRunner类中的RunMessageLoop方法:
void CefMainRunner::RunMessageLoop() {
base::RunLoop run_loop;
DCHECK(quit_callback_.is_null());
quit_callback_ = run_loop.QuitClosure();
main_delegate_->BeforeMainMessageLoopRun(&run_loop);
// Blocks until QuitMessageLoop() is called.
run_loop.Run();
}
就是使用上文中提到的RunLoop,启动各个线程从task queue中获取task,相当于按下传送带的启动按钮。
在管理代理类中,把这个run_loop对象放进去:main_delegate_->BeforeMainMessageLoopRun(&run_loop);
管理代理在shutdown的时候用得上。
CefShutdown
CefShutdown也是同样的逻辑,在CefMainRunner类中的Shutdown方法:
void CefMainRunner::Shutdown(base::OnceClosure shutdown_on_ui_thread,
base::OnceClosure finalize_shutdown) {
if (multi_threaded_message_loop_) {
// Start shutdown on the UI thread. This is guaranteed to run before the
// thread RunLoop has stopped.
CEF_POST_TASK(CEF_UIT,
base::BindOnce(&CefMainRunner::StartShutdownOnUIThread,
base::Unretained(this),
std::move(shutdown_on_ui_thread)));
// Finish shutdown on the UI thread after the thread RunLoop has stopped and
// before running exit callbacks.
ui_thread_->set_shutdown_callback(base::BindOnce(
&CefMainRunner::FinishShutdownOnUIThread, base::Unretained(this)));
// Blocks until the thread has stopped.
ui_thread_->Stop();
ui_thread_.reset();
}
main_delegate_->BeforeMainThreadShutdown();
if (!multi_threaded_message_loop_) {
// Main thread and UI thread are the same.
StartShutdownOnUIThread(std::move(shutdown_on_ui_thread));
browser_runner_->Shutdown();
browser_runner_.reset();
FinishShutdownOnUIThread();
}
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!