Spark Streaming - Direct API - Reusing Offset from Zookeeper

If you are using the Spark Streaming Kafka Direct API and storing offsets in Zookeeper, you might want to reuse the offsets from Zookeeper rather than always relying on the Spark checkpoint.

KafkaUtils.createDirectStream() takes a fromOffsets parameter through which you can provide the starting offset for each topic/partition combination.
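For reference, fromOffsets is a Map[TopicAndPartition, Long] keyed by partition. A purely illustrative example (the topic name and offset values are made up):

```scala
import kafka.common.TopicAndPartition

// Hypothetical starting positions for a two-partition topic "events"
val fromOffsets: Map[TopicAndPartition, Long] = Map(
  TopicAndPartition("events", 0) -> 12345L,
  TopicAndPartition("events", 1) -> 67890L
)
```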

Here's a function to get the offsets from Zookeeper:

  import kafka.common.TopicAndPartition

  def getFromOffsets(zk: String, topic: String, group: String, partitions: Int)(implicit logger: org.log4s.Logger): Option[Map[TopicAndPartition, Long]] = {
    val zkClient = new ZkClientImpl(zk, java.util.UUID.randomUUID.toString)

    val list = (0 until partitions).toList.flatMap { i =>
      val nodePath = s"/consumers/$group/offsets/$topic/$i"

      try {
        zkClient.exists(nodePath) match {
          case true =>
            val maybeOffset = Option(zkClient.getRaw(nodePath))
            logger info s"Kafka Direct Stream - From Offset - ZK Node ($nodePath) - Value: $maybeOffset"
            maybeOffset.map { offset =>
              TopicAndPartition(topic, i) -> new String(offset).toLong
            }

          case false =>
            logger info s"Kafka Direct Stream - From Offset - ZK Node ($nodePath) does NOT exist"
            None
        }
      } catch {
        case scala.util.control.NonFatal(error) =>
          logger error s"Kafka Direct Stream - From Offset - ZK Node ($nodePath) - Error: $error"
          None
      }
    }

    (list.size == partitions) match {
      case true =>
        list.foreach {
          case (t, offset) =>
            logger info s"From Offset - Topic: ${t.topic}, Partition: ${t.partition}, Offset: ${offset}"
        }

        Some(list.toMap)

      case false =>
        logger info s"Current ZK offset count (${list.size}) doesn't match the partition count ($partitions), so relying on the Spark checkpoint to reuse Kafka offsets"
        None
    }
  }

Then you can use that function to call KafkaUtils.createDirectStream, falling back to the topic-based overload (which uses the checkpoint, if any) when no offsets were found:

  val dstream: DStream[(String, Array[Byte])] = fromOffsets.map { fo =>
    val messageHandler = (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](ssc, kafkaParams, fo, messageHandler)
  } getOrElse KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)
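Reading offsets back is only half the story: for those ZK nodes to hold fresh values, each batch needs to write its offsets out. A minimal sketch of the write side, assuming the same zkClient wrapper used above exposes a `set`-style update method (that method name is an assumption, not a real API):

```scala
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Sketch: after each batch, persist the batch's ending offsets to the same
// ZK node paths that getFromOffsets reads on startup.
dstream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { or =>
    val nodePath = s"/consumers/$group/offsets/${or.topic}/${or.partition}"
    // untilOffset is where the next batch should resume from
    zkClient.set(nodePath, or.untilOffset.toString.getBytes) // assumed API
  }
}
```

Note that HasOffsetRanges is only accessible on the first DStream returned by createDirectStream, before any transformations, so do this cast in the first foreachRDD (or capture the ranges in a transform step).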

Enjoy!