workflow系列教程(4)Parallel并联任务流

2023-12-17 20:33:47

往期教程

如果觉得写的可以,请给一个点赞+关注支持一下

观看之前请先看,往期的博客教程,否则这篇博客没办法看懂

简介

上一篇博客中讲了串行任务流,有了串行,那必然也有并行,本篇博客讲解任务流并行执行

创建一个任务流序列

  • SubTask是所有任务的基类
    • first参数是这个任务流序列执行的首个任务
  • callback是这个序列执行完毕后执行的回调函数
using series_callback_t = std::function<void (const SeriesWork *)>;

inline SeriesWork *
Workflow::create_series_work(SubTask *first, series_callback_t callback)
{
	return new SeriesWork(first, std::move(callback));
}

创建并行任务流序列

  • 参数callback是设置并行任务流中所有任务执行完毕之后调用的回调函数
  • 函数返回ParallelWork *并行任务指针
using parallel_callback_t = std::function<void (const ParallelWork *)>;

inline ParallelWork *
Workflow::create_parallel_work(parallel_callback_t callback)
{
	return new ParallelWork(std::move(callback));
}
向并行任务流中添加一个任物流序列
class ParallelWork{
public:
	void add_series(SeriesWork *series);
}

代码示例

图示流程

无标题

  • 首先创建一个并行任务流序列,随后创建n个任务流序列,每个任务流序列添加一个http任务,
  • 先并行执行每个序列的http任务基本工作,随后调用设置的httpCallback异步回调函数,httpCallback执行完毕后调用所在序列的序列回调函数,当所有的序列回调函数执行完毕之后在执行并行任务流的parallelCallback回调函数
#include <vector>
#include <workflow/WFFacilities.h>
#include <workflow/Workflow.h>
#include <workflow/HttpUtil.h>
struct SeriesContext{
    std::string url;
    int state;
    int error;
    protocol::HttpResponse resp;//响应报文的完整内容
};
void parallelCallback(const ParallelWork *pwork){
    fprintf(stderr,"pwork callback!\n");
    SeriesContext *context;
    for(size_t i = 0; i != pwork->size(); ++i){
        context = static_cast<SeriesContext *>(pwork->series_at(i)->get_context());
        fprintf(stderr,"url = %s\n", context->url.c_str());
        if(context->state == WFT_STATE_SUCCESS){
            const void *body;
            size_t size;
            context->resp.get_parsed_body(&body,&size);
            fwrite(body,1,size,stderr); 
            fprintf(stderr,"\n");
        }
        else{
            fprintf(stderr,"Error, state = %d, error = %d\n", context->state, context->error);
        }
        delete context;
    }
}
void httpCallback(WFHttpTask *httpTask){
    SeriesContext *context = static_cast<SeriesContext *>(series_of(httpTask)->get_context());
    fprintf(stderr,"httpTask callback, url = %s\n", context->url.c_str());
    context->state = httpTask->get_state();
    context->error = httpTask->get_error();
    context->resp = std::move(*httpTask->get_resp());
}
int main(){
    //使用工厂函数,创建一个并行任务
    ParallelWork *pwork = Workflow::create_parallel_work(parallelCallback);//Workflow::create_parallel_work

    std::vector<std::string> urlVec ={"http://192.168.135.129:81", "http://192.168.135.129","http://47.94.147.94"};
    for(size_t i = 0; i != urlVec.size() ; ++i){
        //创建若干个任务
        // WFTaskFactory::create_http_task
        std::string url = urlVec[i];
        auto httpTask = WFTaskFactory::create_http_task(url,0,5,httpCallback);

        // 修改任务的属性
        auto req = httpTask->get_req();
        req->add_header_pair("Accept","*/*");
        req->add_header_pair("User-Agent","myHttpTask");
        req->set_header_pair("Connection", "Close");
        //为响应的内容申请一片堆空间
        SeriesContext *context = new SeriesContext;
        context->url = std::move(url);
        // 为每个任务创建一个序列
        auto series = Workflow::create_series_work(httpTask,nullptr);
        // 把存储响应内容的指针 拷贝到序列的context当中。
        series->set_context(context);
        //把序列加入到并行任务中
        // add_series
        pwork->add_series(series);
    }
    pwork->start();//启动并行任务
}

文章来源:https://blog.csdn.net/weixin_50448879/article/details/135049102
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。