程序地带

一. spring cloud gateway集成 spring cloud stream binder kafka,实现“动态路由“刷新与加载之采坑记录


一、前言


Spring Cloud Stream是用于构建消息驱动的微服务应用程序的框架。


本文主要介绍如何集成 Spring Cloud Stream,以 Kafka发布订阅模式(topic),实现发送消息>>消费消息。我所找到的帖子大部分都是关于 binder RabbitMQ,后来又拜读了官方文档(google翻译),逐渐对 Spring Cloud Stream有了清晰的认识。


真正集成时,并没有那么顺利;我看别人都是很简单的完成了测试,而我一直没有实现 @output与@input管道的"绑定" topic,非常困惑,ε=(´ο`*)))唉。还好,最终,发现了问题,接下来娓娓道来。


这一步的成功,是我实现 gateway"动态路由"刷新与加载的关键步骤,后续再补充。


二、官方原理与配置



官方解释:Spring Cloud Stream应用程序由与中间件无关的核心组成。 该应用程序通过在外部代理(brokers)暴露的目标与代码中的输入/输出(input/output)参数之间建立绑定(bindings)来与外界进行通信。 建立绑定(bindings)所需的特定于代理(brokers)的详细信息由特定于中间件的Binder实现处理。(着重看红字,易于理解后续配置)


 


官方参考指南


1. Apache Kafka粘合剂 1.1.用法 要使用Apache Kafka粘合剂,您需要将其spring-cloud-stream-binder-kafka作为依赖项添加到Spring Cloud Stream应用程序中,如Maven的以下示例所示:


<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

另外,您也可以使用Spring Cloud Stream Kafka Starter,如以下Maven示例所示:


<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

2.以下是我用的配置:可以看到,我为了采坑,又尝试使用了 spring-cloud-starter-bus-kafka,就是想成功实现o(╯□╰)o。


<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<!-- <artifactId>spring-cloud-starter-bus-kafka</artifactId>-->
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

3.gate-admin工程(消息生产者),bootstrap.yml (错误的配置,懂得人会发现问题)


spring:
cloud:
stream:
kafka:
binder:
brokers: 192.168.57.110
#defaultBrokerPort: 9092
auto-create-topics: true
auto-add-partitions: true
zk-nodes: 192.168.57.110
#defaultZkPort: 2181
bindings:
routeOutput:
binder: kafka
destination: gate
content-type: text/plain
#producer:
# useTopicHeader: false
default-binder: kafka

3.1 @Output


public interface GateRouteOut {
String OUT = "routeOutput";
@Output(GateRouteOut.OUT)
MessageChannel routeOutput();
}

3.2 RouteSendService 发送消息;


/**
* @desc:
* @author: yanfei
* @date: 2020/12/10
*/
@EnableBinding(value = {GateRouteOut.class})
@Component
@Slf4j
public class RouteSendService {
@Autowired
@Qualifier(GateRouteOut.OUT)
private MessageChannel channel;
public void sendMsg(String routingKey, String routeDefinition) {
channel.send(MessageBuilder.withPayload(routeDefinition).build());
log.info(">>>> gate-admin发送路由信息:{},成功!", routeDefinition);
}
// @Bean
// public Supplier<String> uppercase() {
// log.info(">>>> 发送信息成功!");
// return () -> "Hello from Supplier";
// }
}

4.gateway工程(消息消费者),bootstrap.yml (错误的配置,懂得人会发现问题)


spring:
cloud:
stream:
kafka:
binder:
brokers: 192.168.57.110
auto-create-topics: true
auto-add-partitions: true
zk-nodes: 192.168.57.110
bindings:
routeInput:
group: route-group
binder: kafka
destination: gate
content-type: text/plain
#consumer:
# pollTimeout: 1
# startOffset: earliest
default-binder: kafka

4.1 @Input


/**
* @desc:
* @author: yanfei
* @date: 2020/12/15
*/
public interface GateRouteIn {
String IN = "routeInput";
@Input(GateRouteIn.IN)
SubscribableChannel routeInput();
}

4.2 RouteReceiveService 消费消息


/**
* @desc:
* @author: yanfei
* @date: 2020/12/10
*/
@Slf4j
@EnableAutoConfiguration
@EnableBinding(value = {GateRouteIn.class})
public class RouteReceiveService {
@StreamListener(value = GateRouteIn.IN)
public void receive(String payload) {
log.info(">>>> gateway网关消费信息:{}成功!", payload);
}
// @Bean
// public Consumer<String> uppercase() {
// log.info(">>>> {}接手到了信息,成功!", "uppercase");
// return System.out::println;
// }
}

5.看是简单吧,我竟然没有成功,接着看。


三、分析原因与思路


我怀疑:


官方没有交代清楚!呵呵,别人都成功了,我失败了。肯定不是。写法不对?配置不对?我又尝试了其他写法,又把 kafka相关的配置全部扒拉一遍,默认的配置也写上,如:binder/default-binder/producer等。还是不对。服务端 kafka,server.properties,有什么我漏掉的?忘记配置了?我又搞了一遍。哎,失败。kafka tool 可视化工具,查看利器!

问题在于此:



关键点,我写成了这样:spring.cloud.stream.kafka.bindings.<channelName>.destination,这是错误的,无法绑定主题。


正确的是:                     spring.cloud.stream.bindings.input.destination=myQueue




四、终于实现消费消息


gate-admin工程(消息生产者),控制台



gateway工程(消息消费者),控制台



五、参考文献


https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-introducing


https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_annotation_based_support_legacy


https://blog.csdn.net/qq_32734365/article/details/81413218?utm_medium=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.edu_weight&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.edu_weight


https://www.cnblogs.com/wphmoon/p/11769823.html


https://baijiahao.baidu.com/s?id=1651919282506404758&wfr=spider&for=pc


https://www.cnblogs.com/hellxz/p/9396282.html


https://blog.csdn.net/weixin_38399962/article/details/82192340?utm_medium=distribute.pc_relevant.none-task-blog-title-2&spm=1001.2101.3001.4242


https://www.cnblogs.com/miracle-luna/p/11299345.html


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yanfei_1986/article/details/111246333

随机推荐

range python 3.6 type class_Python 3.6 有什么新特性

匿名用户1级2017-07-31回答有哪些重要的新特性。1.格式化字符串字面量PEP498引入了f-string,一种新型的字符串字面量。中文翻译为“格式化字符串字面量”。这种字符串以f...

weixin_39530557 阅读(226)

IDEA快捷键总结

删除光标所在行代码Ctrl+Y复制光标所在行代码,或者鼠标选中的代码Ctrl+D剪切Ctrl+X行注释Ctrl+/块注释Ctrl+Shift...

【-_-】 阅读(574)