CEF线程模型与初始化过程详解

2023-12-28 16:06:07


上一篇说了CEF框架的消息传递,实际上这个消息传递指的是进程之间的消息传递,也就是IPC的方式在Browser,Render进程之间传递消息。

这一篇就接上篇,说说CEF的线程模型,因为在CEF框架的main entry中,主要的三个函数CefInitialize、CefRunMessageLoop和CefShutdown都和这个模型有关。

官方文档主要参考两篇:

CEF General Usage

Threading and Tasks in Chrome

Chromium Sequence

CEF进程 & 线程模型

CEF框架是基于Chromium的代码框架来实现的,所以CEF的进程与线程模型就都使用的是Chromuim的模型框架。所以,除了CEF项目的代码,还需要参考CEF LIB和chromium的代码:

CEF LIB

chromium

网上有一些使用翻译软件直接翻译的文章,不太建议看,实在是太难理解,不如直接看google的英文资料+代码,理解起来比较快。

我针对资料和代码的理解,画了一张大致的图:

几个大的框就是CEF框架拥有的几个子进程:

  • Browser进程,一般来说就是主进程。
  • Render进程,用于渲染界面
  • GPU加速进程
  • 其他进程等
  • 进程之间使用IPC通讯,就是上一篇提到的内容。

每个进程中包含的内容基本上是一样的,我图中就是以Browser进程为例。

Chromium中的线程概念

在Chromium中,有一个重要的概念:Sequence,官方也给出了一个非常好的说明文档:[Chromium Sequence]

多线程中的数据竞争,也就是线程安全问题

一个进程中的多个线程,一般来说都是共享同样的内存空间,所以需要解决线程并行执行期间,数据的准确性问题:
一般的解决办法有两类:

  • 共享内存,关键数据加锁: mutex。Must use mutexes, condvars, etc. to ensure safety
  • 线程之间通过信号量等方式通讯,相当于是线程间通讯。Must use message-passing between threads
  • chromium采用的是综合方式,但是主要还是线程间的通讯方式。Send data and tasks between threads, instead of using locks to synchronize;Locks/condition variables exist, but are rarely needed.

chromium对这个通讯方式给出了更具体的实现方式:Sequence。


简单的说,就是多个任务的一个有序集合,这个有序集合在一个虚拟的线程上运行,这个虚拟的线程可以在实际运行过程中,由不同的实际线程(Physical thread)来承载。

至于这个承载方式,chromium也给出了选择方式:


也就是一个对多对的关系,类似线程池,在chromium框架中有一个SequenceManager类,就是用来做这个东西的调度用的。

调度过程大致如下:

其他线程概念

Google的官方文档都已经一一列举出来了:

  • Task: A unit of work to be processed. Effectively a function pointer with optionally associated state. In Chrome this is base::OnceCallback and base::RepeatingCallback created via base::BindOnce and base::BindRepeating, respectively. (documentation).
    简单的说,一个task就可以对应一个任务,或者换到代码里的概念就是一个函数,这个任务或者函数需要被执行。

      void TaskA() {}
      void TaskB(int v) {}
    
      auto task_a = base::BindOnce(&TaskA);
      auto task_b = base::BindOnce(&TaskB, 42);
    
    • 从上面的代码可以看出,task就是由一个函数指针得到的。再由BindOnce或者BindRepeating来封装一次,告诉chromium,这个任务是需要被执行一次,或者是被重复执行的(官方文档说明)。
  • Task queue: A queue of tasks to be processed. 这个就是上图中CurrentDefaultHandle queue,所有的task都需要被提交到这里来统一分发。

  • Physical thread: An operating system provided thread (e.g. pthread on POSIX or CreateThread() on Windows). The Chrome cross-platform abstraction is base::PlatformThread. You should pretty much never use this directly. 操作系统层面的线程(Chromium’s abstraction over physical threads)。

  • base::Thread: A physical thread forever processing messages from a dedicated task queue until Quit(). You should pretty much never be creating your own base::Thread’s. Chromium框架中对不同平台的thread的封装,Chromium中的Thread类,一般不直接使用。

  • Thread pool: A pool of physical threads with a shared task queue. In Chrome, this is base::ThreadPoolInstance. There’s exactly one instance per Chrome process, it serves tasks posted through base/task/thread_pool.h and as such you should rarely need to use the base::ThreadPoolInstance API directly (more on posting tasks later). 线程池,每个进程都有且仅有一个线程池实例,用于处理那些可以并行处理(parallel)的任务。

  • Sequence or Virtual thread: A chrome-managed thread of execution. Like a physical thread, only one task can run on a given sequence / virtual thread at any given moment and each task sees the side-effects of the preceding tasks. Tasks are executed sequentially but may hop physical threads between each one. 就是一个用于调度,为了做load-balance的一个容器。

  • Task runner: An interface through which tasks can be posted. In Chrome this is base::TaskRunner. 任务分发器,上图中有提到,一个任务(task)必须由某种runner来发布到任务队列。

      class A {
       public:
        A() = default;
    
        void PostSomething() {
          task_runner_->PostTask(FROM_HERE, base::BindOnce(&A, &DoSomething));
        }
    
        void DoSomething() {
        }
    
       private:
        scoped_refptr<base::TaskRunner> task_runner_ =
            base::ThreadPool::CreateTaskRunner({base::TaskPriority::USER_VISIBLE});
      };
    
  • Sequenced task runner: A task runner which guarantees that tasks posted to it will run sequentially, in posted order. Each such task is guaranteed to see the side-effects of the task preceding it. Tasks posted to a sequenced task runner are typically processed by a single thread (virtual or physical). In Chrome this is base::SequencedTaskRunner which is-a base::TaskRunner. 上面任务分发器的一种,用于分发Sequenced任务,这种任务分发到任务队列后,chromium会自行进行调度执行(上图中的SequenceManager),调度器会保证这个任务会被序列执行(sequentially, in posted order)。

  • Single-thread task runner: A sequenced task runner which guarantees that all tasks will be processed by the same physical thread. In Chrome this is base::SingleThreadTaskRunner which is-a base::SequencedTaskRunner. We prefer sequences to threads whenever possible. 也是一种任务分发器,由这种分发器分发的任务,调度器会将这些任务串行的在一个固定的physical thread上执行。对应与上图中的Single Threads。

几种不同的Task Runner

  • parallel,也就是说这些线程不需要顺序执行,直接丢到线程池里运行就行了,这就对应了上图中的ThreadPool。

    class A {
     public:
      A() = default;
    
      void PostSomething() {
        task_runner_->PostTask(FROM_HERE, base::BindOnce(&A, &DoSomething));
      }
    
      void DoSomething() {
      }
    
     private:
      scoped_refptr<base::TaskRunner> task_runner_ =
          base::ThreadPool::CreateTaskRunner({base::TaskPriority::USER_VISIBLE});
    };
    
  • Sequenced task runner。

    • 发送任务到一个新的Sequence,也就是新建一个。
      scoped_refptr<SequencedTaskRunner> sequenced_task_runner =
          base::ThreadPool::CreateSequencedTaskRunner(...);
      
      // TaskB runs after TaskA completes.
      sequenced_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskA));
      sequenced_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskB));
      
    • 发送到当前执行的Sequence
      // The task will run on the current (virtual) thread's default task queue.
      base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
          FROM_HERE, base::BindOnce(&Task);
      
  • Single-thread task runner

    base::PostTask(FROM_HERE, {content::BrowserThread::UI}, ...);
    
    base::CreateSingleThreadTaskRunner({content::BrowserThread::IO})
        ->PostTask(FROM_HERE, ...);
    
  • 使用Sequence Task代替锁:
    下面代码中的两个宏定义,CEF中的DCHECK后面再说:
    #define SEQUENCE_CHECKER(name) base::SequenceChecker name
    #define DETACH_FROM_SEQUENCE(name) (name).DetachFromSequence()

    class A {
     public:
      A() {
        // Do not require accesses to be on the creation sequence.
        DETACH_FROM_SEQUENCE(sequence_checker_);
      }
    
      void AddValue(int v) {
        // Check that all accesses are on the same sequence.
        DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
        values_.push_back(v);
    }
    
     private:
      SEQUENCE_CHECKER(sequence_checker_);
    
      // No lock required, because all accesses are on the
      // same sequence.
      std::vector<int> values_;
    };
    
    A a;
    scoped_refptr<SequencedTaskRunner> task_runner_for_a = ...;
    task_runner_for_a->PostTask(FROM_HERE,
                          base::BindOnce(&A::AddValue, base::Unretained(&a), 42));
    task_runner_for_a->PostTask(FROM_HERE,
                          base::BindOnce(&A::AddValue, base::Unretained(&a), 27));
    
    // Access from a different sequence causes a DCHECK failure.
    scoped_refptr<SequencedTaskRunner> other_task_runner = ...;
    other_task_runner->PostTask(FROM_HERE,
                                base::BindOnce(&A::AddValue, base::Unretained(&a), 1));
    

官方测试代码

做个记录

class MyTest : public testing::Test {
 public:
  // ...
 protected:
   base::test::TaskEnvironment task_environment_;
};

TEST_F(MyTest, FirstTest) {
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, base::BindOnce(&A));
  base::SequencedTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE,
                                                   base::BindOnce(&B));
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostDelayedTask(
      FROM_HERE, base::BindOnce(&C), base::TimeDelta::Max());

  // This runs the (SingleThread|Sequenced)TaskRunner::CurrentDefaultHandle queue until it is empty.
  // Delayed tasks are not added to the queue until they are ripe for execution.
  // Prefer explicit exit conditions to RunUntilIdle when possible:
  // bit.ly/run-until-idle-with-care2.
  base::RunLoop().RunUntilIdle();
  // A and B have been executed. C is not ripe for execution yet.

  base::RunLoop run_loop;
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, base::BindOnce(&D));
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, run_loop.QuitClosure());
  base::SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, base::BindOnce(&E));

  // This runs the (SingleThread|Sequenced)TaskRunner::CurrentDefaultHandle queue until QuitClosure is
  // invoked.
  run_loop.Run();
  // D and run_loop.QuitClosure() have been executed. E is still in the queue.

  // Tasks posted to thread pool run asynchronously as they are posted.
  base::ThreadPool::PostTask(FROM_HERE, {}, base::BindOnce(&F));
  auto task_runner =
      base::ThreadPool::CreateSequencedTaskRunner({});
  task_runner->PostTask(FROM_HERE, base::BindOnce(&G));

  // To block until all tasks posted to thread pool are done running:
  base::ThreadPoolInstance::Get()->FlushForTesting();
  // F and G have been executed.

  base::ThreadPool::PostTaskAndReplyWithResult(
      FROM_HERE, {}, base::BindOnce(&H), base::BindOnce(&I));

  // This runs the (SingleThread|Sequenced)TaskRunner::CurrentDefaultHandle queue until both the
  // (SingleThread|Sequenced)TaskRunner::CurrentDefaultHandle queue and the ThreadPool queue are
  // empty. Prefer explicit exit conditions to RunUntilIdle when possible:
  // bit.ly/run-until-idle-with-care2.
  task_environment_.RunUntilIdle();
  // E, H, I have been executed.
}

CEF初始化过程

在CEF框架的入口代码里,主要就是三个函数:

  CefInitialize(main_args, settings, app.get(), sandbox_info);

  // Run the CEF message loop. This will block until CefQuitMessageLoop() is
  // called.
  CefRunMessageLoop();

  // Shut down CEF.
  CefShutdown();

这三个函数初始化了chrome浏览器的基本框架。把一个exe作为一个进程来看,这三行代码就初始化了某个chrome进程的线程模型及上下文(context)。

画了一个大致的流程图:

CefInitialize函数

CefInitialize函数的实现代码在CEF LIB的context.cc文件中:
首先是对外暴露的一个全局函数:

bool CefInitialize(const CefMainArgs& args,
                   const CefSettings& settings,
                   CefRefPtr<CefApp> application,
                   void* windows_sandbox_info) {
#if BUILDFLAG(IS_WIN)
  InitInstallDetails();
  InitCrashReporter();
#endif

  // Return true if the global context already exists.
  if (g_context) {
    return true;
  }

  if (settings.size != sizeof(cef_settings_t)) {
    DCHECK(false) << "invalid CefSettings structure size";
    return false;
  }

  // Create the new global context object.
  g_context = new CefContext();

  // Initialize the global context.
  if (!g_context->Initialize(args, settings, application,
                             windows_sandbox_info)) {
    // Initialization failed. Delete the global context object.
    delete g_context;
    g_context = nullptr;
    return false;
  }

  return true;
}

CefContext

在全局函数中使用CefContext类的initialize函数:
这个类就是CEF进程框架中的上下文了。

bool CefContext::Initialize(const CefMainArgs& args,
                            const CefSettings& settings,
                            CefRefPtr<CefApp> application,
                            void* windows_sandbox_info) {
  init_thread_id_ = base::PlatformThread::CurrentId();
  settings_ = settings;
  application_ = application;

#if !(BUILDFLAG(IS_WIN) || BUILDFLAG(IS_LINUX))
  if (settings.multi_threaded_message_loop) {
    NOTIMPLEMENTED() << "multi_threaded_message_loop is not supported.";
    return false;
  }
#endif

#if BUILDFLAG(IS_WIN)
  // Signal Chrome Elf that Chrome has begun to start.
  SignalChromeElf();
#endif

  const base::FilePath& root_cache_path =
      NormalizePathAndSet(settings_.root_cache_path, "root_cache_path");
  const base::FilePath& cache_path =
      NormalizeCachePathAndSet(settings_.cache_path, root_cache_path);
  if (root_cache_path.empty() && !cache_path.empty()) {
    CefString(&settings_.root_cache_path) = cache_path.value();
  }

  // All other paths that need to be normalized.
  NormalizePathAndSet(settings_.browser_subprocess_path,
                      "browser_subprocess_path");
  NormalizePathAndSet(settings_.framework_dir_path, "framework_dir_path");
  NormalizePathAndSet(settings_.main_bundle_path, "main_bundle_path");
  NormalizePathAndSet(settings_.resources_dir_path, "resources_dir_path");
  NormalizePathAndSet(settings_.locales_dir_path, "locales_dir_path");

  browser_info_manager_.reset(new CefBrowserInfoManager);

  main_runner_.reset(new CefMainRunner(settings_.multi_threaded_message_loop,
                                       settings_.external_message_pump));
  if (!main_runner_->Initialize(
          &settings_, application, args, windows_sandbox_info, &initialized_,
          base::BindOnce(&CefContext::OnContextInitialized,
                         base::Unretained(this)))) {
    shutting_down_ = true;
    FinalizeShutdown();
    return false;
  }

  return true;
}

最后,还是使用的std::unique_ptr<CefMainRunner> main_runner_;这样一个CefMainRunner指针来初始化。

int CefMainRunner::ContentMainRun(bool* initialized,
                                  base::OnceClosure context_initialized) {
  main_delegate_->BeforeMainThreadRun(multi_threaded_message_loop_);

  int exit_code = -1;

  if (multi_threaded_message_loop_) {
    // Detach the CommandLine from the main thread so that it can be
    // attached and modified from the UI thread going forward.
    base::CommandLine::ForCurrentProcess()->DetachFromCurrentSequence();

    base::WaitableEvent uithread_startup_event(
        base::WaitableEvent::ResetPolicy::AUTOMATIC,
        base::WaitableEvent::InitialState::NOT_SIGNALED);

    if (!CreateUIThread(base::BindOnce(
            [](CefMainRunner* runner, base::WaitableEvent* event,
               int* exit_code) {
              runner->main_delegate_->BeforeUIThreadInitialize();
              *exit_code = content::ContentMainRun(runner->main_runner_.get());
              event->Signal();
            },
            base::Unretained(this), base::Unretained(&uithread_startup_event),
            base::Unretained(&exit_code)))) {
      return exit_code;
    }

    *initialized = true;

    // We need to wait until content::ContentMainRun has finished.
    uithread_startup_event.Wait();
  } else {
    *initialized = true;
    main_delegate_->BeforeUIThreadInitialize();
    exit_code = content::ContentMainRun(main_runner_.get());
  }

  if (exit_code == content::RESULT_CODE_NORMAL_EXIT) {
    // content::ContentMainRun was successful and we're not exiting early.
    if (CEF_CURRENTLY_ON_UIT()) {
      OnContextInitialized(std::move(context_initialized));
    } else {
      // Continue initialization on the UI thread.
      CEF_POST_TASK(CEF_UIT,
                    base::BindOnce(&CefMainRunner::OnContextInitialized,
                                   base::Unretained(this),
                                   std::move(context_initialized)));
    }
  }

  return exit_code;
}
  • main_delegate_是线程创建后处理消息的两个runner之一:AlloyMainRunnerDelegate或者ChromeMainRunnerDelegate,通过MakeDelegate生成,管理线程的状态等:
    std::unique_ptr<CefMainRunnerDelegate> MakeDelegate(
        RuntimeType type,
        CefMainRunnerHandler* runner,
        CefSettings* settings,
        CefRefPtr<CefApp> application) {
      if (type == RuntimeType::ALLOY) {
        g_runtime_type = RuntimeType::ALLOY;
        return std::make_unique<AlloyMainRunnerDelegate>(runner, settings,
                                                         application);
      } else {
        g_runtime_type = RuntimeType::CHROME;
        return std::make_unique<ChromeMainRunnerDelegate>(runner, settings,
                                                          application);
      }
    }
    
  • 如果命令行中的multi_threaded_message_loop为true,则在这个主线程外通过CreateUIThread函数单独创建一个UI线程。
    bool CefMainRunner::CreateUIThread(base::OnceClosure setup_callback) {
      DCHECK(!ui_thread_);
    
      ui_thread_.reset(new CefUIThread(this, std::move(setup_callback)));
      ui_thread_->Start();
      ui_thread_->WaitUntilThreadStarted();
    
      if (external_message_pump_) {
        InitExternalMessagePumpFactoryForUI();
      }
      return true;
    }
    
  • 如果multi_threaded_message_loop为false,则不创建。两者都通过main_delegate_->BeforeUIThreadInitialize的方式来初始化一些内容。
  • 线程创建成功后,判断下当前是不是就是UI线程(CEF_CURRENTLY_ON_UIT()),如果是,则通过Context传下来的一个回调函数(context_initialized),初始化Context上下文的内容。
  • 如果不是UI线程,则通过CEF_POST_TASK,也就是上面提到的POST_TASK方法向UI发送一个一次性执行的task。

CefRunMessageLoop

CefRunMessageLoop的调用流程和CefInitialize是一样的,最终是在CefMainRunner类中的RunMessageLoop方法:

void CefMainRunner::RunMessageLoop() {
  base::RunLoop run_loop;

  DCHECK(quit_callback_.is_null());
  quit_callback_ = run_loop.QuitClosure();

  main_delegate_->BeforeMainMessageLoopRun(&run_loop);

  // Blocks until QuitMessageLoop() is called.
  run_loop.Run();
}

就是使用上文中提到的RunLoop,启动各个线程从task queue中获取task,相当于按下传送带的启动按钮。

在管理代理类中,把这个run_loop对象放进去:main_delegate_->BeforeMainMessageLoopRun(&run_loop); 管理代理在shutdown的时候用得上。

CefShutdown

CefShutdown也是同样的逻辑,在CefMainRunner类中的Shutdown方法:

void CefMainRunner::Shutdown(base::OnceClosure shutdown_on_ui_thread,
                             base::OnceClosure finalize_shutdown) {
  if (multi_threaded_message_loop_) {
    // Start shutdown on the UI thread. This is guaranteed to run before the
    // thread RunLoop has stopped.
    CEF_POST_TASK(CEF_UIT,
                  base::BindOnce(&CefMainRunner::StartShutdownOnUIThread,
                                 base::Unretained(this),
                                 std::move(shutdown_on_ui_thread)));

    // Finish shutdown on the UI thread after the thread RunLoop has stopped and
    // before running exit callbacks.
    ui_thread_->set_shutdown_callback(base::BindOnce(
        &CefMainRunner::FinishShutdownOnUIThread, base::Unretained(this)));

    // Blocks until the thread has stopped.
    ui_thread_->Stop();
    ui_thread_.reset();
  }

  main_delegate_->BeforeMainThreadShutdown();

  if (!multi_threaded_message_loop_) {
    // Main thread and UI thread are the same.
    StartShutdownOnUIThread(std::move(shutdown_on_ui_thread));

    browser_runner_->Shutdown();
    browser_runner_.reset();

    FinishShutdownOnUIThread();
  }

文章来源:https://blog.csdn.net/pcgamer/article/details/135261664
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。