RocketMQ Streams详解

2023-12-13 05:58:40

一、RocketMQ Streams 概览

RocketMQ Streams是基于RocketMQ的轻量级流计算引擎。能以SDK方式被应用依赖,无须部署复杂的流计算服务端即可获得流计算能力。 因此具有资源消耗少、扩展性好、支持流计算算子丰富的特点。

1、整体架构?

总体架构

数据从RocketMQ中被RocketMQ-streams消费,经过处理最终被写回到RocketMQ。

总体架构

数据被RocketMQ Consumer消费,进入处理拓扑被算子处理,如果流处理任务中含有算子keyBy,则需要将数据按照Key进行分组,将分组数据写入shuffle topic。后续算子从 shuffle topic消费。如果还涉及count之类有状态算子,那么计算时需要读写state topic,计算结束后,将结果写回到RocketMQ中。

2、消费模型?

img_2.png

计算实例实质上是依赖了Rocket-streams SDK的client,因此,计算实例消费的MQ依赖RocketMQ rebalance分配, 计算实例总个数也不能大于消费总MQ个数,否则将有部分计算实例处于等待状态,消费不到数据。

一个计算实例可以消费多个MQ,一个实例内也只有一张计算拓扑图。

3、状态?

img_3.png

对于有状态算子,比如count,需要先对count算子进行分组,然后才能求和。分组算子keyBy会将数据按照分组的key重新写回RocketMQ,并且使相同key写入同一分区(这一过程称作shuffle), 保证这个含有相同key的数据被同一个消费者消费。 状态本地依赖RocksDB加速读取,远程依赖RocketMQ做持久化。

4、扩缩容?

img.png

当计算实例从3个缩容到2个,借助于RocketMQ集群消费模式下的rebalance功能,被消费的分片MQ会在计算实例之间重新分配。Instance1上消费的MQ2和MQ3被分配到Instance2和Instance3上, 这两个MQ的状态数据也需要迁移到Instance2和Instance3上,这也暗示,状态数据是根据源数据分片MQ保存的;扩容则是刚好相反的过程。

二、RocketMQ Streams 核心概念

1、领域模型?

StreamBuilder?

img_2.png

  • 一个StreamBuilder实例,有1到N个pipeline,pipeline表示一个数据处理路径;
  • 一个pipeline可以含有1到N个处理节点GroupNode;
  • 一个StreamBuilder实例,有一个TopologyBuilder,TopologyBuilder可构建出数据处理器processor;
  • 一个JobId对应一个StreamBuilder实例。

RocketMQStream?

img_2.png

  • 一个RocketMQStream实例,有一个拓扑构建器TopologyBuilder;
  • 一个RocketMQStream实例,可实例化1到N个worker线程;
  • 每个线程WorkerThread实例,包含一个engine;
  • 一个engine包含执行数据处理的所有逻辑,包含一个consumer实例、一个producer实例、一个StateStore实例;

流处理实例?

流处理实例表示一个运行RocketMQ Streams的进程;

  • 一个流处理实例包含一个StreamBuilder,一个RocketMQStream,一个拓扑图,一到多个pipeline;

2、StreamBuilder?

  • StreamBuilder(jobId)?构建实例;
  • <OUT> RStream<OUT> source(topicName, deserializer)?定义source topic 和反序列化方式;

3、RStream?

  • <K> GroupedStream<K, T> keyBy(selectAction)?按照特定字段分组;
  • <O> RStream<O> map(mapperAction)?对数据进行一对一转化;
  • RStream<T> filter(predictor)?对数据进行过滤
  • <VR> RStream<T> flatMap(mapper)对数据进行一对多转化;
  • <T2> JoinedStream<T, T2> join(rightStream)?双流Join;
  • sink(topicName, serializer)?将结果输出到特定topic;

3、GroupedStream?

对含有相同Key的数据进行操作

  • <OUT> GroupedStream<K, Integer> count(selectAction)?统计含有某个字段数据的个数;
  • GroupedStream<K, V> min(selectAction)?对某个字段统计最小值;
  • GroupedStream<K, V> max(selectAction)?对某个字段统计最大值;
  • GroupedStream<K, ? extends Number> sum(selectAction)?对某个字段统计和;
  • GroupedStream<K, V> filter(predictor)?对某个字段进行过滤;
  • <OUT> GroupedStream<K, OUT> map(valueMapperAction)?对数据进行一对一转化;
  • <OUT> GroupedStream<K, OUT> aggregate(accumulator)?对数据进行聚合操作,且聚合支持二阶聚合,例如在窗口未触发时添加数据,在窗口触发时计算结果这类算子;
  • WindowStream<K, V> window(windowInfo)?对窗口划定window;
  • GroupedStream<K, V> addGraphNode(name, supplier)?底层接口,向流处理拓扑中增加自定义算子;
  • RStream<V> toRStream()?转化为RStream,只是在接口形式上转化,对数据无任何操作;
  • sink(topicName, serializer)?按照自定义序列化形式将结果写出到topic;

4、WindowStream?

对被划分window的数据进行操作

  • WindowStream<K, Integer> count()?统计窗口内数据个数;
  • WindowStream<K, V> filter(predictor)?过滤窗口内数据;
  • <OUT> WindowStream<K, OUT> map(mapperAction)?对窗口内数据一对一转化;
  • <OUT> WindowStream<K, OUT> aggregate(aggregateAction)?对窗口内数据多对一转化;
  • <OUT> WindowStream<K, OUT> aggregate(accumulator)?对数据进行聚合操作,且聚合支持二阶聚合,例如在窗口未触发时添加数据,在窗口触发时计算结果这类算子;
  • void sink(topicName, serializer)?按照自定义序列化形式将结果写出到topic;

?

文章来源:https://blog.csdn.net/leesinbad/article/details/134958975
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。