kafka-spark偏移量提交至redis kafka1.0版本
内容导读
互联网集市收集整理的这篇技术教程文章主要介绍了kafka-spark偏移量提交至redis kafka1.0版本,小编现在分享给大家,供广大互联网技能从业者学习和参考。文章包含5811字,纯文字阅读大概需要9分钟。
内容图文
![kafka-spark偏移量提交至redis kafka1.0版本](/upload/InfoBanner/zyjiaocheng/879/7bb48ac220c849a7b0a52722c7b44fef.jpg)
kafka版本 1.0.0
spark版本 spark-streaming-kafka-0-10_2.11
class KafkaManagerByRedis(kafkaParams: Map[String, Object]) extends Serializable {
private val jedis = JedisUtil.getInstance().getJedis
/**
 * Create a direct Kafka stream for the given topics, resuming from offsets
 * previously persisted in Redis when they exist.
 *
 * Redis layout: one hash per (topic, group) under key "<topic>:<group.id>",
 * fields = partition number, values = offset.
 *
 * @param ssc    the StreamingContext to attach the stream to
 * @param topics topics to subscribe to (offset lookup uses the first one)
 * @return an InputDStream of ConsumerRecords positioned at the stored offsets,
 *         or at "earliest" on first launch
 */
def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
  val groupId = kafkaParams("group.id").toString
  val topic = topics(0)
  val redisKey = topic + ":" + groupId
  val offsetInfo = jedis.hgetAll(redisKey)
  val stream: InputDStream[ConsumerRecord[K, V]] =
    if (offsetInfo.size() == 0) {
      // First launch: nothing stored in Redis, start from the earliest offsets.
      val newKafkaParams = mutable.Map[String, Object]()
      newKafkaParams ++= kafkaParams
      newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      KafkaUtils.createDirectStream[K, V](
        ssc,
        PreferConsistent,
        Subscribe[K, V](topics, newKafkaParams)
      )
    } else {
      // Restart: resume from the validated offsets persisted in Redis.
      // BUG FIX: the original else-branch was a copy of the first-launch branch
      // and silently ignored the stored offsets (the intended code was commented
      // out), so every restart re-consumed from "earliest".
      val topicPartition: Map[TopicPartition, Long] = readOffset(topic, groupId)
      KafkaUtils.createDirectStream[K, V](
        ssc,
        PreferConsistent,
        Subscribe[K, V](topics, kafkaParams, topicPartition)
      )
    }
  stream
}
/**
 * Read the stored offset of every partition of `topic` from Redis and correct
 * any value that has fallen outside the broker's current [earliest, latest]
 * range (e.g. after log retention deleted old segments).
 *
 * @param topic   topic whose offsets to read
 * @param groupId consumer group id (used in the Redis key)
 * @return partition -> offset map suitable for ConsumerStrategies.Subscribe
 */
private def readOffset(topic: String, groupId: String): Map[TopicPartition, Long] = {
  val topicPartitionMap = collection.mutable.HashMap.empty[TopicPartition, Long]
  // Redis layout: hash under "<topic>:<groupId>", field = partition, value = offset.
  // Use the groupId parameter (the original ignored it and re-read kafkaParams).
  val redisKey = topic + ":" + groupId
  val map: util.Map[String, String] = jedis.hgetAll(redisKey)
  val topicAndPartitionMaps: mutable.Map[String, String] = mapAsScalaMap(map)
  topicAndPartitionMaps.foreach { case (partition, offset) =>
    // partition/offset come back as strings from Redis; no need for the
    // original's redundant Integer.valueOf(partition.toInt) round-trip.
    topicPartitionMap.put(new TopicPartition(topic, partition.toInt), offset.toLong)
  }
  // Correction pass: if a stored offset is out of range, reset it to earliest
  // so the consumer does not fail with an OffsetOutOfRange error.
  val earliestOffsets = getEarliestOffsets(kafkaParams, topic)
  val latestOffsets = getLatestOffsets(kafkaParams, List(topic))
  for ((tp, current) <- topicPartitionMap) {
    // getOrElse instead of .get.get: a partition unknown to the broker maps
    // should not crash the whole read with NoSuchElementException.
    val earliest = earliestOffsets.getOrElse(tp, 0L)
    val latest = latestOffsets.getOrElse(tp, Long.MaxValue)
    if (current < earliest || current > latest) {
      topicPartitionMap.put(tp, earliest)
    }
  }
  topicPartitionMap.toMap
}
/**
 * Query the broker for the earliest available offset of every partition
 * of `topic`, using a short-lived throwaway consumer.
 *
 * @param kafkaParams base consumer configuration
 * @param topic       topic to inspect
 * @return partition -> earliest offset
 */
private def getEarliestOffsets(kafkaParams: Map[String, Object], topic: String) = {
  val newKafkaParams = mutable.Map[String, Object]()
  newKafkaParams ++= kafkaParams
  newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  // BUG FIX: the original built newKafkaParams but then passed the unmodified
  // kafkaParams to the consumer, so auto.offset.reset was never applied.
  // Also typed like getLatestOffsets for consistency.
  val consumer = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
  val topics = Seq[String](topic)
  consumer.subscribe(topics)
  try {
    // poll(0) only forces partition assignment; we discard any records.
    consumer.poll(0)
  } catch {
    case e: NoOffsetForPartitionException =>
    // No committed offset for some partition -- harmless here, we only need
    // the assignment. TODO(review): alerting (e.g. email) was planned here.
  }
  val topicp = consumer.assignment().toSet
  // Pause so the subsequent seeks cannot trigger real consumption.
  consumer.pause(topicp)
  consumer.seekToBeginning(topicp)
  // position() after seekToBeginning yields the earliest offset per partition.
  val earliestOffsetMap = topicp.map(tp => tp -> consumer.position(tp)).toMap
  consumer.unsubscribe()
  consumer.close()
  earliestOffsetMap
}
/**
 * Query the broker for the latest (log-end) offset of every partition of the
 * given topics, using a short-lived throwaway consumer.
 *
 * @param kafkaParams base consumer configuration
 * @param topic       topics to inspect
 * @return partition -> latest offset
 */
private def getLatestOffsets(kafkaParams: Map[String, Object], topic: Seq[String]) = {
  val params = mutable.Map[String, Object]()
  params ++= kafkaParams
  params.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  val consumer = new KafkaConsumer[String, Array[Byte]](params)
  consumer.subscribe(topic)
  try {
    consumer.poll(0) // only triggers partition assignment; records are discarded
  } catch {
    case e: NoOffsetForPartitionException =>
    // no committed offset for some partition -- ignore, assignment is all we need
  }
  val assigned = consumer.assignment().toSet
  // pause so seeking to the end cannot cause actual consumption
  consumer.pause(assigned)
  consumer.seekToEnd(assigned)
  // position() after seekToEnd yields the log-end offset per partition
  val latestOffsetMap: Map[TopicPartition, Long] = assigned.map(tp => tp -> consumer.position(tp)).toMap
  consumer.unsubscribe()
  consumer.close()
  latestOffsetMap
}
/**
 * Persist the consumed offset range of each partition of this batch to Redis.
 *
 * @param rdd         batch RDD carrying Kafka offset ranges (must originate
 *                    from a direct stream so the HasOffsetRanges cast holds)
 * @param storeOffset true -> store untilOffset (next offset to consume);
 *                    false -> store fromOffset (re-consume this batch on restart)
 * @param topic       topic name (kept for interface compatibility; the actual
 *                    topic is taken from each OffsetRange)
 */
def persistOffset[K, V](rdd: RDD[ConsumerRecord[K, V]], storeOffset: Boolean = true, topic: String) = {
  val groupId = kafkaParams("group.id").toString
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { offsetRange =>
    // BUG FIX: use ":" as the key separator. The original wrote
    // "<topic>_<groupId>" while createDirectStream/readOffset read
    // "<topic>:<groupId>", so persisted offsets were never found on restart.
    val redisKey = offsetRange.topic + ":" + groupId
    val data = if (storeOffset) offsetRange.untilOffset else offsetRange.fromOffset
    jedis.hset(redisKey, offsetRange.partition.toString, data.toString)
    println("topic:" + offsetRange.topic + "分区:" + offsetRange.partition + "开始消费" + offsetRange.fromOffset + "消费到" + offsetRange.untilOffset + "共计" + offsetRange.count())
  }
}
内容总结
以上是互联网集市为您收集整理的kafka-spark偏移量提交至redis kafka1.0版本全部内容,希望文章能够帮你解决kafka-spark偏移量提交至redis kafka1.0版本所遇到的程序开发问题。 如果觉得互联网集市技术教程内容还不错,欢迎将互联网集市网站推荐给程序员好友。
内容备注
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 gblab@vip.qq.com 举报,一经查实,本站将立刻删除。
内容手机端
扫描二维码推送至手机访问。