flink对接kafka并且写进MySQL(只有代码)
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了flink对接kafka并且写进MySQL(只有代码),小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含2354字,纯文字阅读大概需要4分钟。
内容图文
![flink对接kafka并且写进MySQL(只有代码)](/upload/InfoBanner/zyjiaocheng/895/7dffbb5dbd0b4c73bc28786b46c496c4.jpg)
package ToMysql
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import java.util.Properties
object MysqlTest {
def main(args: Array[String]): Unit = {
// 创建环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 并行度为1
env.setParallelism(1)
// 连接kafka
val properties = new Properties()
properties.setProperty("bootstrap.servers","node2:9092")
properties.setProperty("zookeeper.connect","node2:2181")
properties.setProperty("group.id","jos")
// 消费kafka
val KafkaTest: DataStream[String] = env.addSource(new FlinkKafkaConsumer08[String]("test",new SimpleStringSchema(),properties))
KafkaTest.print()
val map: SingleOutputStreamOperator[student] = KafkaTest.map(new MapFunction[String, student]() {
override def map(value: String): student = {
val split: Array[String] = value.split(" ")
// new 一个表的实体类
val stu: student = new student
// 赋值
stu.setId(split(0))
stu.setName(split(1))
stu.setAge(split(2).toInt)
stu
}
})
map.addSink(new SinkToMySql())
env.execute("MyJdbcSink2")
}
class SinkToMySql() extends RichSinkFunction[student] {
var conn: Connection = null;
var ps: PreparedStatement = null
val driver = "com.mysql.jdbc.Driver"
val url: String = "jdbc:mysql://node01:3306/test"
val username = "root"
val password = "root"
val maxActive = "20"
//初始化的操作
override def open(parameters: Configuration): Unit = {
super.open(parameters)
super.open(parameters)
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection(url, username, password)
conn.setAutoCommit(false)
}
//反复调用的函数
override def invoke(value: student): Unit = {
val sql: String = "insert into student(name,age) values(?,?)"
ps = conn.prepareStatement(sql)
//ps.setString(0,value.getId)
ps.setString(1,value.getName)
ps.setString(2,value.getAge.toString)
ps.execute()
conn.commit()
}
override def close(): Unit = {
super.close()
if (conn != null) {
conn.close()
}
}
}
}
内容总结
以上是互联网集市为您收集整理的flink对接kafka并且写进MySQL(只有代码)全部内容,希望文章能够帮你解决flink对接kafka并且写进MySQL(只有代码)所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。