kafka版本 1.0.0

spark版本 spark-streaming-kafka-0-10_2.11

/**
  * Manages Kafka consumer offsets in Redis for Spark Streaming (kafka-0-10 direct API).
  *
  * Offsets are stored as a Redis hash: key "&lt;topic&gt;:&lt;group.id&gt;", field = partition
  * number, value = offset. On restart, stored offsets are read back, corrected
  * against the broker's [earliest, latest] range, and used to resume the stream.
  *
  * @param kafkaParams Kafka consumer configuration; must contain "group.id"
  */
class KafkaManagerByRedis(kafkaParams: Map[String, Object]) extends Serializable {

  // NOTE(review): single shared Jedis connection, never closed here —
  // assumes JedisUtil manages the connection lifecycle; confirm.
  private val jedis = JedisUtil.getInstance().getJedis

  /**
    * Creates a direct Kafka stream.
    *
    * First start (no offsets in Redis): subscribe with auto.offset.reset=earliest.
    * Restart (offsets present): resume from the persisted, range-corrected offsets.
    *
    * @param ssc    streaming context
    * @param topics topics to subscribe; only topics(0) is used for the Redis key
    * @return the input DStream of consumer records
    */
  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
    val groupId = kafkaParams("group.id").toString
    val topic = topics(0)
    // Redis hash key convention: "<topic>:<groupId>" -> { partition -> offset }
    val redisKey = topic + ":" + groupId

    val offsetInfo = jedis.hgetAll(redisKey)
    val stream: InputDStream[ConsumerRecord[K, V]] =
      if (offsetInfo.size() == 0) {
        // First start: no stored offsets — consume from the earliest available offset.
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        KafkaUtils.createDirectStream[K, V](
          ssc,
          PreferConsistent,
          Subscribe[K, V](topics, newKafkaParams)
        )
      } else {
        // Restart: resume from offsets stored in Redis (corrected by readOffset).
        // BUG FIX: the original else-branch duplicated the first-start branch and
        // ignored the persisted offsets entirely, always restarting from "earliest".
        val topicPartition: Map[TopicPartition, Long] = readOffset(topic, groupId)
        KafkaUtils.createDirectStream[K, V](
          ssc,
          PreferConsistent,
          Subscribe[K, V](topics, kafkaParams, topicPartition)
        )
      }

    stream
  }

  /**
    * Reads persisted offsets for one topic/group from Redis and corrects
    * out-of-range values.
    *
    * If a stored offset falls outside the broker's current [earliest, latest]
    * range (e.g. segments deleted by log retention), it is reset to earliest.
    *
    * @param topic   topic name
    * @param groupId consumer group id
    * @return TopicPartition -> offset to resume from
    */
  private def readOffset(topic: String, groupId: String): Map[TopicPartition, Long] = {
    val topicPartitionMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    // Stored in Redis as hash: "<topic>:<groupId>" -> { partition -> offset }
    val redisKey = topic + ":" + groupId
    val map: util.Map[String, String] = jedis.hgetAll(redisKey)
    val topicAndPartitionMaps: mutable.Map[String, String] = mapAsScalaMap(map)
    topicAndPartitionMaps.foreach { case (partition, offset) =>
      // Redundant Integer.valueOf(x.toInt) from the original simplified to x.toInt.
      topicPartitionMap.put(new TopicPartition(topic, partition.toInt), offset.toLong)
    }

    // Offset correction: current < earliest || current > latest  ==>  earliest.
    val earliestOffsets = getEarliestOffsets(kafkaParams, topic)
    val latestOffsets = getLatestOffsets(kafkaParams, List(topic))
    for ((tp, current) <- topicPartitionMap) {
      // getOrElse replaces the original .get(k).get, which would throw
      // NoSuchElementException if the broker no longer reports the partition.
      val earliest = earliestOffsets.getOrElse(tp, 0L)
      val latest = latestOffsets.getOrElse(tp, Long.MaxValue)
      if (current < earliest || current > latest) {
        topicPartitionMap.put(tp, earliest)
      }
    }
    topicPartitionMap.toMap
  }

  /**
    * Queries the broker for the earliest available offset of every partition
    * of `topic` using a short-lived KafkaConsumer.
    *
    * @param kafkaParams base consumer configuration
    * @param topic       topic to inspect
    * @return TopicPartition -> earliest offset
    */
  private def getEarliestOffsets(kafkaParams: Map[String, Object], topic: String): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    // BUG FIX: the original constructed the consumer from the unmodified
    // `kafkaParams`, so the "earliest" override above was never applied.
    val consumer = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(Seq(topic))
    try {
      // poll(0) forces partition assignment without fetching records.
      consumer.poll(0)
    } catch {
      case e: NoOffsetForPartitionException =>
      // TODO: alerting (e.g. email) for partitions with no initial offset
    }
    val assignment = consumer.assignment().toSet
    // Pause fetching, then seek every assigned partition to the log beginning;
    // position() then reports the earliest offset per partition.
    consumer.pause(assignment)
    consumer.seekToBeginning(assignment)
    val earliestOffsetMap = assignment.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    earliestOffsetMap
  }

  /**
    * Queries the broker for the latest (log-end) offset of every partition of
    * the given topics using a short-lived KafkaConsumer.
    *
    * @param kafkaParams base consumer configuration
    * @param topic       topics to inspect
    * @return TopicPartition -> latest offset
    */
  private def getLatestOffsets(kafkaParams: Map[String, Object], topic: Seq[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    val consumer = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topic)
    try {
      // poll(0) forces partition assignment without fetching records.
      consumer.poll(0)
    } catch {
      case e: NoOffsetForPartitionException =>
      // TODO: alerting (e.g. email) for partitions with no initial offset
    }
    val assignment = consumer.assignment().toSet
    consumer.pause(assignment)
    consumer.seekToEnd(assignment)
    // Renamed from the original's misleading `earliestOffsetMap`.
    val latestOffsetMap = assignment.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    latestOffsetMap
  }

  /**
    * Persists the offsets of a processed RDD batch into Redis.
    *
    * @param rdd         batch whose offset ranges are persisted (must come from
    *                    a direct stream, i.e. implement HasOffsetRanges)
    * @param storeOffset true  -> store untilOffset (mark batch as consumed);
    *                    false -> store fromOffset (batch will be re-consumed)
    * @param topic       unused; kept for interface compatibility
    */
  def persistOffset[K, V](rdd: RDD[ConsumerRecord[K, V]], storeOffset: Boolean = true, topic: String) = {

    val groupId = kafkaParams("group.id").toString

    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    offsetRanges.foreach { offsetRange =>
      // BUG FIX: the original wrote key "<topic>_<groupId>" while the readers
      // (createDirectStream / readOffset) look up "<topic>:<groupId>", so
      // persisted offsets were never found after a restart.
      val redisKey = offsetRange.topic + ":" + groupId
      val data = if (storeOffset) offsetRange.untilOffset else offsetRange.fromOffset
      jedis.hset(redisKey, offsetRange.partition.toString, data.toString)
      println("topic:" + offsetRange.topic + "分区:" + offsetRange.partition + "开始消费" + offsetRange.fromOffset + "消费到" + offsetRange.untilOffset + "共计" + offsetRange.count())
    }
  }
}

标签: String, kafka, redis, val, consumer, 偏移量, topic, newKafkaParams, kafkaParams

相关文章推荐

添加新评论,含*的栏目为必填