首页 / JAVA / Java操作storm入门
Java操作storm入门
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Java操作storm入门,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含3846字,纯文字阅读大概需要6分钟。
内容图文
![Java操作storm入门](/upload/InfoBanner/zyjiaocheng/599/c20631128e6c4f5ebe87e8c358176497.jpg)
1.导入依赖
<!--https://mvnrepository.com/artifact/org.apache.storm/storm-core-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.2.0</version>
</dependency>
2.创建 spout 继承 BaseRichSpout
public class WordSpout extends BaseRichSpout {
//模拟数据来源
String[] init_data = {"hello java", "hello python", "hello C++", "hello scala"};
/**
* 放射方法在里面,应该在nextTuple中调用,可以把他提出来在初始化中赋值
*/
private SpoutOutputCollector collector;
/**
* 初始化方法,只执行一次
* @param map
* @param topologyContext
* @param spoutOutputCollector
*/
public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
collector = spoutOutputCollector;
}
/**
* 死循环,storm 内部一直在调用
* 数据来源 kafka flume ...
*/
public void nextTuple() {
//拿数据
String init_datum = init_data[new Random().nextInt(init_data.length)];
//拆分
String[] split = init_datum.split(" ");
//循环发射到 bolt 中
for (String str:split){
// List list = Arrays.asList(str);
// collector.emit(list);
//第二种
collector.emit(new Values(str));
}
}
/**
* 定义发射出去,tuple 的字段名
* @param outputFieldsDeclarer
*/
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
}
3.创建求和 bolt 继承 BaseRichBolt
public class WordBolt extends BaseRichBolt {
/**
* 临时解决方案 结果集
*/
private Map<String, Long> resMap;
private OutputCollector collector;
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
resMap = new HashMap<String, Long>();
collector = outputCollector;
}
public void execute(Tuple tuple) {
//根据字段名拿到每一个 tuple
String word = tuple.getStringByField("word");
//给每一个单词次数累加
Long time = resMap.get(word);
if (time != null){
resMap.put(word, time + 1L);
}else {
resMap.put(word, 1L);
}
//发射
collector.emit(new Values(resMap));
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("resMap"));
}
}
4.创建输出 bolt
public class PrintBolt extends BaseRichBolt {
public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
}
public void execute(Tuple tuple) {
//取值
Map<String, Long> resMap = (Map<String, Long>)tuple.getValueByField("resMap");
//处理
for (String key:resMap.keySet()){
System.out.println(key + " ==> " + resMap.get(key));
}
}
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
5.创建 topology
public class WordTopoigy {
public static void main(String[] args) throws Exception {
//调用主的 Api
TopologyBuilder builder = new TopologyBuilder();
//关联 spout bolt
builder.setSpout("spout01", new WordSpout());
builder.setBolt("count01", new WordBolt())
//关联线(放射方向)
.shuffleGrouping("spout01");
builder.setBolt("print01", new PrintBolt()).shuffleGrouping("count01");
//本地发布 开发时用
LocalCluster cluster = new LocalCluster();
LocalCluster.LocalTopology topology01 = cluster.submitTopology("topology01",
new HashMap<String, Object>(), builder.createTopology());
}
}
内容总结
以上是互联网集市为您收集整理的Java操作storm入门全部内容,希望文章能够帮你解决Java操作storm入门所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。