Flink 1.11.2 SQL 读写 MySQL
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了Flink 1.11.2 SQL 读写 MySQL,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含4191字,纯文字阅读大概需要6分钟。
内容图文
FlinkSQL读取MySQL大多用作维表关联, 聚合结果写入MySQL,简单记录一下用法。
添加依赖
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.11.2</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
<!-- <scope>provided</scope>-->
</dependency>
package flinksql
import java.time.Duration
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.{CheckpointConfig, ExecutionCheckpointingOptions}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.slf4j.LoggerFactory
import org.slf4j.event.Level
/**
* create by young
* date:20/12/6
* desc:
*/
object Demo02Mysql {
def main(args: Array[String]): Unit = {
// var logger: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
// org.apache.log4j.Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
// org.apache.log4j.Logger.getLogger("org.apache").setLevel(Level.INFO)
// org.apache.log4j.Logger.getLogger("io.debezium").setLevel(Level.INFO)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
// 失败重启,固定间隔,每隔3秒重启1次,总尝试重启10次
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3))
// 本地测试线程 1
env.setParallelism(1)
// 事件处理的时间,由系统时间决定
// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// checkpoint 设置
val tableConfig = tEnv.getConfig.getConfiguration
// 开启checkpoint
tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
// checkpoint的超时时间周期,1 分钟做一次checkpoint, 每次checkpoint 完成后 sink 才会执行
tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(60))
// checkpoint的超时时间, 检查点一分钟内没有完成将被丢弃
tableConfig.set(ExecutionCheckpointingOptions.CHECKPOINTING_TIMEOUT, Duration.ofSeconds(60))
// checkpoint 最小间隔,两个检查点之间至少间隔 30 秒
tableConfig.set(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS, Duration.ofSeconds(30))
// 同一时间只允许进行一个检查点
tableConfig.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, Integer.valueOf(1))
// 手动cancel时是否保留checkpoint
tableConfig.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
/**
* mysql 源表
*/
val mysqlSourceSql =
"""
|create table mysqlSourceTable (
| id int,
| name string,
| gender string,
| age int
|) with (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/spark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=UTC',
| 'username' = 'root',
| 'password' = 'root',
| 'table-name' = 'student',
| 'driver' = 'com.mysql.cj.jdbc.Driver',
| 'scan.fetch-size' = '200'
|)
""".stripMargin
/**
* mysql sink
*/
val printSinkSql =
"""
|create table printSinkTable (
| id int,
| name string,
| gender string,
| age int
|) with (
| 'connector' = 'print'
|)
""".stripMargin
val writeMysqlTable =
"""
|create table writeMysqlTable (
|id int,
|name string,
|gender string,
|age int
|) with (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/spark?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&serverTimezone=UTC',
| 'username' = 'root',
| 'password' = 'root',
| 'table-name' = 'student',
| 'driver' = 'com.mysql.cj.jdbc.Driver',
| 'sink.buffer-flush.interval' = '3s',
| 'sink.buffer-flush.max-rows' = '1',
| 'sink.max-retries' = '5'
|)
""".stripMargin
var insertSql = "insert into printSinkTable select * from mysqlSourceTable "
insertSql = "insert into writeMysqlTable select * from mysqlSourceTable"
tEnv.executeSql(mysqlSourceSql)
tEnv.executeSql(writeMysqlTable)
// tEnv.executeSql(printSinkSql)
tEnv.executeSql(insertSql)
// tEnv.executeSql("select * from mysqlSourceTable").print()
tEnv.executeSql("select * from writeMysqlTable").print()
}
}
内容总结
以上是互联网集市为您收集整理的Flink 1.11.2 SQL 读写 MySQL全部内容,希望文章能够帮你解决Flink 1.11.2 SQL 读写 MySQL所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。