java – 提供单个TaskExecutor / ThreadPool的多个DefaultMessageListenerContainer(公平)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java – 提供单个TaskExecutor / ThreadPool的多个DefaultMessageListenerContainer(公平),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含11355字,纯文字阅读大概需要17分钟。
内容图文
![java – 提供单个TaskExecutor / ThreadPool的多个DefaultMessageListenerContainer(公平)](/upload/InfoBanner/zyjiaocheng/818/dc80e225e39843939de9d63655a1da89.jpg)
我正在开发一个Spring Boot应用程序,我打算在其中使用来自多个队列的JMS消息 – 但是我想控制处理这些消息的执行线程的总数,而不是控制每个JMS队列执行的线程数.
简而言之:大量的JMS队列.一个线程池来进行处理.
有些队列比其他队列更繁忙(有些可能总是有工作,有些则长时间闲置) – 所以我想利用可用的处理能力做任何需要完成的工作,无论源队列如何.
我已经设置了一系列DefaultMessageListenerContainers,每个都使用具有固定池大小的共享TaskExecutor.我观察到的行为是,一个队列将占用所有可用的插槽 – 然后(即使第一个队列变空),插槽也不可用于其他队列的DefaultMessageListenerContainers使用.
这在javadocs for DefaultMessageListenerContainer.setTaskExecutor()中有详细说明:
A plain thread pool does not add much value, as this listener
container will occupy a number of threads for its entire lifetime.
>有办法解决这个问题吗?它如何在javadocs中提到的“J2EE环境”中以不同的方式工作?
>例如,我可以执行类似MessageListenerContainer的操作吗?
>如果我想要的是可能的 – 那么是否可以应用一些排序,从而可以给予先前空闲队列上收到的消息更高的优先级?
解决方法:
我试图重新创建你的问题,但我不能.可能是因为您在此问题中缺少源代码.但这就是我的尝试.
import java.io.File;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.FileSystemUtils;
@Configuration
@EnableAutoConfiguration
public class Application {
public class Receiver {
private String name;
public Receiver(String name) {
this.name = name;
}
/**
* When you receive a message, print it out, then shut down the application.
* Finally, clean up any ActiveMQ server stuff.
*/
public void receiveMessage(String message) {
System.out.println("Received <" + message + "> @ "+name);
//context.close();
FileSystemUtils.deleteRecursively(new File("activemq-data"));
}
}
static String mailboxDestination = "mailbox-destination";
static String mailboxDestination2= "mailbox-destination2";
@Bean
MessageListenerAdapter adapter1() {
MessageListenerAdapter messageListener
= new MessageListenerAdapter(new Receiver("MailBox1"));
messageListener.setDefaultListenerMethod("receiveMessage");
return messageListener;
}
@Bean
MessageListenerAdapter adapter2() {
MessageListenerAdapter messageListener
= new MessageListenerAdapter(new Receiver("MailBox2"));
messageListener.setDefaultListenerMethod("receiveMessage");
return messageListener;
}
@Bean
DefaultMessageListenerContainer container(ConnectionFactory connectionFactory) throws Exception {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setMessageListener(adapter1());
container.setConnectionFactory(connectionFactory);
container.setDestinationName(mailboxDestination);
container.setTaskExecutor(taskExecutor());
return container;
}
@Bean
DefaultMessageListenerContainer container2(ConnectionFactory connectionFactory) throws Exception {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setMessageListener(adapter2());
container.setConnectionFactory(connectionFactory);
container.setDestinationName(mailboxDestination2);
container.setTaskExecutor(taskExecutor());
return container;
}
@Bean
TaskExecutor taskExecutor() throws Exception {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(100);
taskExecutor.setQueueCapacity(1000);
taskExecutor.setThreadGroupName("MyThreads");
return taskExecutor;
}
public static void main(String[] args) {
// Clean out any ActiveMQ data from a previous run
FileSystemUtils.deleteRecursively(new File("activemq-data"));
// Launch the application
ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);
// Send a message
final JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
System.out.println("Sending a new message.");
new Thread() {
public void run() {
for(int i=0;i<100;i++) {
final String message = (i+1)+" ping!";
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
};
jmsTemplate.send(mailboxDestination, messageCreator);
}
}
}.start();
new Thread() {
public void run() {
for(int i=0;i<100;i++) {
final String message = (i+1)+" ping!";
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
};
jmsTemplate.send(mailboxDestination2, messageCreator);
}
}
}.start();
}
}
我得到的输出是;
Received <1 ping!> @ MailBox2
Received <1 ping!> @ MailBox1
Received <2 ping!> @ MailBox2
Received <2 ping!> @ MailBox1
Received <3 ping!> @ MailBox2
Received <3 ping!> @ MailBox1
Received <4 ping!> @ MailBox2
Received <4 ping!> @ MailBox1
Received <5 ping!> @ MailBox2
Received <5 ping!> @ MailBox1
Received <6 ping!> @ MailBox2
Received <6 ping!> @ MailBox1
Received <7 ping!> @ MailBox2
Received <7 ping!> @ MailBox1
Received <8 ping!> @ MailBox2
Received <8 ping!> @ MailBox1
Received <9 ping!> @ MailBox2
Received <9 ping!> @ MailBox1
Received <10 ping!> @ MailBox2
Received <10 ping!> @ MailBox1
Received <11 ping!> @ MailBox2
Received <11 ping!> @ MailBox1
Received <12 ping!> @ MailBox1
Received <12 ping!> @ MailBox2
Received <13 ping!> @ MailBox1
Received <13 ping!> @ MailBox2
Received <14 ping!> @ MailBox1
Received <14 ping!> @ MailBox2
Received <15 ping!> @ MailBox2
Received <15 ping!> @ MailBox1
Received <16 ping!> @ MailBox2
Received <16 ping!> @ MailBox1
Received <17 ping!> @ MailBox2
Received <17 ping!> @ MailBox1
Received <18 ping!> @ MailBox2
Received <18 ping!> @ MailBox1
Received <19 ping!> @ MailBox1
Received <19 ping!> @ MailBox2
Received <20 ping!> @ MailBox1
Received <20 ping!> @ MailBox2
Received <21 ping!> @ MailBox1
Received <21 ping!> @ MailBox2
Received <22 ping!> @ MailBox1
Received <22 ping!> @ MailBox2
Received <23 ping!> @ MailBox1
Received <23 ping!> @ MailBox2
Received <24 ping!> @ MailBox1
Received <24 ping!> @ MailBox2
Received <25 ping!> @ MailBox1
Received <25 ping!> @ MailBox2
Received <26 ping!> @ MailBox1
Received <26 ping!> @ MailBox2
Received <27 ping!> @ MailBox1
Received <27 ping!> @ MailBox2
Received <28 ping!> @ MailBox2
Received <28 ping!> @ MailBox1
Received <29 ping!> @ MailBox2
Received <29 ping!> @ MailBox1
Received <30 ping!> @ MailBox2
Received <30 ping!> @ MailBox1
Received <31 ping!> @ MailBox2
Received <31 ping!> @ MailBox1
Received <32 ping!> @ MailBox2
Received <33 ping!> @ MailBox2
Received <32 ping!> @ MailBox1
Received <34 ping!> @ MailBox2
Received <33 ping!> @ MailBox1
Received <34 ping!> @ MailBox1
Received <35 ping!> @ MailBox2
Received <35 ping!> @ MailBox1
Received <36 ping!> @ MailBox2
Received <36 ping!> @ MailBox1
Received <37 ping!> @ MailBox2
Received <37 ping!> @ MailBox1
Received <38 ping!> @ MailBox2
Received <38 ping!> @ MailBox1
Received <39 ping!> @ MailBox2
Received <39 ping!> @ MailBox1
Received <40 ping!> @ MailBox2
Received <40 ping!> @ MailBox1
Received <41 ping!> @ MailBox2
Received <42 ping!> @ MailBox2
Received <43 ping!> @ MailBox2
Received <44 ping!> @ MailBox2
Received <45 ping!> @ MailBox2
Received <46 ping!> @ MailBox2
Received <47 ping!> @ MailBox2
Received <48 ping!> @ MailBox2
Received <49 ping!> @ MailBox2
Received <50 ping!> @ MailBox2
Received <51 ping!> @ MailBox2
Received <52 ping!> @ MailBox2
Received <53 ping!> @ MailBox2
Received <54 ping!> @ MailBox2
Received <55 ping!> @ MailBox2
Received <56 ping!> @ MailBox2
Received <57 ping!> @ MailBox2
Received <58 ping!> @ MailBox2
Received <59 ping!> @ MailBox2
Received <60 ping!> @ MailBox2
Received <61 ping!> @ MailBox2
Received <62 ping!> @ MailBox2
Received <63 ping!> @ MailBox2
Received <64 ping!> @ MailBox2
Received <65 ping!> @ MailBox2
Received <66 ping!> @ MailBox2
Received <67 ping!> @ MailBox2
Received <68 ping!> @ MailBox2
Received <69 ping!> @ MailBox2
Received <70 ping!> @ MailBox2
Received <71 ping!> @ MailBox2
Received <72 ping!> @ MailBox2
Received <73 ping!> @ MailBox2
Received <74 ping!> @ MailBox2
Received <75 ping!> @ MailBox2
Received <76 ping!> @ MailBox2
Received <77 ping!> @ MailBox2
Received <78 ping!> @ MailBox2
Received <79 ping!> @ MailBox2
Received <80 ping!> @ MailBox2
Received <81 ping!> @ MailBox2
Received <82 ping!> @ MailBox2
Received <83 ping!> @ MailBox2
Received <84 ping!> @ MailBox2
Received <85 ping!> @ MailBox2
Received <86 ping!> @ MailBox2
Received <87 ping!> @ MailBox2
Received <88 ping!> @ MailBox2
Received <89 ping!> @ MailBox2
Received <90 ping!> @ MailBox2
Received <91 ping!> @ MailBox2
Received <92 ping!> @ MailBox2
Received <93 ping!> @ MailBox2
Received <94 ping!> @ MailBox2
Received <95 ping!> @ MailBox2
Received <96 ping!> @ MailBox2
Received <97 ping!> @ MailBox2
Received <98 ping!> @ MailBox2
Received <99 ping!> @ MailBox2
Received <100 ping!> @ MailBox2
Received <41 ping!> @ MailBox1
Received <42 ping!> @ MailBox1
Received <43 ping!> @ MailBox1
Received <44 ping!> @ MailBox1
Received <45 ping!> @ MailBox1
Received <46 ping!> @ MailBox1
Received <47 ping!> @ MailBox1
Received <48 ping!> @ MailBox1
Received <49 ping!> @ MailBox1
Received <50 ping!> @ MailBox1
Received <51 ping!> @ MailBox1
Received <52 ping!> @ MailBox1
Received <53 ping!> @ MailBox1
Received <54 ping!> @ MailBox1
Received <55 ping!> @ MailBox1
Received <56 ping!> @ MailBox1
Received <57 ping!> @ MailBox1
Received <58 ping!> @ MailBox1
Received <59 ping!> @ MailBox1
Received <60 ping!> @ MailBox1
Received <61 ping!> @ MailBox1
Received <62 ping!> @ MailBox1
Received <63 ping!> @ MailBox1
Received <64 ping!> @ MailBox1
Received <65 ping!> @ MailBox1
Received <66 ping!> @ MailBox1
Received <67 ping!> @ MailBox1
Received <68 ping!> @ MailBox1
Received <69 ping!> @ MailBox1
Received <70 ping!> @ MailBox1
Received <71 ping!> @ MailBox1
Received <72 ping!> @ MailBox1
Received <73 ping!> @ MailBox1
Received <74 ping!> @ MailBox1
Received <75 ping!> @ MailBox1
Received <76 ping!> @ MailBox1
Received <77 ping!> @ MailBox1
Received <78 ping!> @ MailBox1
Received <79 ping!> @ MailBox1
Received <80 ping!> @ MailBox1
Received <81 ping!> @ MailBox1
Received <82 ping!> @ MailBox1
Received <83 ping!> @ MailBox1
Received <84 ping!> @ MailBox1
Received <85 ping!> @ MailBox1
Received <86 ping!> @ MailBox1
Received <87 ping!> @ MailBox1
Received <88 ping!> @ MailBox1
Received <89 ping!> @ MailBox1
Received <90 ping!> @ MailBox1
Received <91 ping!> @ MailBox1
Received <92 ping!> @ MailBox1
Received <93 ping!> @ MailBox1
Received <94 ping!> @ MailBox1
Received <95 ping!> @ MailBox1
Received <96 ping!> @ MailBox1
Received <97 ping!> @ MailBox1
Received <98 ping!> @ MailBox1
Received <99 ping!> @ MailBox1
Received <100 ping!> @ MailBox1
如果我错了,请纠正我但是使用这种配置我没有看到任何问题.两个队列和一个任务执行器.
内容总结
以上是互联网集市为您收集整理的java – 提供单个TaskExecutor / ThreadPool的多个DefaultMessageListenerContainer(公平)全部内容,希望文章能够帮你解决java – 提供单个TaskExecutor / ThreadPool的多个DefaultMessageListenerContainer(公平)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。