java – 配置Spring Integration聚合器以组合RabbitMq扇出交换的响应
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java – 配置Spring Integration聚合器以组合RabbitMq扇出交换的响应,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3520字,纯文字阅读大概需要6分钟。
内容图文
![java – 配置Spring Integration聚合器以组合RabbitMq扇出交换的响应](/upload/InfoBanner/zyjiaocheng/777/f6ae65190bcc43a9accbff2dccaacb9a.jpg)
我试图使用Spring Integration配置以下内容:
>向频道发送消息.
>将此消息发布到与n个消费者的兔子扇出(pub / sub)交换.
>每个消费者都提供响应消息.
>让Spring Integration在将它们返回到原始客户端之前聚合这些响应.
到目前为止,我有一些问题……
>我正在使用发布 – 订阅 – 通道来设置apply-sequence =“true”属性,以便correlationId,sequenceSize& sequenceNumber属性已设置. DefaultAmqpHeaderMapper抛弃了这些属性. DEBUG headerName = [correlationId]将不会被映射
>即使在扇出交换中注册了2个队列,sequenceSize属性也只被设置为1.据推测,这意味着消息将过早地从聚合器中释放.我希望这是因为我滥用发布 – 订阅 – 频道才能使用apply-sequence =“true”,而且正确地说只有一个订户,即int-amqp:outbound-gateway.
我的出站Spring配置如下:
<int:publish-subscribe-channel id="output" apply-sequence="true"/>
<int:channel id="reply">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:aggregator input-channel="reply" method="combine">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
<int:logging-channel-adapter id="logger" level="INFO"/>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"/>
我的rabbitMQ配置如下:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>
<rabbit:fanout-exchange name="fanout-exchange">
<rabbit:bindings>
<rabbit:binding queue="a-queue" />
<rabbit:binding queue="b-queue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
消费者看起来像这样:
<int:channel id="input"/>
<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>
<bean id="listenerService" class="example.ListenerService"/>
<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
任何建议都会很棒,我怀疑我的某个地方有错误的结尾……
基于Gary的评论新的出站弹簧配置:
<int:channel id="output"/>
<int:header-enricher input-channel="output" output-channel="output">
<int:correlation-id expression="headers['id']" />
</int:header-enricher>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"
mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>
<int:channel id="reply"/>
<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
解决方法:
问题是S.I.不知道扇出交换的拓扑结构.
最简单的方法是使用自定义发布策略
release-strategy-expression="size() == 2"
在聚合器上(假设扇出为2).所以,你不需要序列大小;你可以避免使用标题扩充器“滥用”发布/订阅者频道…
<int:header-enricher input-channel="foo" output-channel="bar">
<int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" />
</int:header-enricher>
您可以使用消息ID避免创建新的UUID,消息ID已经是唯一的…
<int:correlation-id expression="headers['id']" />
最后,您可以通过添加将correlationId标头传递给AMQP
mapped-request-headers="correlationId"
到你的amqp端点.
内容总结
以上是互联网集市为您收集整理的java – 配置Spring Integration聚合器以组合RabbitMq扇出交换的响应全部内容,希望文章能够帮你解决java – 配置Spring Integration聚合器以组合RabbitMq扇出交换的响应所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。