java-SubmissionPublisher在提交时未调用订户的Next
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了java-SubmissionPublisher在提交时未调用订户的Next,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3145字,纯文字阅读大概需要5分钟。
内容图文
![java-SubmissionPublisher在提交时未调用订户的Next](/upload/InfoBanner/zyjiaocheng/685/007ca62cb37b49c9828d069602c0b238.jpg)
每隔一段时间,我都会通过某个查询来检索推文.
这些推文必须传递给计算和操纵这些推文的服务.
因此,这些服务已订阅我的发布者.因此Publisher.hasSubscribers()返回true.但是Submit或offer函数不会调用我的订阅者的onNext.
因此,作为“修复”,我循环浏览订户并自己调用它.但是事实并非如此.
这是我的发布者的构造函数.
public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){
super(executor, maxBufferCapacity);
this.searchQuery = searchQuery;
scheduler = new ScheduledThreadPoolExecutor(1);
this.tweetGetter = scheduler.scheduleAtFixedRate(
() -> {
List<String> tweets = getTweets(searchQuery);
/* this.lastCall = LocalDateTime.now();
for(Flow.Subscriber sub : this.getSubscribers()){
sub.onNext(tweets);
}*/
this.submit(tweets);
if(tweets.size() >= 20) this.close();
}, 0, period, unit);
}
这是我的订户
package myFlowAPI;
import Interfaces.IProcess;
import Services.LogToFileService;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
public class MySubscriber implements Flow.Subscriber<List<String>> {
private Flow.Subscription subscription;
private AtomicInteger count;
private IProcess processor;
private String name;
private int DEMAND = 0;
public MySubscriber(String name, IProcess processor){
this.name = name;
this.processor = processor;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
}
@Override
public void onNext(List<String> item) {
Object result = this.processor.process(item);
this.readResult(result);
switch (this.processor.getClass().getSimpleName()){
case "CalculateTweetStatsService":
if((Integer) result >= 20){
this.subscription.cancel();
}
break;
}
}
@Override
public void one rror(Throwable throwable) {
System.out.println("Error is thrown " + throwable.getMessage());
}
@Override
public void onComplete() {
if(this.processor instanceof LogToFileService){
((LogToFileService) processor).closeResource();
}
System.out.println("complete");
}
private void readResult(Object result){
System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString());
}
}
这是我订阅发布者的主要地点
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
String searchQuery;
try{
searchQuery = args[0] != null ? args[0] : "#capgemini50";
}catch (ArrayIndexOutOfBoundsException ex){
searchQuery = "#capgemini50";
}
TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery);
MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt"));
MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService());
streamer.subscribe(subscriber1);
streamer.subscribe(subscriber2);
}
解决方法:
您需要订阅者明确请求数据,例如订阅后(请参见http://download.java.net/java/jdk9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-):
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
在onNext()中处理以请求下一个项目时也是如此.
内容总结
以上是互联网集市为您收集整理的java-SubmissionPublisher在提交时未调用订户的Next全部内容,希望文章能够帮你解决java-SubmissionPublisher在提交时未调用订户的Next所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。