Flink通过SQLClinet创建kafka源表并进行实时计算
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Flink通过SQLClinet创建kafka源表并进行实时计算,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含1665字,纯文字阅读大概需要3分钟。
内容图文
![Flink通过SQLClinet创建kafka源表并进行实时计算](/upload/InfoBanner/zyjiaocheng/522/e4a12b7a6ffb43268e1a185e516c9295.jpg)
数据
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662868", "item_id":"1784", "category_id": "54123654", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662854", "item_id":"1456", "category_id": "12345678", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662858", "item_id":"1457", "category_id": "12345679", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
2.在kafka进行消费
/bin/kafka-console-consumer.sh --bootstrap-server 192.168.58.177:9092 --topic my_topic --partition 0 --offset 0
3.在Flink的sqlclient 创建表
CREATE TABLE user_log1 ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts VARCHAR ) WITH ( ‘connector.type‘ = ‘kafka‘, ‘connector.version‘ = ‘universal‘, ‘connector.topic‘ = ‘my-topic-one‘, ‘connector.startup-mode‘ = ‘earliest-offset‘, ‘connector.properties.group.id‘ = ‘testGroup‘, ‘connector.properties.zookeeper.connect‘ = ‘192.168.58.171:2181,192.168.58.177:2181,192.168.58.178:2181‘, ‘connector.properties.bootstrap.servers‘ = ‘192.168.58.177:9092‘, ‘format.type‘ = ‘json‘ );
实时计算
select item_id,count(*) from user_log1 group by item_id;
Flink通过SQLClinet创建kafka源表并进行实时计算
标签:_id fse prope bootstra bin category bootstrap 技术 nec
本文系统来源:https://www.cnblogs.com/yaowentao/p/12668885.html
内容总结
以上是互联网集市为您收集整理的Flink通过SQLClinet创建kafka源表并进行实时计算全部内容,希望文章能够帮你解决Flink通过SQLClinet创建kafka源表并进行实时计算所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。