Spark Streaming - Kafka Direct API - Store Offsets in ZK

If you are using Spark Streaming's new Kafka Direct API, you might be missing some things, such as monitoring tools that only work when offsets are stored in ZK. One example is Kafka's kafka-consumer-offset-checker.sh, which is very helpful for checking consumer lag (how far behind a consumer is). If that sounds familiar, you might find this helpful.

  • First, "pimp" the DStream (the imports at the top are shared with the next snippet)
  import java.net.InetAddress

  import scala.reflect.ClassTag
  import scala.util.control.NonFatal

  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.spark.streaming.kafka.HasOffsetRanges
  import org.apache.zookeeper.CreateMode

  // ZkClientImpl is your own thin ZooKeeper client wrapper (exists / setRaw / createRaw / close);
  // a sketch of one is shown further down
  implicit class PimpDStream[T: ClassTag](dstream: DStream[T]) extends Serializable {

    def foreachRDDWithOffsets(zkQuorum: String, name: String, group: String, topic: String)(f: RDD[T] => Unit)(implicit logger: org.log4s.Logger): Unit = {
      dstream.foreachRDD { rdd =>
        // Connect to ZK
        implicit val zk = new ZkClientImpl(zkQuorum, name)

        try {
          // Try to store offsets in ZK
          try {
            rdd.zkOffsets(group)
          } catch {
            case NonFatal(error) =>
              logger error "DStream Foreach RDD with Offset - Store Offsets - Error: $error"
          }

          // Do the work
          f(rdd)

        } catch {
          case NonFatal(error) =>
            logger.error "DStream Foreach RDD with Offset - Error: $error"
            throw error

        } finally {
          // Always cleanup ZK connections
          zk.close()
        }
      }
    }
  }
  • Now "pimp" the RDD
  implicit class PimpRDD[T: ClassTag](rdd: RDD[T]) {

    def zkOffsets(group: String)(implicit zk: ZkClientImpl, logger: org.log4s.Logger): Unit = {
      rdd match {
        case offsetRdd: HasOffsetRanges =>
          val offsets = offsetRdd.offsetRanges
          offsets.foreach { o =>
            // Consumer Offset
            locally {
              val nodePath = s"/consumers/$group/offsets/${o.topic}/${o.partition}"

              logger info s"Kafka Direct Stream - Offset Range - Topic: ${o.topic}, Partition: ${o.partition}, From Offset: ${o.fromOffset}, To Offset: ${o.untilOffset}, ZK Node: $nodePath"

              zk.exists(nodePath) match {
                case true =>
                  logger info s"Kafka Direct Stream - Offset - ZK Node ($nodePath) exists, setting value: ${o.untilOffset}"
                  zk.setRaw(nodePath, o.untilOffset.toString.getBytes)

                case false =>
                  logger info s"Kafka Direct Stream - Offset - ZK Node ($nodePath) does NOT exist -- setting value: ${o.untilOffset}"
                  zk.createRaw(nodePath, o.untilOffset.toString.getBytes(), createMode = Some(CreateMode.PERSISTENT))
              }
            }

            val hostname = InetAddress.getLocalHost().getHostName()
            val ownerId = s"${group}-${hostname}-${o.partition}"
            val now = org.joda.time.DateTime.now.getMillis

            // Consumer Ids
            locally {
              val nodePath = s"/consumers/$group/ids/${ownerId}"
              val value = s"""{"version":1,"subscription":{"${o.topic}":${o.partition},"pattern":"white_list","timestamp":"$now"}"""

              zk.exists(nodePath) match {
                case true =>
                  logger info s"Kafka Direct Stream - Id - ZK Node ($nodePath) exists, setting value: ${value}"
                  zk.setRaw(nodePath, value.getBytes)

                case false =>
                  logger info s"Kafka Direct Stream - Id - ZK Node ($nodePath) does NOT exist -- setting value: ${value}"
                  zk.createRaw(nodePath, value.getBytes, createMode = Some(CreateMode.PERSISTENT))
              }
            }

            // Consumer Owners
            locally {
              val nodePath = s"/consumers/$group/owners/${o.topic}/${o.partition}"
              val value = ownerId

              zk.exists(nodePath) match {
                case true =>
                  logger info s"Kafka Direct Stream - Owner - ZK Node ($nodePath) exists, setting value: ${value}"
                  zk.setRaw(nodePath, value.getBytes)

                case false =>
                  logger info s"Kafka Direct Stream - Owner - ZK Node ($nodePath) does NOT exist -- setting value: ${value}"
                  zk.createRaw(nodePath, value.getBytes, createMode = Some(CreateMode.PERSISTENT))
              }
            }
          }

        case _ =>
          logger warn s"DStream - ZK Offsets - Cannot store on ZK since RDD is not of type of HasOffsetRanges: $rdd"
      }
    }
  }
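
A note on ZkClientImpl: it is not a Spark or Kafka class, just a thin wrapper around a ZooKeeper client that exposes exists / setRaw / createRaw / close. If you do not already have something like it, a minimal sketch built on Apache Curator could look like this (the use of Curator, the constructor arguments and the extra readRaw helper are assumptions made to match the calls above):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

// Minimal sketch only; adapt to whatever ZooKeeper client you already use.
// The "name" argument is unused here and kept only to mirror the calls above.
class ZkClientImpl(zkQuorum: String, name: String) {

  private val client = CuratorFrameworkFactory.newClient(zkQuorum, new ExponentialBackoffRetry(1000, 3))
  client.start()

  def exists(path: String): Boolean =
    client.checkExists().forPath(path) != null

  def setRaw(path: String, data: Array[Byte]): Unit =
    client.setData().forPath(path, data)

  def createRaw(path: String, data: Array[Byte], createMode: Option[CreateMode] = None): Unit =
    client.create()
      .creatingParentsIfNeeded()
      .withMode(createMode.getOrElse(CreateMode.PERSISTENT))
      .forPath(path, data)

  // Not needed by the snippets above, but handy if you want to read the offsets back later
  def readRaw(path: String): Array[Byte] =
    client.getData().forPath(path)

  def close(): Unit = client.close()
}

Note that createRaw creates parent znodes as needed, which matters because the /consumers/&lt;group&gt;/offsets/&lt;topic&gt; parents will not exist the first time a new group writes to ZK.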
  • Now you can finally use it in your job
val dstream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)

dstream.foreachRDDWithOffsets(zkQuorum, name, group, topic) { rdd =>  
  // apply your business logic here
}
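
For completeness, the surrounding setup could look roughly like this; the app name, batch interval, broker list, topic and group below are placeholders, and the implicit log4s logger is what foreachRDDWithOffsets picks up:

import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder configuration; adjust to your environment
val conf = new SparkConf().setAppName("my-streaming-app")
val ssc = new StreamingContext(conf, Seconds(10))

val zkQuorum = "my-zk:2181"
val name = "my-streaming-app"
val group = "my-group"
val topic = "my-topic"
val topicSet = Set(topic)
val kafkaParams = Map("metadata.broker.list" -> "kafka-host:9092")

// foreachRDDWithOffsets needs an implicit org.log4s.Logger in scope
implicit val logger = org.log4s.getLogger("my-streaming-job")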

Once you do that, you can finally use Kafka's built-in consumer offset checker tool:

$KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --zookeeper my-zk:2181 --group my-group --broker-info

You can also seed KafkaUtils.createDirectStream() with these offsets from ZK when the job starts; see the fromOffsets: Map[TopicAndPartition, Long] parameter of the alternative overload.
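
As a rough sketch (assuming you know the partition count up front, and assuming a readRaw helper like the one in the ZkClientImpl sketch above), reading the offsets back and seeding the stream could look like this:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder: the number of partitions of the topic
val partitions = 4

// Read back the offsets written by zkOffsets; skip partitions that have no node yet
val zk = new ZkClientImpl(zkQuorum, name)
val fromOffsets: Map[TopicAndPartition, Long] =
  (0 until partitions).flatMap { partition =>
    val nodePath = s"/consumers/$group/offsets/$topic/$partition"
    if (zk.exists(nodePath))
      Some(TopicAndPartition(topic, partition) -> new String(zk.readRaw(nodePath)).toLong)
    else
      None
  }.toMap
zk.close()

// The fromOffsets overload also takes a message handler that maps each
// MessageAndMetadata to whatever the DStream should contain
val messageHandler = (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)

val dstream = KafkaUtils.createDirectStream[
  String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](
  ssc, kafkaParams, fromOffsets, messageHandler)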

If you want to reuse the offsets from ZooKeeper instead of relying on checkpoints, check out this other post.