java – 我正确实现ActiveMQ吗?实现事务处理会话并重试
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java – 我正确实现ActiveMQ吗?实现事务处理会话并重试,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3981字,纯文字阅读大概需要6分钟。
内容图文
我正在尝试使用事务会话来支持回滚的JMS-ActiveMQ实现.
我是ActiveMQ的新手,我已经使用它的Java库进行了第一次实现.
当我运行我的应用程序时,我看到消息已成功入队并出列.我还可以看到相应的DLQ是自动生成的.但是,我不确定我是否正确配置了redeliverypolicy.截至目前它已在生产者上配置,但有些examples将重新传递策略与监听器容器联系起来,所以我不能完全确定在我的情况下(如果有的话)是否会将有毒消息放在DLQ上.摘要中包含详细注释.
此外,到目前为止我遇到的所有示例都使用Spring.但是,我没有选择使用它需要重新布线整个项目(如果只涉及最小的开销,我会打开).
任何关于如何使用ActiveMQ api在Java中做到这一点的见解将不胜感激.
制片人
public void publishUpdate(final MessageBody payload)
??????????? throws JMSException {
??????? Session session = session(connection());
??????? try {
??????????? Message message = message(session, payload);
??????????? LOGGER.info("About to put message on queue");
??????????? producer(session).send(message);
??????????? // without session.commit()-- no messages get put on the queue.
??????????? session.commit();// messages seem to be enqueued now.
???????????
??????? } catch ( BadRequestException e) { //to avoid badly formed requests?
??????????? LOGGER.info("Badly formed request. Not attempting retry!");
??????????? return;
??????? } catch (JMSException jmsExcpetion) {
??????????? LOGGER.info("Caught JMSException will retry");
??????????? session.rollback();// assume rollback is followed by a retry?
??????? } ???????
??? }
private MessageProducer producer(Session session) throws JMSException {
return session.createProducer(destination());
}
private Connection connection() throws JMSException {
ActiveMQConnectionFactory connectionFactory= new ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connectionFactory.setRedeliveryPolicy(getRedeliveryPolicy());//redelivery policy with three retries and redelivery time of 1000ms
return connection;
}
private Session session(Connection connection) throws JMSException {
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
connection.start();
return session;
}
监听器:
public class UpdateMessageListener implements MessageListener{
….
public void onMessage(Message message) {
String json = null;
try {
//Does the listener need to do anything to deal with retry?
json = ((TextMessage) message).getText();
MessageBody request = SerializeUtils.deserialize(json, MessageBody.class);
processTransaction(request.getUpdateMessageBody(), headers);//perform some additional processing.
} catch (Throwable e) {
LOGGER.error("Error processing request: {}", json);
}
}
}
消费者:
private MessageConsumer consumer() throws JMSException {
LOGGER.info("Creating consumer");
MessageConsumer consumer = session().createConsumer(destination());
consumer.setMessageListener(new UpdateMessageListener()); //wire listener to consumer
return consumer;
}
private Session session() throws JMSException {
Connection connection=connection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//create an auto-ack from the consumer side? Is this correct?
connection.start();
return session;
}
如果有必要,我也愿意提供更多代码.
解决方法:
你的解决方案有一点缺陷.
根据JMS document,如果在onMessage函数中存在异常,则在Session.AUTO_ACKNOWLEDGE模式下,失败的执行消息将由消息队列(例如ActiveMQ)重新传递.但是这个流程被打破了,因为侦听器在onMessage函数中捕获了Throwable或Exception.数据流如下图所示:
如果要实现本地事务,则在消息处理程序执行失败时应该抛出异常.在pesudo代码中,异步重新传递消息使用者可能如下所示:
Session session = connection.getSession(consumerId);
sessionQueueBuffer.enqueue(message);
Runnable runnable = new Ruannale(){
run(){
Consumer consumer = session.getConsumer(consumerId);
Message md = sessionQueueBuffer.dequeue();
try{
consumer.messageListener.onMessage(md);
ack(md);//send an STANDARD_ACK_TYPE, tell broker success
}catch(Exception e){
redelivery();//send redelivery ack. DELIVERED_ACK_TYPE, require broker keep message and redeliver it.
}
}
threadPool.execute(runnable);
内容总结
以上是互联网集市为您收集整理的java – 我正确实现ActiveMQ吗?实现事务处理会话并重试全部内容,希望文章能够帮你解决java – 我正确实现ActiveMQ吗?实现事务处理会话并重试所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。