Kafka + Spark Streaming：将消费偏移量提交到 Redis（基于 Kafka 1.0 的教程）
Kafka 版本：1.0.0
Spark–Kafka 集成依赖：`spark-streaming-kafka-0-10_2.11`（对应 Scala 2.11）
/**
 * Keeps the consumer offsets of a Spark Streaming Kafka direct stream in Redis instead of Kafka.
 *
 * Offsets live in one Redis hash per (topic, consumer group): the key is `<topic>_<group.id>`,
 * each field is a partition number and each value is the offset to resume from.
 *
 * @param kafkaParams Kafka consumer configuration; must contain "group.id"
 */
class KafkaManagerByRedis(kafkaParams: Map[String, Object]) extends Serializable {

  // NOTE(review): a single Jedis connection is held in a non-@transient field of this
  // Serializable class; confirm the manager is only ever used on the driver.
  private val jedis = JedisUtil.getInstance().getJedis

  /**
   * Redis hash key for the offsets of `topic` in consumer group `groupId`.
   * Reading and writing must go through this one function so the two can never drift apart.
   */
  private def offsetKey(topic: String, groupId: String): String = topic + "_" + groupId

  /**
   * Creates a direct stream over `topics`, resuming from the offsets previously saved to Redis
   * by [[persistOffset]].
   *
   * Partitions with a saved offset start exactly there (after the range correction done in
   * `readOffset`). Any other partition is positioned by the consumer itself: the group's
   * committed offset if there is one, otherwise `auto.offset.reset`, which is forced to
   * "earliest" here.
   *
   * @param ssc    streaming context the stream is bound to
   * @param topics topics to subscribe to; must not be empty
   * @return the Kafka input stream
   * @throws IllegalArgumentException if `topics` is empty
   */
  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
    require(topics.nonEmpty, "at least one topic is required")
    val groupId = kafkaParams("group.id").toString

    // persistOffset stores offsets for every topic present in a batch, so restore every topic.
    val fromOffsets: Map[TopicPartition, Long] =
      topics.flatMap(topic => readOffset(topic, groupId)).toMap

    // Only relevant for partitions that have neither a saved nor a committed offset
    // (e.g. on the very first start of the job).
    val consumerParams: Map[String, Object] =
      kafkaParams + (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")

    KafkaUtils.createDirectStream[K, V](
      ssc,
      PreferConsistent,
      Subscribe[K, V](topics, consumerParams, fromOffsets)
    )
  }

  /**
   * Reads the saved offsets of `topic` for consumer group `groupId` from Redis and corrects
   * those that fall outside the range Kafka currently holds.
   *
   * A saved offset below the earliest or above the latest available offset is reset to the
   * earliest one (an offset below the earliest typically means those records were already
   * removed by retention). Saved partitions that Kafka does not report any more are dropped
   * rather than handed to the stream.
   *
   * @param topic   topic whose offsets are read
   * @param groupId consumer group the offsets belong to
   * @return saved (and corrected) start offsets; empty when nothing has been saved
   */
  private def readOffset(topic: String, groupId: String): Map[TopicPartition, Long] = {
    import scala.collection.JavaConverters._

    // Hash layout written by persistOffset: field = partition number, value = offset.
    val saved: Map[TopicPartition, Long] =
      jedis.hgetAll(offsetKey(topic, groupId)).asScala.map { case (partition, offset) =>
        (new TopicPartition(topic, partition.toInt), offset.toLong)
      }.toMap

    if (saved.isEmpty) {
      // Nothing saved yet: no need to ask Kafka for offset ranges.
      Map.empty
    } else {
      val earliestOffsets = getEarliestOffsets(kafkaParams, topic)
      val latestOffsets = getLatestOffsets(kafkaParams, Seq(topic))
      saved.collect {
        case (tp, current) if earliestOffsets.contains(tp) && latestOffsets.contains(tp) =>
          val earliest = earliestOffsets(tp)
          val latest = latestOffsets(tp)
          tp -> (if (current < earliest || current > latest) earliest else current)
      }
    }
  }

  /**
   * Returns the earliest offset Kafka still holds for each partition of `topic`.
   *
   * @param kafkaParams consumer configuration used for the lookup
   * @param topic       topic to inspect
   * @return earliest offset per partition; empty if no partition metadata is available
   */
  private def getEarliestOffsets(kafkaParams: Map[String, Object], topic: String): Map[TopicPartition, Long] =
    queryOffsets(kafkaParams, Seq(topic)) { (consumer, partitions) =>
      consumer.beginningOffsets(partitions)
    }

  /**
   * Returns the end offset (the offset of the next record to be written) for each partition of
   * the given topics.
   *
   * @param kafkaParams consumer configuration used for the lookup
   * @param topic       topics to inspect
   * @return end offset per partition; empty if no partition metadata is available
   */
  private def getLatestOffsets(kafkaParams: Map[String, Object], topic: Seq[String]): Map[TopicPartition, Long] =
    queryOffsets(kafkaParams, topic) { (consumer, partitions) =>
      consumer.endOffsets(partitions)
    }

  /**
   * Looks up one offset per partition of `topics` with a short-lived consumer built from `params`.
   *
   * Only metadata and offset requests are issued (no subscribe, no poll), so the lookup does not
   * depend on a group assignment being available. The consumer is always closed, even on failure.
   * Topics for which no partition metadata is returned are skipped (`partitionsFor` may return
   * null for unknown topics in this client generation — hence the Option guard).
   *
   * @param params consumer configuration (deserializers, bootstrap servers, ...)
   * @param topics topics whose partitions are looked up
   * @param lookup the actual offset request to run against the discovered partitions
   */
  private def queryOffsets(params: Map[String, Object], topics: Seq[String])(
      lookup: (KafkaConsumer[Object, Object], java.util.Collection[TopicPartition]) => java.util.Map[TopicPartition, java.lang.Long]
  ): Map[TopicPartition, Long] = {
    import scala.collection.JavaConverters._

    val consumer = new KafkaConsumer[Object, Object](params.asJava)
    try {
      val partitions: Seq[TopicPartition] = for {
        topic <- topics
        infos <- Option(consumer.partitionsFor(topic)).toSeq
        info <- infos.asScala
      } yield new TopicPartition(info.topic, info.partition)

      if (partitions.isEmpty) Map.empty
      else lookup(consumer, partitions.asJava).asScala.map { case (tp, offset) => tp -> offset.longValue }.toMap
    } finally {
      consumer.close()
    }
  }

  /**
   * Saves the offset ranges of one batch to Redis so that [[createDirectStream]] can resume
   * from them after a restart.
   *
   * `rdd` must come directly from the Kafka direct stream: it is cast to [[HasOffsetRanges]],
   * which throws a ClassCastException for any other RDD.
   *
   * @param rdd         batch RDD produced by the direct stream
   * @param storeOffset true to store `untilOffset` (resume after this batch); false to store
   *                    `fromOffset` (this batch is read again after a restart)
   * @param topic       not used; offsets are stored for every topic present in `rdd`
   */
  def persistOffset[K, V](rdd: RDD[ConsumerRecord[K, V]], storeOffset: Boolean = true, topic: String): Unit = {
    val groupId = kafkaParams("group.id").toString
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    offsetRanges.foreach { offsetRange =>
      val data = if (storeOffset) offsetRange.untilOffset else offsetRange.fromOffset
      jedis.hset(offsetKey(offsetRange.topic, groupId), offsetRange.partition.toString, data.toString)
      // Log line: topic, partition, consumed-from offset, consumed-to offset, record count.
      println("topic:" + offsetRange.topic + "分区:" + offsetRange.partition + "开始消费" + offsetRange.fromOffset + "消费到" + offsetRange.untilOffset + "共计" + offsetRange.count())
    }
  }