【RocketMQ每日一问】RocketMQ SQL92过滤用法以及原理?
2024-01-07 17:41:39
1.生产端
public class SQLProducer {
public static int count = 10;
public static String topic = "xiao-zou-topic";
public static void main(String[] args) {
DefaultMQProducer producer = MQUtils.createLocalProducer();
IntStream.range(0, count).forEach(i -> {
Message message = new Message(topic, ("sql92 test" + i).getBytes(StandardCharsets.UTF_8));
try {
if (i % 2 == 0) {
message.putUserProperty("gray", "dev1");
}
SendResult sendResult = producer.send(message);
DateTimeFormatter dtf2 = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
System.out.printf("%s %s%n", sendResult, dtf2.format(LocalDateTime.now()));
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
producer.shutdown();
}
}
2.消费端
public class SQLConsumer {
public static String GID = "xiao-zou-gid";
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = MQUtils.createLocalConsumer(GID);
String sql = "gray is not null and gray = 'dev1'";
consumer.subscribe(MQUtils.TOPIC, MessageSelector.bySql(sql));
consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
3.语法规则
4.原理
-
当消息到达 Broker 时,Broker 会将消息与对应的订阅关系进行匹配。
-
如果该订阅关系包含 SQL92 表达式,则将该表达式传递给消息过滤器。
-
消息过滤器使用 Antlr4 解析器解析 SQL92 表达式,并将其转换为语法树。
-
一旦表达式被转换为语法树,过滤器就可以开始遍历语法树,并使用消息属性和自定义属性来匹配表达式中的条件。
-
如果消息属性和自定义属性匹配 SQL92 表达式中的条件,则过滤器将消息传递给消费者。
-
如果消息属性和自定义属性不匹配 SQL92 表达式中的条件,则过滤器将跳过该消息,并继续匹配其他消息。
文章来源:https://blog.csdn.net/jianjun_fei/article/details/135365858
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!