Java-Flink-多个源的集成测试
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Java-Flink-多个源的集成测试,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含4105字,纯文字阅读大概需要6分钟。
内容图文
![Java-Flink-多个源的集成测试](/upload/InfoBanner/zyjiaocheng/650/70a09fe9de914d85ac2794446065594d.jpg)
我有一个Flink作业,正在使用此处描述的方法进行集成测试:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing
作业从两个来源获取输入,这两个来源合并在CoFlatMapFuntion中.在测试环境中,我当前正在使用两个简单的SourceFunction来发出值,但是这不能对事件的发出顺序进行任何控制.为了正确测试作业的功能,这是必需的.
如何修改测试以确保一个源函数在第二个源函数之前发出所有数据?
我已经看到了Integration test for complex topology (multiple inputs) in Flink中建议的方法,这对于单元测试很好,但是我正在寻找一种解决方案,允许我对整个工作进行集成测试.
解决方法:
我建议将控制代码添加到您的两个SourceFunction中,并使用MiniClusterWithClientResource.它可能看起来如下所示:
public class JobITCase {
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS = 2;
private static final int PARALLELISM = NUM_SLOTS * NUM_TMS;
@ClassRule
public final static MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(NUM_SLOTS)
.setNumberTaskManagers(NUM_TMS)
.build());
@Test
public void testJob() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
final MyControllableSourceFunction source1 = new MyControllableSourceFunction("source1");
final MyControllableSourceFunction source2 = new MyControllableSourceFunction("source2");
final DataStreamSource<Integer> input1 = env.addSource(source1);
final DataStreamSource<Integer> input2 = env.addSource(source2);
input1.connect(input2).map(new CoMapFunction<Integer, Integer, Integer>() {
@Override
public Integer map1(Integer integer) {
System.out.println("Input 1: " + integer);
return integer;
}
@Override
public Integer map2(Integer integer) {
System.out.println("Input 2: " + integer);
return integer;
}
}).print();
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().submitJob(jobGraph).get();
final CompletableFuture<JobResult> jobResultFuture = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster().requestJobResult(jobGraph.getJobID());
final ArrayList<CompletableFuture<Void>> finishedFutures = new ArrayList<>(PARALLELISM);
for (int i = 0; i < PARALLELISM; i++) {
MyControllableSourceFunction.startExecution(source1, i);
finishedFutures.add(MyControllableSourceFunction.getFinishedFuture(source1, i));
}
FutureUtils.waitForAll(finishedFutures).join();
for (int i = 0; i < PARALLELISM; i++) {
MyControllableSourceFunction.startExecution(source2, i);
}
jobResultFuture.join();
}
private static class MyControllableSourceFunction extends RichParallelSourceFunction<Integer> {
private static final ConcurrentMap<String, CountDownLatch> startLatches = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, CompletableFuture<Void>> finishedFutures = new ConcurrentHashMap<>();
private final String name;
private boolean running = true;
private MyControllableSourceFunction(String name) {
this.name = name;
}
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
final int index = getRuntimeContext().getIndexOfThisSubtask();
final CountDownLatch startLatch = startLatches.computeIfAbsent(getId(index), ignored -> new CountDownLatch(1));
final CompletableFuture<Void> finishedFuture = finishedFutures.computeIfAbsent(getId(index), ignored -> new CompletableFuture<>());
startLatch.await();
int counter = 0;
while (running && counter < 10) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.collect(counter++);
}
}
finishedFuture.complete(null);
}
@Override
public void cancel() {
running = false;
}
private String getId(int index) {
return name + '_' + index;
}
static void startExecution(MyControllableSourceFunction source, int index) {
final CountDownLatch startLatch = startLatches.computeIfAbsent(source.getId(index), ignored -> new CountDownLatch(1));
startLatch.countDown();
}
static CompletableFuture<Void> getFinishedFuture(MyControllableSourceFunction source, int index) {
return finishedFutures.computeIfAbsent(source.getId(index), ignored -> new CompletableFuture<>());
}
}
}
内容总结
以上是互联网集市为您收集整理的Java-Flink-多个源的集成测试全部内容,希望文章能够帮你解决Java-Flink-多个源的集成测试所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。