浅析Java CompletionService
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了浅析Java CompletionService,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3817字,纯文字阅读大概需要6分钟。
内容图文
JDK的CompletionService提供了一种将生产新的异步任务与使用已完成任务的结果分离开来的服务,生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。举个例子:现在要向服务器发送HTTP请求,服务端对于每个请求都需要做很多额外操作,很消耗时间,则可以将每个请求接受之后,提交到CompletionService异步处理,等执行完毕之后,在返回给客户端
package com.yf.concurrent; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CompletionServiceTest { private ExecutorService threadPool = Executors.newCachedThreadPool(); private CompletionService<Response> completionService = new ExecutorCompletionService<Response>( Executors.newCachedThreadPool()); public CompletionServiceTest() { new Thread() { public void run() { while (true) { try { Future<Response> f = completionService.take(); /** * 获取响应信息,返回给客户端 * 如果completionService任务队列为空,此处将阻塞 */ Response resp = f.get(); System.out.println(resp.getId()); } catch (Exception e) { System.out.println("Exception happened:"+e.getMessage()); } } }; }.start(); } class Request{ private int rid; private String body; public int getRid() { return rid; } public void setRid(int rid) { this.rid = rid; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } class Response { private int id; private String body; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getBody() { return body; } public void setBody(String body) { this.body = body; } } class HTTPExecutor { public Future<Response> execute(final Request request) { Future<Response> f = threadPool.submit(new Callable<Response>() { public Response call() throws Exception { Response response = new Response(); Thread.currentThread().sleep(3000); response.setId(request.getRid()); response.setBody("response"); return response; } }); return f; } } public void submitHTTP(final Request request) { completionService.submit(new Callable<Response>() { public Response call() throws Exception { return new HTTPExecutor().execute(request).get(); } }); } public static void main(String[] args) { CompletionServiceTest t = new CompletionServiceTest(); for (int i = 0; i < 10; i++) { /** * 发送10个HTTP请求 */ Request request =t.new Request(); request.setRid(i); request.setBody("request"); t.submitHTTP(request); } } }
可以简单查看一下CompletionService的唯一实现类ExecutorCompletionService源码
关键代码如下:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }
通过ExecutorCompletionService的构造器可知,CompletionService 依赖于一个单独的 Executor 来实际执行任务,内部管理了一个阻塞队列来,在调用submit方法时,会向创建一个新的RunnableFuture,然后异步执行该RunnableFuture,当其状态变为done后,添加CompletionService的阻塞队列中,外部通过调用take()(阻塞)或者poll()(非阻塞,为空返回null)方法获取执行结果。
原文:http://blog.csdn.net/yangfei001/article/details/30312719
内容总结
以上是互联网集市为您收集整理的浅析Java CompletionService全部内容,希望文章能够帮你解决浅析Java CompletionService所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。