09-Pub/Sub
1 Channel和Subscription
- 关于Channel
- Eventing中的Channel CRD负责定义名称空间级别的消息总线
- 它的后端要基于特定的实现,如In-Memory Channel(简称imc)、NATS Channel或Kafka Channel等
- 每个Channel应该对应于一个特定Topic
- 通常,Channels and Subscriptions消息投递模式中才需要自行创建Channel
- Sources to Sink模式不需要Channel
- Brokers and Triggers无须自行配置Channel
- 关于Subscription
- Eventing中的Subscription CRD负责将Sink(例如Service或KService)连接至一个Channel之上;
- 何时需要自行创建Subscription
- Sources to Sink模式不需要Subscription,因为没有Channel可以订阅
- ◆Channels and Subscriptions消息投递模式,需要创建订阅至Channel的Subscription
- Brokers and Triggers消息投递模式,需要创建订阅至Trigger的Subscription
1.1 Channel和Subscription实践
-
示例环境说明
- 基于imc的channel/imc01作为消息总线
- kservice/event-display订阅channel/imc01
- curl命令作为event source,基于HTTP协议推送消息至channel/imc01
-
具体步骤
-
创建一个channel
kn channel create imc01 --type messaging.knative.dev:v1:InMemoryChannel
apiVersion: messaging.knative.dev/v1 kind: Channel metadata: name: imc01 spec: channelTemplate: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel
查看channel
kn channel list
-
创建2个Sink: kservice/event-display01/kservice/event-display02
kn service create event-display01 --image ikubernetes/event_display --port 8080 --scale-min 1 kn service create event-display02 --image ikubernetes/event_display --port 8080 --scale-min 1
--- apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display01 spec: template: metadata: annotations: autoscaling.knative.dev/min-scale: "1" spec: containers: - image: ikubernetes/event_display ports: - containerPort: 8080 --- apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display02 spec: template: metadata: annotations: autoscaling.knative.dev/min-scale: "1" spec: containers: - image: ikubernetes/event_display ports: - containerPort: 8080
-
创建subscription: 负责连接channel和sink。这里模拟使用同一channel,创建多个subscription,验证分发效果
# /sub01负责连接kservice/event-display01至channel/imc01 kn subscription create sub01 --channel imc01 --sink ksvc:event-display01 #/sub02负责连接kservice/event-display02至channel/imc01 kn subscription create sub02 --channel imc01 --sink ksvc:event-display02
apiVersion: messaging.knative.dev/v1 kind: Subscription metadata: name: subscription1 spec: channel: apiVersion: messaging.knative.dev/v1 kind: Channel name: imc01 subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display01 --- apiVersion: messaging.knative.dev/v1 kind: Subscription metadata: name: subscription2 spec: channel: apiVersion: messaging.knative.dev/v1 kind: Channel name: imc01 subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display02
查看subscription:
-
验证
-
创建一个客户端Pod,使用curl命令基于HTTP协议推送event
kubectl run client-$RANDOM --image=ikubernetes/admin-box:v1.2 --restart=Never --rm -it --command -- /bin/bash
-
进入测试的pod里面,向channel/imc01的URL发起事件推送请求
curl -v "http://imc01-kn-channel.default.svc.cluster.local" -X POST -H "Content-Type: application/cloudevents+json" \ -d '{"id": "say-hello", "specversion": "1.0", "type": "com.icloud2native.sayhi", "source": "sendoff", "data": {"msg":"Hello Knative imc01 Channel"}}'
-
在sink1: event-display01查看log,能收到相关的event
kubectl logs -f event-display01-00001-deployment-64cd5c8866-48t4m
-
在sink2: event-display02查看log,能收到相关的event
kubectl logs -f event-display02-00001-deployment-664ccfc45d-k96zz
-
-
2 Broker/Triger
2.1 Message Broker
-
Broker
- 承载消息队列的组件,它从生产者接收消息,并根据消息交换规则将其交换至相应的队列(或Topic)
- 生产者通过特定的协议将Message投递至Broker
- 然后,通过队列(或Topic),将消息传递给消费者
- Kafka、RabbitMQ、ActiveMQ和RocketMQ是较为常见的代表产品
- 承载消息队列的组件,它从生产者接收消息,并根据消息交换规则将其交换至相应的队列(或Topic)
-
消息代理模式
-
Point-to-point messaging
- 消息的发送者与接收者之间存在“一对一”的关系,队列中的每条消息只发送一个接收者,并且只能被消费一次
- 适合消息仅能被处理一次的场景,例工资单处理、金融交易处理等
-
Publish/subscribe messaging
- 即“发布/订阅”模式,每条消息的生产者将消息发布到一个主题(Topic),多个消费者可以访问他们希望从中接收消息的Topic
- 发布到Topic的所有消息,都会分发给订阅该Topic的消费者
- Kafka的Topic内部由一到多个队列(Queue)组成,这些内部队列称为Partition
- 消费者也可以在Partition级别订阅
- 广播式分发机制,消息的发布者与消息的消费者之间存在“一对多”的关系
-
2.2 Knative的Broker/Trigger 消息传递框架
-
Broker
- Knative Eventing提供的CRD,负责收集CloudEvents类型的事件
- Broker对象会提供一个用于事件传入的入口端点,各生产者可以调用该入口将事件发往Broker
- 将事件投递至目的地的任务则由Trigger资源负责
- Trigger基于属性过滤事件,并将筛选出的的事件投递给订阅该Trigger的Subscriber
- Subscriber还可生成响应事件,并将这些新生成的事件传入Broker
2.3 knative Broker
-
Knative Eventing支持以下几种类型的Broker
-
基于Channel的多租户Broker (Multi-tenant channel-based broker,简称为MT-Channel-based Broker)
- 基于Channel进行事件路由
- 需要部署至少一种Channel的实现
- InMemoryChannel:可用于开发和测试目的,但不为生产环境提供适当的事件交付保证
- KafkaChannel:提供生产环境所需的事件交付保证
-
其它的可用的Broker类型
- Apache Kafka Broker
- RabbitMQ Broker
- GCP Broker
-
-
Knative Serving在名称空间级别提供了一个名为default的默认Broker,但使用前需要通过某种方式先行完成创建
-
创建默认Broker的方法
-
命令式命令,或使用配置文件
kn broker create default --namespace NS_NAME
-
在Trigger资源上使用特定的Annotation自动创建
- eventing.knative.dev/injection=enabled
-
在名称空间上添加特定的Label自动创建
- eventing.knative.dev/injection=enabled
-
-
删除默认的Broker
- 第一种方法创建的默认Broker可直接进行删除
- 后面两种是通过Injection的方式进行的资源创建,这类资源需要由管理员手动才能完成删除
2.4 Broker/Trigger 实践
-
示例环境说明
- 基于MT-Channel-based的Broker
- Trigger1过滤“type=sayhi”类的事件
- Sink为ksvc/event-display-hi
- Triiger2过滤“type=saybye”类的事件
- Sink为ksvc/event-display-bye
- curl命令作为event source
- 基于HTTP协议推送消息至broker
- Trigger基于类型过滤事件并完成分发
-
具体步骤
-
创建两个ksvc作为sink: event-display-hi/event-display-bye
$ kn service create event-display-hi --image ikubernetes/event_display --port 8080 --scale-min 1 $ kn service create event-display-bye --image ikubernetes/event_display --port 8080 --scale-min 1
--- apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display-hi spec: template: metadata: annotations: autoscaling.knative.dev/min-scale: "1" spec: containers: - image: ikubernetes/event_display ports: - containerPort: 8080 --- apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display-bye spec: template: metadata: annotations: autoscaling.knative.dev/min-scale: "1" spec: containers: - image: ikubernetes/event_display ports: - containerPort: 8080
-
创建broker
$ kn broker create default --class MTChannelBasedBroker # 其中的“--class”选项在使用默认的Broker类时,可以省略
apiVersion: eventing.knative.dev/v1 kind: Broker metadata: name: default
-
创建trigger:两个trigger分别过滤了不同类型的事件
kn trigger create trigger1 --broker default --sink ksvc:event-display-hi --filter type=com.icloud2native.sayhi kn trigger create trigger2 --broker default --sink ksvc:event-display-bye --filter type=com.icloud2native.saybye
--- apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: trigger1 spec: broker: default filter: attributes: type: com.icloud2native.sayhi subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display-hi --- apiVersion: eventing.knative.dev/v1 kind: Trigger metadata: name: trigger2 spec: broker: default filter: attributes: type: com.icloud2native.saybye subscriber: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display-bye
列出两个trigger
kn trigger list
-
测试验证:event-display-hi的sink只收到hi的event,event-display-bye的sink只收到bye的event
-
创建一个客户端Pod,使用curl命令基于HTTP协议推送event
kubectl run client-$RANDOM --image=ikubernetes/admin-box:v1.2 --restart=Never --rm -it --command -- /bin/bash
-
在启动的客户端Pod中,向Broker/default的URL发起事件推送事件,sayhi和saybye类型的事件都要推送
curl -v "http://broker-ingress.knative-eventing.svc.cluster.local/default/default" -X POST -H "Content-Type: application/cloudevents+json" \ -d '{"id": "say-hi", "specversion": "1.0", "type": "com.icloud2native.sayhi", "source": "sendoff", "data": {"msg":"Hello Knative default Broker Say HI"}}' curl -v "http://broker-ingress.knative-eventing.svc.cluster.local/default/default" -X POST -H "Content-Type: application/cloudevents+json" \ -d '{"id": "say-bye", "specversion": "1.0", "type": "com.icloud2native.saybye", "source": "sendoff", "data": {"msg":"Hello Knative default Broker Say BYE"}}'
-
获取event-display-hi中的日志信息,验证是否仅存在sayhi类型的事件
-
获取event-display-bye中的日志信息,验证是否仅存在saybye类型的事件
-
-
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!