path: root/external/kafka/src
author     cody koeninger <cody@koeninger.org>  2016-05-11 12:15:41 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-05-11 12:15:41 -0700
commit     89e67d6667d5f8be9c6fb6c120fbcd350ae2950d (patch)
tree       670699f20dcc785e1889c8c1afc4db1a0b2b11ee /external/kafka/src
parent     6d0368ab8d1043735e5fe89f801aae1c6826876c (diff)
[SPARK-15085][STREAMING][KAFKA] Rename streaming-kafka artifact
## What changes were proposed in this pull request?

Rename the streaming-kafka artifact to include the Kafka version, in anticipation of needing a different artifact for later Kafka versions.

## How was this patch tested?

Unit tests.

Author: cody koeninger <cody@koeninger.org>

Closes #12946 from koeninger/SPARK-15085.
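For illustration, a hedged sketch of what the rename means for downstream builds: the artifact id gains a Kafka-version suffix. The exact id (`spark-streaming-kafka-0-8` below) and the Spark version are assumptions for the example; the module's pom.xml is authoritative.

```scala
// build.sbt sketch: depend on the version-suffixed streaming-kafka artifact.
// "spark-streaming-kafka-0-8" and "2.0.0" are illustrative assumptions, not values from this patch.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"
```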
Diffstat (limited to 'external/kafka/src')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala | 66
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 227
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala | 425
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 142
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 269
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala | 42
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 275
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 805
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala | 109
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 302
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java | 21
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala | 23
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java | 175
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java | 156
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 135
-rw-r--r--  external/kafka/src/test/resources/log4j.properties | 28
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 531
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala | 81
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala | 175
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala | 84
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala | 148
21 files changed, 0 insertions, 4219 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
deleted file mode 100644
index 9159051ba0..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Broker.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import org.apache.spark.annotation.Experimental
-
-/**
- * Represents the host and port info for a Kafka broker.
- * Differs from the Kafka project's internal kafka.cluster.Broker, which contains a server ID.
- */
-final class Broker private(
- /** Broker's hostname */
- val host: String,
- /** Broker's port */
- val port: Int) extends Serializable {
- override def equals(obj: Any): Boolean = obj match {
- case that: Broker =>
- this.host == that.host &&
- this.port == that.port
- case _ => false
- }
-
- override def hashCode: Int = {
- 41 * (41 + host.hashCode) + port
- }
-
- override def toString(): String = {
- s"Broker($host, $port)"
- }
-}
-
-/**
- * :: Experimental ::
- * Companion object that provides methods to create instances of [[Broker]].
- */
-@Experimental
-object Broker {
- def create(host: String, port: Int): Broker =
- new Broker(host, port)
-
- def apply(host: String, port: Int): Broker =
- new Broker(host, port)
-
- def unapply(broker: Broker): Option[(String, Int)] = {
- if (broker == null) {
- None
- } else {
- Some((broker.host, broker.port))
- }
- }
-}
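
Note: a minimal usage sketch (not part of the patch) of the removed Broker API, relying only on the factory and extractor methods visible in the deleted file above; the hostname and port are placeholders.

```scala
import org.apache.spark.streaming.kafka.Broker

// Factory methods from the companion object; the constructor itself is private.
val b1 = Broker.create("broker1.example.com", 9092)   // Java-friendly factory
val b2 = Broker("broker1.example.com", 9092)          // Scala apply

// unapply allows pattern matching on host and port.
b2 match {
  case Broker(host, port) => println(s"leader at $host:$port")
}

// equals/hashCode are value-based, so the two instances compare equal.
assert(b1 == b2)
```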
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
deleted file mode 100644
index fb58ed7898..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.annotation.tailrec
-import scala.collection.mutable
-import scala.reflect.ClassTag
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.Decoder
-
-import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
-import org.apache.spark.streaming.scheduler.rate.RateEstimator
-
-/**
- * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
- * each given Kafka topic/partition corresponds to an RDD partition.
- * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
- * of messages
- * per second that each '''partition''' will accept.
- * Starting offsets are specified in advance,
- * and this DStream is not responsible for committing offsets,
- * so that you can control exactly-once semantics.
- * For an easy interface to Kafka-managed offsets,
- * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
- * starting point of the stream
- * @param messageHandler function for translating each message into the desired type
- */
-private[streaming]
-class DirectKafkaInputDStream[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[K]: ClassTag,
- T <: Decoder[V]: ClassTag,
- R: ClassTag](
- _ssc: StreamingContext,
- val kafkaParams: Map[String, String],
- val fromOffsets: Map[TopicAndPartition, Long],
- messageHandler: MessageAndMetadata[K, V] => R
- ) extends InputDStream[R](_ssc) with Logging {
- val maxRetries = context.sparkContext.getConf.getInt(
- "spark.streaming.kafka.maxRetries", 1)
-
- // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
- private[streaming] override def name: String = s"Kafka direct stream [$id]"
-
- protected[streaming] override val checkpointData =
- new DirectKafkaInputDStreamCheckpointData
-
-
- /**
- * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
- */
- override protected[streaming] val rateController: Option[RateController] = {
- if (RateController.isBackPressureEnabled(ssc.conf)) {
- Some(new DirectKafkaRateController(id,
- RateEstimator.create(ssc.conf, context.graph.batchDuration)))
- } else {
- None
- }
- }
-
- protected val kc = new KafkaCluster(kafkaParams)
-
- private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
- "spark.streaming.kafka.maxRatePerPartition", 0)
-
- protected[streaming] def maxMessagesPerPartition(
- offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
- val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
-
- // calculate a per-partition rate limit based on current lag
- val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
- case Some(rate) =>
- val lagPerPartition = offsets.map { case (tp, offset) =>
- tp -> Math.max(offset - currentOffsets(tp), 0)
- }
- val totalLag = lagPerPartition.values.sum
-
- lagPerPartition.map { case (tp, lag) =>
- val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
- tp -> (if (maxRateLimitPerPartition > 0) {
- Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
- }
- case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
- }
-
- if (effectiveRateLimitPerPartition.values.sum > 0) {
- val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
- Some(effectiveRateLimitPerPartition.map {
- case (tp, limit) => tp -> (secsPerBatch * limit).toLong
- })
- } else {
- None
- }
- }
-
- protected var currentOffsets = fromOffsets
-
- @tailrec
- protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
- val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
- // Either.fold would confuse @tailrec, do it manually
- if (o.isLeft) {
- val err = o.left.get.toString
- if (retries <= 0) {
- throw new SparkException(err)
- } else {
- log.error(err)
- Thread.sleep(kc.config.refreshLeaderBackoffMs)
- latestLeaderOffsets(retries - 1)
- }
- } else {
- o.right.get
- }
- }
-
- // limits the maximum number of messages per partition
- protected def clamp(
- leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
- val offsets = leaderOffsets.mapValues(lo => lo.offset)
-
- maxMessagesPerPartition(offsets).map { mmp =>
- mmp.map { case (tp, messages) =>
- val lo = leaderOffsets(tp)
- tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
- }
- }.getOrElse(leaderOffsets)
- }
-
- override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
- val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
- val rdd = KafkaRDD[K, V, U, T, R](
- context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
-
- // Report the record number and metadata of this batch interval to InputInfoTracker.
- val offsetRanges = currentOffsets.map { case (tp, fo) =>
- val uo = untilOffsets(tp)
- OffsetRange(tp.topic, tp.partition, fo, uo.offset)
- }
- val description = offsetRanges.filter { offsetRange =>
- // Don't display empty ranges.
- offsetRange.fromOffset != offsetRange.untilOffset
- }.map { offsetRange =>
- s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
- s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
- }.mkString("\n")
- // Copy offsetRanges to immutable.List to prevent from being modified by the user
- val metadata = Map(
- "offsets" -> offsetRanges.toList,
- StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
- val inputInfo = StreamInputInfo(id, rdd.count, metadata)
- ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
-
- currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
- Some(rdd)
- }
-
- override def start(): Unit = {
- }
-
- def stop(): Unit = {
- }
-
- private[streaming]
- class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
- def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
- data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
- }
-
- override def update(time: Time) {
- batchForTime.clear()
- generatedRDDs.foreach { kv =>
- val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
- batchForTime += kv._1 -> a
- }
- }
-
- override def cleanup(time: Time) { }
-
- override def restore() {
- // this is assuming that the topics don't change during execution, which is true currently
- val topics = fromOffsets.keySet
- val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
-
- batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
- logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
- generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
- context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
- }
- }
- }
-
- /**
- * A RateController to retrieve the rate from RateEstimator.
- */
- private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
- extends RateController(id, estimator) {
- override def publish(rate: Long): Unit = ()
- }
-}
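
Note: to make the backpressure arithmetic in maxMessagesPerPartition above easier to follow, here is a standalone sketch of the same calculation with illustrative numbers; the TP case class and all values are placeholders, not part of the patch.

```scala
// Sketch of the per-partition backpressure math: each partition gets a share of the
// estimated rate proportional to its lag, optionally capped by
// spark.streaming.kafka.maxRatePerPartition, then scaled by the batch duration.
case class TP(topic: String, partition: Int)

def messagesPerBatch(
    lag: Map[TP, Long],        // untilOffset - currentOffset per partition (assumes totalLag > 0)
    estimatedRate: Long,       // records/sec from the rate estimator
    maxRatePerPartition: Int,  // 0 means "no static cap"
    batchSeconds: Double): Map[TP, Long] = {
  val totalLag = lag.values.sum.toDouble
  lag.map { case (tp, l) =>
    val backpressureRate = math.round(l / totalLag * estimatedRate)
    val limited =
      if (maxRatePerPartition > 0) math.min(backpressureRate, maxRatePerPartition.toLong)
      else backpressureRate
    tp -> (limited * batchSeconds).toLong
  }
}

// e.g. two partitions with lags 900 and 100, a 1000 rec/s estimate and 2 s batches
// yield roughly 1800 and 200 messages respectively.
messagesPerBatch(Map(TP("t", 0) -> 900L, TP("t", 1) -> 100L), 1000L, 0, 2.0)
```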
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
deleted file mode 100644
index 726b5d8ec3..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-import scala.util.control.NonFatal
-
-import kafka.api._
-import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
-import kafka.consumer.{ConsumerConfig, SimpleConsumer}
-
-import org.apache.spark.SparkException
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Convenience methods for interacting with a Kafka cluster.
- * See <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">
- * A Guide To The Kafka Protocol</a> for more details on individual api calls.
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form
- */
-@DeveloperApi
-class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
- import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}
-
- // ConsumerConfig isn't serializable
- @transient private var _config: SimpleConsumerConfig = null
-
- def config: SimpleConsumerConfig = this.synchronized {
- if (_config == null) {
- _config = SimpleConsumerConfig(kafkaParams)
- }
- _config
- }
-
- def connect(host: String, port: Int): SimpleConsumer =
- new SimpleConsumer(host, port, config.socketTimeoutMs,
- config.socketReceiveBufferBytes, config.clientId)
-
- def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
- findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))
-
- // Metadata api
- // scalastyle:off
- // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
- // scalastyle:on
-
- def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
- val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
- 0, config.clientId, Seq(topic))
- val errs = new Err
- withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
- val resp: TopicMetadataResponse = consumer.send(req)
- resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
- tm.partitionsMetadata.find(_.partitionId == partition)
- }.foreach { pm: PartitionMetadata =>
- pm.leader.foreach { leader =>
- return Right((leader.host, leader.port))
- }
- }
- }
- Left(errs)
- }
-
- def findLeaders(
- topicAndPartitions: Set[TopicAndPartition]
- ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
- val topics = topicAndPartitions.map(_.topic)
- val response = getPartitionMetadata(topics).right
- val answer = response.flatMap { tms: Set[TopicMetadata] =>
- val leaderMap = tms.flatMap { tm: TopicMetadata =>
- tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
- val tp = TopicAndPartition(tm.topic, pm.partitionId)
- if (topicAndPartitions(tp)) {
- pm.leader.map { l =>
- tp -> (l.host -> l.port)
- }
- } else {
- None
- }
- }
- }.toMap
-
- if (leaderMap.keys.size == topicAndPartitions.size) {
- Right(leaderMap)
- } else {
- val missing = topicAndPartitions.diff(leaderMap.keySet)
- val err = new Err
- err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
- Left(err)
- }
- }
- answer
- }
-
- def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
- getPartitionMetadata(topics).right.map { r =>
- r.flatMap { tm: TopicMetadata =>
- tm.partitionsMetadata.map { pm: PartitionMetadata =>
- TopicAndPartition(tm.topic, pm.partitionId)
- }
- }
- }
- }
-
- def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
- val req = TopicMetadataRequest(
- TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
- val errs = new Err
- withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
- val resp: TopicMetadataResponse = consumer.send(req)
- val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
-
- if (respErrs.isEmpty) {
- return Right(resp.topicsMetadata.toSet)
- } else {
- respErrs.foreach { m =>
- val cause = ErrorMapping.exceptionFor(m.errorCode)
- val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
- errs.append(new SparkException(msg, cause))
- }
- }
- }
- Left(errs)
- }
-
- // Leader offset api
- // scalastyle:off
- // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
- // scalastyle:on
-
- def getLatestLeaderOffsets(
- topicAndPartitions: Set[TopicAndPartition]
- ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
- getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
-
- def getEarliestLeaderOffsets(
- topicAndPartitions: Set[TopicAndPartition]
- ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
- getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
-
- def getLeaderOffsets(
- topicAndPartitions: Set[TopicAndPartition],
- before: Long
- ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
- getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
- r.map { kv =>
- // mapValues isn't serializable, see SI-7005
- kv._1 -> kv._2.head
- }
- }
- }
-
- private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
- m.groupBy(_._2).map { kv =>
- kv._1 -> kv._2.keys.toSeq
- }
-
- def getLeaderOffsets(
- topicAndPartitions: Set[TopicAndPartition],
- before: Long,
- maxNumOffsets: Int
- ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
- findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
- val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
- val leaders = leaderToTp.keys
- var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
- val errs = new Err
- withBrokers(leaders, errs) { consumer =>
- val partitionsToGetOffsets: Seq[TopicAndPartition] =
- leaderToTp((consumer.host, consumer.port))
- val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
- tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
- }.toMap
- val req = OffsetRequest(reqMap)
- val resp = consumer.getOffsetsBefore(req)
- val respMap = resp.partitionErrorAndOffsets
- partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
- respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
- if (por.error == ErrorMapping.NoError) {
- if (por.offsets.nonEmpty) {
- result += tp -> por.offsets.map { off =>
- LeaderOffset(consumer.host, consumer.port, off)
- }
- } else {
- errs.append(new SparkException(
- s"Empty offsets for ${tp}, is ${before} before log beginning?"))
- }
- } else {
- errs.append(ErrorMapping.exceptionFor(por.error))
- }
- }
- }
- if (result.keys.size == topicAndPartitions.size) {
- return Right(result)
- }
- }
- val missing = topicAndPartitions.diff(result.keySet)
- errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
- Left(errs)
- }
- }
-
- // Consumer offset api
- // scalastyle:off
- // https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
- // scalastyle:on
-
- // this 0 here indicates api version, in this case the original ZK backed api.
- private def defaultConsumerApiVersion: Short = 0
-
- /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
- def getConsumerOffsets(
- groupId: String,
- topicAndPartitions: Set[TopicAndPartition]
- ): Either[Err, Map[TopicAndPartition, Long]] =
- getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)
-
- def getConsumerOffsets(
- groupId: String,
- topicAndPartitions: Set[TopicAndPartition],
- consumerApiVersion: Short
- ): Either[Err, Map[TopicAndPartition, Long]] = {
- getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
- r.map { kv =>
- kv._1 -> kv._2.offset
- }
- }
- }
-
- /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
- def getConsumerOffsetMetadata(
- groupId: String,
- topicAndPartitions: Set[TopicAndPartition]
- ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
- getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)
-
- def getConsumerOffsetMetadata(
- groupId: String,
- topicAndPartitions: Set[TopicAndPartition],
- consumerApiVersion: Short
- ): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
- var result = Map[TopicAndPartition, OffsetMetadataAndError]()
- val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
- val errs = new Err
- withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
- val resp = consumer.fetchOffsets(req)
- val respMap = resp.requestInfo
- val needed = topicAndPartitions.diff(result.keySet)
- needed.foreach { tp: TopicAndPartition =>
- respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
- if (ome.error == ErrorMapping.NoError) {
- result += tp -> ome
- } else {
- errs.append(ErrorMapping.exceptionFor(ome.error))
- }
- }
- }
- if (result.keys.size == topicAndPartitions.size) {
- return Right(result)
- }
- }
- val missing = topicAndPartitions.diff(result.keySet)
- errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
- Left(errs)
- }
-
- /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
- def setConsumerOffsets(
- groupId: String,
- offsets: Map[TopicAndPartition, Long]
- ): Either[Err, Map[TopicAndPartition, Short]] =
- setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)
-
- def setConsumerOffsets(
- groupId: String,
- offsets: Map[TopicAndPartition, Long],
- consumerApiVersion: Short
- ): Either[Err, Map[TopicAndPartition, Short]] = {
- val meta = offsets.map { kv =>
- kv._1 -> OffsetAndMetadata(kv._2)
- }
- setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
- }
-
- /** Requires Kafka >= 0.8.1.1. Defaults to the original ZooKeeper backed api version. */
- def setConsumerOffsetMetadata(
- groupId: String,
- metadata: Map[TopicAndPartition, OffsetAndMetadata]
- ): Either[Err, Map[TopicAndPartition, Short]] =
- setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)
-
- def setConsumerOffsetMetadata(
- groupId: String,
- metadata: Map[TopicAndPartition, OffsetAndMetadata],
- consumerApiVersion: Short
- ): Either[Err, Map[TopicAndPartition, Short]] = {
- var result = Map[TopicAndPartition, Short]()
- val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
- val errs = new Err
- val topicAndPartitions = metadata.keySet
- withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
- val resp = consumer.commitOffsets(req)
- val respMap = resp.commitStatus
- val needed = topicAndPartitions.diff(result.keySet)
- needed.foreach { tp: TopicAndPartition =>
- respMap.get(tp).foreach { err: Short =>
- if (err == ErrorMapping.NoError) {
- result += tp -> err
- } else {
- errs.append(ErrorMapping.exceptionFor(err))
- }
- }
- }
- if (result.keys.size == topicAndPartitions.size) {
- return Right(result)
- }
- }
- val missing = topicAndPartitions.diff(result.keySet)
- errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
- Left(errs)
- }
-
- // Try a call against potentially multiple brokers, accumulating errors
- private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
- (fn: SimpleConsumer => Any): Unit = {
- brokers.foreach { hp =>
- var consumer: SimpleConsumer = null
- try {
- consumer = connect(hp._1, hp._2)
- fn(consumer)
- } catch {
- case NonFatal(e) =>
- errs.append(e)
- } finally {
- if (consumer != null) {
- consumer.close()
- }
- }
- }
- }
-}
-
-@DeveloperApi
-object KafkaCluster {
- type Err = ArrayBuffer[Throwable]
-
- /** If the result is right, return it, otherwise throw SparkException */
- def checkErrors[T](result: Either[Err, T]): T = {
- result.fold(
- errs => throw new SparkException(errs.mkString("\n")),
- ok => ok
- )
- }
-
- case class LeaderOffset(host: String, port: Int, offset: Long)
-
- /**
- * High-level kafka consumers connect to ZK. ConsumerConfig assumes this use case.
- * Simple consumers connect directly to brokers, but need many of the same configs.
- * This subclass won't warn about missing ZK params, or presence of broker params.
- */
- class SimpleConsumerConfig private(brokers: String, originalProps: Properties)
- extends ConsumerConfig(originalProps) {
- val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
- val hpa = hp.split(":")
- if (hpa.size == 1) {
- throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
- }
- (hpa(0), hpa(1).toInt)
- }
- }
-
- object SimpleConsumerConfig {
- /**
- * Make a consumer config without requiring group.id or zookeeper.connect,
- * since communicating with brokers also needs common settings such as timeout
- */
- def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
- // These keys are from other pre-existing kafka configs for specifying brokers, accept either
- val brokers = kafkaParams.get("metadata.broker.list")
- .orElse(kafkaParams.get("bootstrap.servers"))
- .getOrElse(throw new SparkException(
- "Must specify metadata.broker.list or bootstrap.servers"))
-
- val props = new Properties()
- kafkaParams.foreach { case (key, value) =>
- // prevent warnings on parameters ConsumerConfig doesn't know about
- if (key != "metadata.broker.list" && key != "bootstrap.servers") {
- props.put(key, value)
- }
- }
-
- Seq("zookeeper.connect", "group.id").foreach { s =>
- if (!props.containsKey(s)) {
- props.setProperty(s, "")
- }
- }
-
- new SimpleConsumerConfig(brokers, props)
- }
- }
-}
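
Note: a brief usage sketch (not from the patch) of the deleted KafkaCluster helper, using only methods shown above; the broker address, topic and partitions are placeholders.

```scala
import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

val kc = new KafkaCluster(Map("metadata.broker.list" -> "broker1:9092"))
val partitions = Set(TopicAndPartition("events", 0), TopicAndPartition("events", 1))

// Either-based API: Left accumulates errors, Right carries the result.
// checkErrors converts a Left into a thrown SparkException.
val latest: Map[TopicAndPartition, KafkaCluster.LeaderOffset] =
  KafkaCluster.checkErrors(kc.getLatestLeaderOffsets(partitions))

latest.foreach { case (tp, lo) =>
  println(s"${tp.topic}-${tp.partition}: offset ${lo.offset} on ${lo.host}:${lo.port}")
}
```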
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
deleted file mode 100644
index 3713bda41b..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-
-import scala.collection.Map
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.ThreadUtils
-
-/**
- * Input stream that pulls messages from a Kafka Broker.
- *
- * @param kafkaParams Map of kafka configuration parameters.
- * See: http://kafka.apache.org/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- */
-private[streaming]
-class KafkaInputDStream[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag](
- _ssc: StreamingContext,
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- useReliableReceiver: Boolean,
- storageLevel: StorageLevel
- ) extends ReceiverInputDStream[(K, V)](_ssc) with Logging {
-
- def getReceiver(): Receiver[(K, V)] = {
- if (!useReliableReceiver) {
- new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
- } else {
- new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
- }
- }
-}
-
-private[streaming]
-class KafkaReceiver[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ) extends Receiver[(K, V)](storageLevel) with Logging {
-
- // Connection to Kafka
- var consumerConnector: ConsumerConnector = null
-
- def onStop() {
- if (consumerConnector != null) {
- consumerConnector.shutdown()
- consumerConnector = null
- }
- }
-
- def onStart() {
-
- logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
-
- // Kafka connection properties
- val props = new Properties()
- kafkaParams.foreach(param => props.put(param._1, param._2))
-
- val zkConnect = kafkaParams("zookeeper.connect")
- // Create the connection to the cluster
- logInfo("Connecting to Zookeeper: " + zkConnect)
- val consumerConfig = new ConsumerConfig(props)
- consumerConnector = Consumer.create(consumerConfig)
- logInfo("Connected to " + zkConnect)
-
- val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
- .asInstanceOf[Decoder[K]]
- val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
- .asInstanceOf[Decoder[V]]
-
- // Create threads for each topic/message Stream we are listening
- val topicMessageStreams = consumerConnector.createMessageStreams(
- topics, keyDecoder, valueDecoder)
-
- val executorPool =
- ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
- try {
- // Start the messages handler for each partition
- topicMessageStreams.values.foreach { streams =>
- streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
- }
- } finally {
- executorPool.shutdown() // Just causes threads to terminate after work is done
- }
- }
-
- // Handles Kafka messages
- private class MessageHandler(stream: KafkaStream[K, V])
- extends Runnable {
- def run() {
- logInfo("Starting MessageHandler.")
- try {
- val streamIterator = stream.iterator()
- while (streamIterator.hasNext()) {
- val msgAndMetadata = streamIterator.next()
- store((msgAndMetadata.key, msgAndMetadata.message))
- }
- } catch {
- case e: Throwable => reportError("Error handling message; exiting", e)
- }
- }
- }
-}
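
Note: the receiver above fans out one handler per requested consumer thread. Below is a simplified sketch of that pattern using a plain JDK executor instead of Spark's internal ThreadUtils; the topic names and thread counts are illustrative.

```scala
import java.util.concurrent.Executors

val topics = Map("events" -> 2, "metrics" -> 1)   // topic -> number of consumer threads
val pool = Executors.newFixedThreadPool(topics.values.sum)
try {
  topics.foreach { case (topic, numThreads) =>
    (1 to numThreads).foreach { i =>
      pool.submit(new Runnable {
        // In the real receiver this loop drains a KafkaStream iterator and calls store().
        def run(): Unit = println(s"consuming $topic in handler $i")
      })
    }
  }
} finally {
  pool.shutdown()   // lets submitted handlers finish, as in the receiver's onStart
}
```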
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
deleted file mode 100644
index d4881b140d..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.api.{FetchRequestBuilder, FetchResponse}
-import kafka.common.{ErrorMapping, TopicAndPartition}
-import kafka.consumer.SimpleConsumer
-import kafka.message.{MessageAndMetadata, MessageAndOffset}
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-
-import org.apache.spark.{Partition, SparkContext, SparkException, TaskContext}
-import org.apache.spark.internal.Logging
-import org.apache.spark.partial.{BoundedDouble, PartialResult}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.NextIterator
-
-/**
- * A batch-oriented interface for consuming from Kafka.
- * Starting and ending offsets are specified in advance,
- * so that you can control exactly-once semantics.
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
- * with Kafka broker(s) specified in host1:port1,host2:port2 form.
- * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
- * @param messageHandler function for translating each message into the desired type
- */
-private[kafka]
-class KafkaRDD[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag,
- R: ClassTag] private[spark] (
- sc: SparkContext,
- kafkaParams: Map[String, String],
- val offsetRanges: Array[OffsetRange],
- leaders: Map[TopicAndPartition, (String, Int)],
- messageHandler: MessageAndMetadata[K, V] => R
- ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
- override def getPartitions: Array[Partition] = {
- offsetRanges.zipWithIndex.map { case (o, i) =>
- val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
- new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
- }.toArray
- }
-
- override def count(): Long = offsetRanges.map(_.count).sum
-
- override def countApprox(
- timeout: Long,
- confidence: Double = 0.95
- ): PartialResult[BoundedDouble] = {
- val c = count
- new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
- }
-
- override def isEmpty(): Boolean = count == 0L
-
- override def take(num: Int): Array[R] = {
- val nonEmptyPartitions = this.partitions
- .map(_.asInstanceOf[KafkaRDDPartition])
- .filter(_.count > 0)
-
- if (num < 1 || nonEmptyPartitions.isEmpty) {
- return new Array[R](0)
- }
-
- // Determine in advance how many messages need to be taken from each partition
- val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
- val remain = num - result.values.sum
- if (remain > 0) {
- val taken = Math.min(remain, part.count)
- result + (part.index -> taken.toInt)
- } else {
- result
- }
- }
-
- val buf = new ArrayBuffer[R]
- val res = context.runJob(
- this,
- (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
- parts.keys.toArray)
- res.foreach(buf ++= _)
- buf.toArray
- }
-
- override def getPreferredLocations(thePart: Partition): Seq[String] = {
- val part = thePart.asInstanceOf[KafkaRDDPartition]
- // TODO is additional hostname resolution necessary here
- Seq(part.host)
- }
-
- private def errBeginAfterEnd(part: KafkaRDDPartition): String =
- s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
- s"for topic ${part.topic} partition ${part.partition}. " +
- "You either provided an invalid fromOffset, or the Kafka topic has been damaged"
-
- private def errRanOutBeforeEnd(part: KafkaRDDPartition): String =
- s"Ran out of messages before reaching ending offset ${part.untilOffset} " +
- s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
- " This should not happen, and indicates that messages may have been lost"
-
- private def errOvershotEnd(itemOffset: Long, part: KafkaRDDPartition): String =
- s"Got ${itemOffset} > ending offset ${part.untilOffset} " +
- s"for topic ${part.topic} partition ${part.partition} start ${part.fromOffset}." +
- " This should not happen, and indicates a message may have been skipped"
-
- override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
- val part = thePart.asInstanceOf[KafkaRDDPartition]
- assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
- if (part.fromOffset == part.untilOffset) {
- log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
- s"skipping ${part.topic} ${part.partition}")
- Iterator.empty
- } else {
- new KafkaRDDIterator(part, context)
- }
- }
-
- private class KafkaRDDIterator(
- part: KafkaRDDPartition,
- context: TaskContext) extends NextIterator[R] {
-
- context.addTaskCompletionListener{ context => closeIfNeeded() }
-
- log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
- s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
- val kc = new KafkaCluster(kafkaParams)
- val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(kc.config.props)
- .asInstanceOf[Decoder[K]]
- val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(kc.config.props)
- .asInstanceOf[Decoder[V]]
- val consumer = connectLeader
- var requestOffset = part.fromOffset
- var iter: Iterator[MessageAndOffset] = null
-
- // The idea is to use the provided preferred host, except on task retry attempts,
- // to minimize number of kafka metadata requests
- private def connectLeader: SimpleConsumer = {
- if (context.attemptNumber > 0) {
- kc.connectLeader(part.topic, part.partition).fold(
- errs => throw new SparkException(
- s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
- errs.mkString("\n")),
- consumer => consumer
- )
- } else {
- kc.connect(part.host, part.port)
- }
- }
-
- private def handleFetchErr(resp: FetchResponse) {
- if (resp.hasError) {
- val err = resp.errorCode(part.topic, part.partition)
- if (err == ErrorMapping.LeaderNotAvailableCode ||
- err == ErrorMapping.NotLeaderForPartitionCode) {
- log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
- s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
- Thread.sleep(kc.config.refreshLeaderBackoffMs)
- }
- // Let normal rdd retry sort out reconnect attempts
- throw ErrorMapping.exceptionFor(err)
- }
- }
-
- private def fetchBatch: Iterator[MessageAndOffset] = {
- val req = new FetchRequestBuilder()
- .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
- .build()
- val resp = consumer.fetch(req)
- handleFetchErr(resp)
- // kafka may return a batch that starts before the requested offset
- resp.messageSet(part.topic, part.partition)
- .iterator
- .dropWhile(_.offset < requestOffset)
- }
-
- override def close(): Unit = {
- if (consumer != null) {
- consumer.close()
- }
- }
-
- override def getNext(): R = {
- if (iter == null || !iter.hasNext) {
- iter = fetchBatch
- }
- if (!iter.hasNext) {
- assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
- finished = true
- null.asInstanceOf[R]
- } else {
- val item = iter.next()
- if (item.offset >= part.untilOffset) {
- assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
- finished = true
- null.asInstanceOf[R]
- } else {
- requestOffset = item.nextOffset
- messageHandler(new MessageAndMetadata(
- part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
- }
- }
- }
- }
-}
-
-private[kafka]
-object KafkaRDD {
- import KafkaCluster.LeaderOffset
-
- /**
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>.
- * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
- * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
- * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
- * starting point of the batch
- * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
- * ending point of the batch
- * @param messageHandler function for translating each message into the desired type
- */
- def apply[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag,
- R: ClassTag](
- sc: SparkContext,
- kafkaParams: Map[String, String],
- fromOffsets: Map[TopicAndPartition, Long],
- untilOffsets: Map[TopicAndPartition, LeaderOffset],
- messageHandler: MessageAndMetadata[K, V] => R
- ): KafkaRDD[K, V, U, T, R] = {
- val leaders = untilOffsets.map { case (tp, lo) =>
- tp -> (lo.host, lo.port)
- }.toMap
-
- val offsetRanges = fromOffsets.map { case (tp, fo) =>
- val uo = untilOffsets(tp)
- OffsetRange(tp.topic, tp.partition, fo, uo.offset)
- }.toArray
-
- new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
- }
-}
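
Note: the take implementation above decides up front how many records to read from each non-empty partition. A standalone sketch of that allocation, with illustrative partition indices and message counts:

```scala
// Walk the partitions, taking from each until the requested number of records is covered.
def planTake(num: Int, partitionCounts: Seq[(Int, Long)]): Map[Int, Int] =
  partitionCounts.foldLeft(Map.empty[Int, Int]) { case (plan, (index, count)) =>
    val remaining = num - plan.values.sum
    if (remaining > 0) plan + (index -> math.min(remaining.toLong, count).toInt)
    else plan
  }

// Taking 10 records from partitions holding 4, 3 and 8 messages
// touches the first three partitions: Map(0 -> 4, 1 -> 3, 2 -> 3).
planTake(10, Seq(0 -> 4L, 1 -> 3L, 2 -> 8L))
```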
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
deleted file mode 100644
index 02917becf0..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import org.apache.spark.Partition
-
-/**
- * @param topic kafka topic name
- * @param partition kafka partition id
- * @param fromOffset inclusive starting offset
- * @param untilOffset exclusive ending offset
- * @param host preferred kafka host, i.e. the leader at the time the rdd was created
- * @param port preferred kafka host's port
- */
-private[kafka]
-class KafkaRDDPartition(
- val index: Int,
- val topic: String,
- val partition: Int,
- val fromOffset: Long,
- val untilOffset: Long,
- val host: String,
- val port: Int
-) extends Partition {
- /** Number of messages this partition refers to */
- def count(): Long = untilOffset - fromOffset
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
deleted file mode 100644
index d9d4240c05..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.File
-import java.lang.{Integer => JInt}
-import java.net.InetSocketAddress
-import java.util.{Map => JMap, Properties}
-import java.util.concurrent.TimeoutException
-
-import scala.annotation.tailrec
-import scala.collection.JavaConverters._
-import scala.language.postfixOps
-import scala.util.control.NonFatal
-
-import kafka.admin.AdminUtils
-import kafka.api.Request
-import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
-import kafka.serializer.StringEncoder
-import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.streaming.Time
-import org.apache.spark.util.Utils
-
-/**
- * This is a helper class for Kafka test suites. This has the functionality to set up
- * and tear down local Kafka servers, and to push data using Kafka producers.
- *
- * The reason to put Kafka test utility class in src is to test Python related Kafka APIs.
- */
-private[kafka] class KafkaTestUtils extends Logging {
-
- // Zookeeper related configurations
- private val zkHost = "localhost"
- private var zkPort: Int = 0
- private val zkConnectionTimeout = 60000
- private val zkSessionTimeout = 6000
-
- private var zookeeper: EmbeddedZookeeper = _
-
- private var zkClient: ZkClient = _
-
- // Kafka broker related configurations
- private val brokerHost = "localhost"
- private var brokerPort = 9092
- private var brokerConf: KafkaConfig = _
-
- // Kafka broker server
- private var server: KafkaServer = _
-
- // Kafka producer
- private var producer: Producer[String, String] = _
-
- // Flag to test whether the system is correctly started
- private var zkReady = false
- private var brokerReady = false
-
- def zkAddress: String = {
- assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
- s"$zkHost:$zkPort"
- }
-
- def brokerAddress: String = {
- assert(brokerReady, "Kafka not setup yet or already torn down, cannot get broker address")
- s"$brokerHost:$brokerPort"
- }
-
- def zookeeperClient: ZkClient = {
- assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper client")
- Option(zkClient).getOrElse(
- throw new IllegalStateException("Zookeeper client is not yet initialized"))
- }
-
- // Set up the Embedded Zookeeper server and get the proper Zookeeper port
- private def setupEmbeddedZookeeper(): Unit = {
- // Zookeeper server startup
- zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
- // Get the actual zookeeper binding port
- zkPort = zookeeper.actualPort
- zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
- ZKStringSerializer)
- zkReady = true
- }
-
- // Set up the Embedded Kafka server
- private def setupEmbeddedKafkaServer(): Unit = {
- assert(zkReady, "Zookeeper should be set up beforehand")
-
- // Kafka broker startup
- Utils.startServiceOnPort(brokerPort, port => {
- brokerPort = port
- brokerConf = new KafkaConfig(brokerConfiguration)
- server = new KafkaServer(brokerConf)
- server.startup()
- (server, port)
- }, new SparkConf(), "KafkaBroker")
-
- brokerReady = true
- }
-
- /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
- def setup(): Unit = {
- setupEmbeddedZookeeper()
- setupEmbeddedKafkaServer()
- }
-
- /** Teardown the whole servers, including Kafka broker and Zookeeper */
- def teardown(): Unit = {
- brokerReady = false
- zkReady = false
-
- if (producer != null) {
- producer.close()
- producer = null
- }
-
- if (server != null) {
- server.shutdown()
- server = null
- }
-
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
-
- if (zkClient != null) {
- zkClient.close()
- zkClient = null
- }
-
- if (zookeeper != null) {
- zookeeper.shutdown()
- zookeeper = null
- }
- }
-
- /** Create a Kafka topic and wait until it is propagated to the whole cluster */
- def createTopic(topic: String, partitions: Int): Unit = {
- AdminUtils.createTopic(zkClient, topic, partitions, 1)
- // wait until metadata is propagated
- (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) }
- }
-
- /** Single-argument version for backwards compatibility */
- def createTopic(topic: String): Unit = createTopic(topic, 1)
-
- /** Java-friendly function for sending messages to the Kafka broker */
- def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
- sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
- }
-
- /** Send the messages to the Kafka broker */
- def sendMessages(topic: String, messageToFreq: Map[String, Int]): Unit = {
- val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
- sendMessages(topic, messages)
- }
-
- /** Send the array of messages to the Kafka broker */
- def sendMessages(topic: String, messages: Array[String]): Unit = {
- producer = new Producer[String, String](new ProducerConfig(producerConfiguration))
- producer.send(messages.map { new KeyedMessage[String, String](topic, _ ) }: _*)
- producer.close()
- producer = null
- }
-
- private def brokerConfiguration: Properties = {
- val props = new Properties()
- props.put("broker.id", "0")
- props.put("host.name", "localhost")
- props.put("port", brokerPort.toString)
- props.put("log.dir", Utils.createTempDir().getAbsolutePath)
- props.put("zookeeper.connect", zkAddress)
- props.put("log.flush.interval.messages", "1")
- props.put("replica.socket.timeout.ms", "1500")
- props
- }
-
- private def producerConfiguration: Properties = {
- val props = new Properties()
- props.put("metadata.broker.list", brokerAddress)
- props.put("serializer.class", classOf[StringEncoder].getName)
- // wait for all in-sync replicas to ack sends
- props.put("request.required.acks", "-1")
- props
- }
-
- // A simplified version of scalatest eventually, rewritten here to avoid adding extra test
- // dependency
- def eventually[T](timeout: Time, interval: Time)(func: => T): T = {
- def makeAttempt(): Either[Throwable, T] = {
- try {
- Right(func)
- } catch {
- case e if NonFatal(e) => Left(e)
- }
- }
-
- val startTime = System.currentTimeMillis()
- @tailrec
- def tryAgain(attempt: Int): T = {
- makeAttempt() match {
- case Right(result) => result
- case Left(e) =>
- val duration = System.currentTimeMillis() - startTime
- if (duration < timeout.milliseconds) {
- Thread.sleep(interval.milliseconds)
- } else {
- throw new TimeoutException(e.getMessage)
- }
-
- tryAgain(attempt + 1)
- }
- }
-
- tryAgain(1)
- }
-
- private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = {
- def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match {
- case Some(partitionState) =>
- val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-
- ZkUtils.getLeaderForPartition(zkClient, topic, partition).isDefined &&
- Request.isValidBrokerId(leaderAndInSyncReplicas.leader) &&
- leaderAndInSyncReplicas.isr.size >= 1
-
- case _ =>
- false
- }
- eventually(Time(10000), Time(100)) {
- assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
- }
- }
-
- private class EmbeddedZookeeper(val zkConnect: String) {
- val snapshotDir = Utils.createTempDir()
- val logDir = Utils.createTempDir()
-
- val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
- val (ip, port) = {
- val splits = zkConnect.split(":")
- (splits(0), splits(1).toInt)
- }
- val factory = new NIOServerCnxnFactory()
- factory.configure(new InetSocketAddress(ip, port), 16)
- factory.startup(zookeeper)
-
- val actualPort = factory.getLocalPort
-
- def shutdown() {
- factory.shutdown()
- Utils.deleteRecursively(snapshotDir)
- Utils.deleteRecursively(logDir)
- }
- }
-}
-
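
Note: a hedged sketch (not from the patch) of how a test typically drives the deleted KafkaTestUtils helper: setup, topic creation, sending messages, teardown. The topic name and payloads are placeholders.

```scala
import org.apache.spark.streaming.kafka.KafkaTestUtils

val kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()                              // embedded ZooKeeper + Kafka broker
try {
  kafkaTestUtils.createTopic("test-topic", 2)       // topic with two partitions
  kafkaTestUtils.sendMessages("test-topic", Map("a" -> 3, "b" -> 2))   // message -> frequency

  // kafkaParams for a consumer under test would point at kafkaTestUtils.brokerAddress
  println(s"broker at ${kafkaTestUtils.brokerAddress}, zk at ${kafkaTestUtils.zkAddress}")
} finally {
  kafkaTestUtils.teardown()
}
```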
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
deleted file mode 100644
index edaafb912c..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ /dev/null
@@ -1,805 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.OutputStream
-import java.lang.{Integer => JInt, Long => JLong}
-import java.nio.charset.StandardCharsets
-import java.util.{List => JList, Map => JMap, Set => JSet}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.{Decoder, DefaultDecoder, StringDecoder}
-import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
-
-import org.apache.spark.{SparkContext, SparkException}
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
-import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.api.python.SerDeUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java._
-import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
-import org.apache.spark.streaming.util.WriteAheadLogUtils
-
-object KafkaUtils {
- /**
- * Create an input stream that pulls messages from Kafka Brokers.
- * @param ssc StreamingContext object
- * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
- * @param groupId The group id for this consumer
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread
- * @param storageLevel Storage level to use for storing the received objects
- * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- * @return DStream of (Kafka message key, Kafka message value)
- */
- def createStream(
- ssc: StreamingContext,
- zkQuorum: String,
- groupId: String,
- topics: Map[String, Int],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): ReceiverInputDStream[(String, String)] = {
- val kafkaParams = Map[String, String](
- "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
- "zookeeper.connection.timeout.ms" -> "10000")
- createStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics, storageLevel)
- }
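
A minimal usage sketch of this overload; the ZooKeeper quorum, group id, and topic name are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setMaster("local[2]").setAppName("CreateStreamExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // One consumer thread per topic partition; _._2 is the message payload.
    val lines = KafkaUtils.createStream(ssc, "localhost:2181", "example-group", Map("pageviews" -> 1))
    lines.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
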
-
- /**
- * Create an input stream that pulls messages from Kafka Brokers.
- * @param ssc StreamingContext object
- * @param kafkaParams Map of kafka configuration parameters,
- * see http://kafka.apache.org/08/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel Storage level to use for storing the received objects
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam U type of Kafka message key decoder
- * @tparam T type of Kafka message value decoder
- * @return DStream of (Kafka message key, Kafka message value)
- */
- def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
- ssc: StreamingContext,
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
- ): ReceiverInputDStream[(K, V)] = {
- val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
- new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
- }
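
For non-String payloads the decoder type parameters can be supplied explicitly; a hedged sketch assuming an existing StreamingContext `ssc` and placeholder ZooKeeper/group/topic names.

    import kafka.serializer.{DefaultDecoder, StringDecoder}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // String keys decoded with StringDecoder, raw byte values with DefaultDecoder.
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",   // placeholder quorum
      "group.id" -> "example-group")
    val bytes = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams, Map("events" -> 2), StorageLevel.MEMORY_AND_DISK_SER)
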
-
- /**
- * Create an input stream that pulls messages from Kafka Brokers.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param jssc JavaStreamingContext object
- * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
- * @param groupId The group id for this consumer
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread
- * @return DStream of (Kafka message key, Kafka message value)
- */
- def createStream(
- jssc: JavaStreamingContext,
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt]
- ): JavaPairReceiverInputDStream[String, String] = {
- createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
- }
-
- /**
- * Create an input stream that pulls messages from Kafka Brokers.
- * @param jssc JavaStreamingContext object
- * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param storageLevel RDD storage level.
- * @return DStream of (Kafka message key, Kafka message value)
- */
- def createStream(
- jssc: JavaStreamingContext,
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- storageLevel: StorageLevel
- ): JavaPairReceiverInputDStream[String, String] = {
- createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
- storageLevel)
- }
-
- /**
- * Create an input stream that pulls messages from Kafka Brokers.
- * @param jssc JavaStreamingContext object
- * @param keyTypeClass Key type of DStream
- * @param valueTypeClass value type of DStream
- * @param keyDecoderClass Type of kafka key decoder
- * @param valueDecoderClass Type of kafka value decoder
- * @param kafkaParams Map of kafka configuration parameters,
- * see http://kafka.apache.org/08/configuration.html
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread
- * @param storageLevel RDD storage level.
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam U type of Kafka message key decoder
- * @tparam T type of Kafka message value decoder
- * @return DStream of (Kafka message key, Kafka message value)
- */
- def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
- jssc: JavaStreamingContext,
- keyTypeClass: Class[K],
- valueTypeClass: Class[V],
- keyDecoderClass: Class[U],
- valueDecoderClass: Class[T],
- kafkaParams: JMap[String, String],
- topics: JMap[String, JInt],
- storageLevel: StorageLevel
- ): JavaPairReceiverInputDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
- implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
-
- implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
- implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
-
- createStream[K, V, U, T](
- jssc.ssc,
- kafkaParams.asScala.toMap,
- Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
- storageLevel)
- }
-
- /** get leaders for the given offset ranges, or throw an exception */
- private def leadersForRanges(
- kc: KafkaCluster,
- offsetRanges: Array[OffsetRange]): Map[TopicAndPartition, (String, Int)] = {
- val topics = offsetRanges.map(o => TopicAndPartition(o.topic, o.partition)).toSet
- val leaders = kc.findLeaders(topics)
- KafkaCluster.checkErrors(leaders)
- }
-
- /** Make sure offsets are available in kafka, or throw an exception */
- private def checkOffsets(
- kc: KafkaCluster,
- offsetRanges: Array[OffsetRange]): Unit = {
- val topics = offsetRanges.map(_.topicAndPartition).toSet
- val result = for {
- low <- kc.getEarliestLeaderOffsets(topics).right
- high <- kc.getLatestLeaderOffsets(topics).right
- } yield {
- offsetRanges.filterNot { o =>
- low(o.topicAndPartition).offset <= o.fromOffset &&
- o.untilOffset <= high(o.topicAndPartition).offset
- }
- }
- val badRanges = KafkaCluster.checkErrors(result)
- if (!badRanges.isEmpty) {
- throw new SparkException("Offsets not available on leader: " + badRanges.mkString(","))
- }
- }
-
- private[kafka] def getFromOffsets(
- kc: KafkaCluster,
- kafkaParams: Map[String, String],
- topics: Set[String]
- ): Map[TopicAndPartition, Long] = {
- val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
- val result = for {
- topicPartitions <- kc.getPartitions(topics).right
- leaderOffsets <- (if (reset == Some("smallest")) {
- kc.getEarliestLeaderOffsets(topicPartitions)
- } else {
- kc.getLatestLeaderOffsets(topicPartitions)
- }).right
- } yield {
- leaderOffsets.map { case (tp, lo) =>
- (tp, lo.offset)
- }
- }
- KafkaCluster.checkErrors(result)
- }
-
- /**
- * Create an RDD from Kafka using offset ranges for each topic and partition.
- *
- * @param sc SparkContext object
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
- * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
- * host1:port1,host2:port2 form.
- * @param offsetRanges Each OffsetRange in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam KD type of Kafka message key decoder
- * @tparam VD type of Kafka message value decoder
- * @return RDD of (Kafka message key, Kafka message value)
- */
- def createRDD[
- K: ClassTag,
- V: ClassTag,
- KD <: Decoder[K]: ClassTag,
- VD <: Decoder[V]: ClassTag](
- sc: SparkContext,
- kafkaParams: Map[String, String],
- offsetRanges: Array[OffsetRange]
- ): RDD[(K, V)] = sc.withScope {
- val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
- val kc = new KafkaCluster(kafkaParams)
- val leaders = leadersForRanges(kc, offsetRanges)
- checkOffsets(kc, offsetRanges)
- new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
- }
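
A sketch of a batch read over an explicit offset range; the broker address, topic, and offsets are placeholders, and `sc` is assumed to be an existing SparkContext.

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
    // Read offsets [0, 100) of partition 0 of a hypothetical topic as an RDD of (key, value).
    val offsetRanges = Array(OffsetRange.create("pageviews", 0, 0L, 100L))
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    println(rdd.map(_._2).count())
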
-
- /**
- * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
- * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
- * as the metadata.
- *
- * @param sc SparkContext object
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
- * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
- * host1:port1,host2:port2 form.
- * @param offsetRanges Each OffsetRange in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
- * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
- * in which case leaders will be looked up on the driver.
- * @param messageHandler Function for translating each message and metadata into the desired type
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam KD type of Kafka message key decoder
- * @tparam VD type of Kafka message value decoder
- * @tparam R type returned by messageHandler
- * @return RDD of R
- */
- def createRDD[
- K: ClassTag,
- V: ClassTag,
- KD <: Decoder[K]: ClassTag,
- VD <: Decoder[V]: ClassTag,
- R: ClassTag](
- sc: SparkContext,
- kafkaParams: Map[String, String],
- offsetRanges: Array[OffsetRange],
- leaders: Map[TopicAndPartition, Broker],
- messageHandler: MessageAndMetadata[K, V] => R
- ): RDD[R] = sc.withScope {
- val kc = new KafkaCluster(kafkaParams)
- val leaderMap = if (leaders.isEmpty) {
- leadersForRanges(kc, offsetRanges)
- } else {
- // This could be avoided by refactoring KafkaRDD.leaders and KafkaCluster to use Broker
- leaders.map {
- case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
- }
- }
- val cleanedHandler = sc.clean(messageHandler)
- checkOffsets(kc, offsetRanges)
- new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
- }
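
A sketch of the messageHandler variant that keeps only the message payload; the leaders map is left empty so leaders are looked up on the driver, as documented above, and the broker, topic, and offsets are placeholders (`sc` is assumed to exist).

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
    val offsetRanges = Array(OffsetRange.create("pageviews", 0, 0L, 100L))
    // Empty leaders map: leaders will be looked up on the driver.
    val values = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
      sc, kafkaParams, offsetRanges, Map[TopicAndPartition, Broker](),
      (mmd: MessageAndMetadata[String, String]) => mmd.message)
    println(values.count())
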
-
- /**
- * Create an RDD from Kafka using offset ranges for each topic and partition.
- *
- * @param jsc JavaSparkContext object
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
- * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
- * host1:port1,host2:port2 form.
- * @param offsetRanges Each OffsetRange in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
- * @param keyClass type of Kafka message key
- * @param valueClass type of Kafka message value
- * @param keyDecoderClass type of Kafka message key decoder
- * @param valueDecoderClass type of Kafka message value decoder
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam KD type of Kafka message key decoder
- * @tparam VD type of Kafka message value decoder
- * @return RDD of (Kafka message key, Kafka message value)
- */
- def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V]](
- jsc: JavaSparkContext,
- keyClass: Class[K],
- valueClass: Class[V],
- keyDecoderClass: Class[KD],
- valueDecoderClass: Class[VD],
- kafkaParams: JMap[String, String],
- offsetRanges: Array[OffsetRange]
- ): JavaPairRDD[K, V] = jsc.sc.withScope {
- implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
- implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
- implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
- implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
- new JavaPairRDD(createRDD[K, V, KD, VD](
- jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges))
- }
-
- /**
- * Create an RDD from Kafka using offset ranges for each topic and partition. This allows you to
- * specify the Kafka leader to connect to (to optimize fetching) and access the message as well
- * as the metadata.
- *
- * @param jsc JavaSparkContext object
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
- * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
- * host1:port1,host2:port2 form.
- * @param offsetRanges Each OffsetRange in the batch corresponds to a
- * range of offsets for a given Kafka topic/partition
- * @param leaders Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty map,
- * in which case leaders will be looked up on the driver.
- * @param messageHandler Function for translating each message and metadata into the desired type
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam KD type of Kafka message key decoder
- * @tparam VD type of Kafka message value decoder
- * @tparam R type returned by messageHandler
- * @return RDD of R
- */
- def createRDD[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
- jsc: JavaSparkContext,
- keyClass: Class[K],
- valueClass: Class[V],
- keyDecoderClass: Class[KD],
- valueDecoderClass: Class[VD],
- recordClass: Class[R],
- kafkaParams: JMap[String, String],
- offsetRanges: Array[OffsetRange],
- leaders: JMap[TopicAndPartition, Broker],
- messageHandler: JFunction[MessageAndMetadata[K, V], R]
- ): JavaRDD[R] = jsc.sc.withScope {
- implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
- implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
- implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
- implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
- implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
- val leaderMap = Map(leaders.asScala.toSeq: _*)
- createRDD[K, V, KD, VD, R](
- jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, messageHandler.call(_))
- }
-
- /**
- * Create an input stream that directly pulls messages from Kafka Brokers
- * without using any receiver. This stream can guarantee that each message
- * from Kafka is included in transformations exactly once (see points below).
- *
- * Points to note:
- * - No receivers: This stream does not use any receiver. It directly queries Kafka
- * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
- * by the stream itself. For interoperability with Kafka monitoring tools that depend on
- * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
- * You can access the offsets used in each batch from the generated RDDs (see
- * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
- * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
- * in the [[StreamingContext]]. The information on consumed offset can be
- * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- * - End-to-end semantics: This stream ensures that every record is effectively received and
- * transformed exactly once, but gives no guarantees on whether the transformed data are
- * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
- * that the output operation is idempotent, or use transactions to output records atomically.
- * See the programming guide for more details.
- *
- * @param ssc StreamingContext object
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
- * to be set with Kafka broker(s) (NOT zookeeper servers) specified in
- * host1:port1,host2:port2 form.
- * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
- * starting point of the stream
- * @param messageHandler Function for translating each message and metadata into the desired type
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam KD type of Kafka message key decoder
- * @tparam VD type of Kafka message value decoder
- * @tparam R type returned by messageHandler
- * @return DStream of R
- */
- def createDirectStream[
- K: ClassTag,
- V: ClassTag,
- KD <: Decoder[K]: ClassTag,
- VD <: Decoder[V]: ClassTag,
- R: ClassTag] (
- ssc: StreamingContext,
- kafkaParams: Map[String, String],
- fromOffsets: Map[TopicAndPartition, Long],
- messageHandler: MessageAndMetadata[K, V] => R
- ): InputDStream[R] = {
- val cleanedHandler = ssc.sc.clean(messageHandler)
- new DirectKafkaInputDStream[K, V, KD, VD, R](
- ssc, kafkaParams, fromOffsets, cleanedHandler)
- }
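
A hedged sketch of starting a direct stream from explicit offsets; the broker address, topic, and starting offset are placeholders, and `ssc` is assumed to be an existing StreamingContext.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
    // Resume a hypothetical topic/partition from offset 42 (inclusive).
    val fromOffsets = Map(TopicAndPartition("pageviews", 0) -> 42L)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => mmd.message)
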
-
- /**
- * Create an input stream that directly pulls messages from Kafka Brokers
- * without using any receiver. This stream can guarantee that each message
- * from Kafka is included in transformations exactly once (see points below).
- *
- * Points to note:
- * - No receivers: This stream does not use any receiver. It directly queries Kafka
- * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
- * by the stream itself. For interoperability with Kafka monitoring tools that depend on
- * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
- * You can access the offsets used in each batch from the generated RDDs (see
- * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
- * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
- * in the [[StreamingContext]]. The information on consumed offset can be
- * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- * - End-to-end semantics: This stream ensures that every record is effectively received and
- * transformed exactly once, but gives no guarantees on whether the transformed data are
- * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
- * that the output operation is idempotent, or use transactions to output records atomically.
- * See the programming guide for more details.
- *
- * @param ssc StreamingContext object
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
- * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
- * host1:port1,host2:port2 form.
- * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
- * to determine where the stream starts (defaults to "largest")
- * @param topics Names of the topics to consume
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam KD type of Kafka message key decoder
- * @tparam VD type of Kafka message value decoder
- * @return DStream of (Kafka message key, Kafka message value)
- */
- def createDirectStream[
- K: ClassTag,
- V: ClassTag,
- KD <: Decoder[K]: ClassTag,
- VD <: Decoder[V]: ClassTag] (
- ssc: StreamingContext,
- kafkaParams: Map[String, String],
- topics: Set[String]
- ): InputDStream[(K, V)] = {
- val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
- val kc = new KafkaCluster(kafkaParams)
- val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
- new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
- ssc, kafkaParams, fromOffsets, messageHandler)
- }
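
A usage sketch of the topic-based overload combined with HasOffsetRanges to inspect per-batch offsets; the broker addresses and topic name are placeholders.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    val conf = new SparkConf().setMaster("local[2]").setAppName("DirectStreamExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092",  // placeholder brokers
      "auto.offset.reset" -> "smallest")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("pageviews"))

    stream.foreachRDD { rdd =>
      // Offsets covered by this batch, e.g. for committing to an external store.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"))
      println(rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()
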
-
- /**
- * Create an input stream that directly pulls messages from Kafka Brokers
- * without using any receiver. This stream can guarantee that each message
- * from Kafka is included in transformations exactly once (see points below).
- *
- * Points to note:
- * - No receivers: This stream does not use any receiver. It directly queries Kafka
- * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
- * by the stream itself. For interoperability with Kafka monitoring tools that depend on
- * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
- * You can access the offsets used in each batch from the generated RDDs (see
- * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
- * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
- * in the [[StreamingContext]]. The information on consumed offset can be
- * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- * - End-to-end semantics: This stream ensures that every record is effectively received and
- * transformed exactly once, but gives no guarantees on whether the transformed data are
- * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
- * that the output operation is idempotent, or use transactions to output records atomically.
- * See the programming guide for more details.
- *
- * @param jssc JavaStreamingContext object
- * @param keyClass Class of the keys in the Kafka records
- * @param valueClass Class of the values in the Kafka records
- * @param keyDecoderClass Class of the key decoder
- * @param valueDecoderClass Class of the value decoder
- * @param recordClass Class of the records in DStream
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
- * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
- * host1:port1,host2:port2 form.
- * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
- * starting point of the stream
- * @param messageHandler Function for translating each message and metadata into the desired type
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam KD type of Kafka message key decoder
- * @tparam VD type of Kafka message value decoder
- * @tparam R type returned by messageHandler
- * @return DStream of R
- */
- def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
- jssc: JavaStreamingContext,
- keyClass: Class[K],
- valueClass: Class[V],
- keyDecoderClass: Class[KD],
- valueDecoderClass: Class[VD],
- recordClass: Class[R],
- kafkaParams: JMap[String, String],
- fromOffsets: JMap[TopicAndPartition, JLong],
- messageHandler: JFunction[MessageAndMetadata[K, V], R]
- ): JavaInputDStream[R] = {
- implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
- implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
- implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
- implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
- implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
- val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
- createDirectStream[K, V, KD, VD, R](
- jssc.ssc,
- Map(kafkaParams.asScala.toSeq: _*),
- Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*),
- cleanedHandler
- )
- }
-
- /**
- * Create an input stream that directly pulls messages from Kafka Brokers
- * without using any receiver. This stream can guarantee that each message
- * from Kafka is included in transformations exactly once (see points below).
- *
- * Points to note:
- * - No receivers: This stream does not use any receiver. It directly queries Kafka
- * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
- * by the stream itself. For interoperability with Kafka monitoring tools that depend on
- * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
- * You can access the offsets used in each batch from the generated RDDs (see
- * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
- * - Failure Recovery: To recover from driver failures, you have to enable checkpointing
- * in the [[StreamingContext]]. The information on consumed offset can be
- * recovered from the checkpoint. See the programming guide for details (constraints, etc.).
- * - End-to-end semantics: This stream ensures that every record is effectively received and
- * transformed exactly once, but gives no guarantees on whether the transformed data are
- * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
- * that the output operation is idempotent, or use transactions to output records atomically.
- * See the programming guide for more details.
- *
- * @param jssc JavaStreamingContext object
- * @param keyClass Class of the keys in the Kafka records
- * @param valueClass Class of the values in the Kafka records
- * @param keyDecoderClass Class of the key decoder
- * @param valueDecoderClass Class of the value decoder
- * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
- * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
- * to be set with Kafka broker(s) (NOT zookeeper servers), specified in
- * host1:port1,host2:port2 form.
- * If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
- * to determine where the stream starts (defaults to "largest")
- * @param topics Names of the topics to consume
- * @tparam K type of Kafka message key
- * @tparam V type of Kafka message value
- * @tparam KD type of Kafka message key decoder
- * @tparam VD type of Kafka message value decoder
- * @return DStream of (Kafka message key, Kafka message value)
- */
- def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
- jssc: JavaStreamingContext,
- keyClass: Class[K],
- valueClass: Class[V],
- keyDecoderClass: Class[KD],
- valueDecoderClass: Class[VD],
- kafkaParams: JMap[String, String],
- topics: JSet[String]
- ): JavaPairInputDStream[K, V] = {
- implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
- implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
- implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
- implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
- createDirectStream[K, V, KD, VD](
- jssc.ssc,
- Map(kafkaParams.asScala.toSeq: _*),
- Set(topics.asScala.toSeq: _*)
- )
- }
-}
-
-/**
- * This is a helper class that wraps KafkaUtils.createStream() in a more
- * Python-friendly class and function so that it can be easily
- * instantiated and called from Python's KafkaUtils.
- *
- * The zero-arg constructor helps instantiate this class from the Class object
- * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
- * takes care of known parameters instead of passing them from Python
- */
-private[kafka] class KafkaUtilsPythonHelper {
- import KafkaUtilsPythonHelper._
-
- def createStream(
- jssc: JavaStreamingContext,
- kafkaParams: JMap[String, String],
- topics: JMap[String, JInt],
- storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
- KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
- jssc,
- classOf[Array[Byte]],
- classOf[Array[Byte]],
- classOf[DefaultDecoder],
- classOf[DefaultDecoder],
- kafkaParams,
- topics,
- storageLevel)
- }
-
- def createRDDWithoutMessageHandler(
- jsc: JavaSparkContext,
- kafkaParams: JMap[String, String],
- offsetRanges: JList[OffsetRange],
- leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], Array[Byte])] = {
- val messageHandler =
- (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
- new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler))
- }
-
- def createRDDWithMessageHandler(
- jsc: JavaSparkContext,
- kafkaParams: JMap[String, String],
- offsetRanges: JList[OffsetRange],
- leaders: JMap[TopicAndPartition, Broker]): JavaRDD[Array[Byte]] = {
- val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
- new PythonMessageAndMetadata(
- mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
- val rdd = createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler).
- mapPartitions(picklerIterator)
- new JavaRDD(rdd)
- }
-
- private def createRDD[V: ClassTag](
- jsc: JavaSparkContext,
- kafkaParams: JMap[String, String],
- offsetRanges: JList[OffsetRange],
- leaders: JMap[TopicAndPartition, Broker],
- messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): RDD[V] = {
- KafkaUtils.createRDD[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V](
- jsc.sc,
- kafkaParams.asScala.toMap,
- offsetRanges.toArray(new Array[OffsetRange](offsetRanges.size())),
- leaders.asScala.toMap,
- messageHandler
- )
- }
-
- def createDirectStreamWithoutMessageHandler(
- jssc: JavaStreamingContext,
- kafkaParams: JMap[String, String],
- topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[(Array[Byte], Array[Byte])] = {
- val messageHandler =
- (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
- new JavaDStream(createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler))
- }
-
- def createDirectStreamWithMessageHandler(
- jssc: JavaStreamingContext,
- kafkaParams: JMap[String, String],
- topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong]): JavaDStream[Array[Byte]] = {
- val messageHandler = (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
- new PythonMessageAndMetadata(mmd.topic, mmd.partition, mmd.offset, mmd.key(), mmd.message())
- val stream = createDirectStream(jssc, kafkaParams, topics, fromOffsets, messageHandler).
- mapPartitions(picklerIterator)
- new JavaDStream(stream)
- }
-
- private def createDirectStream[V: ClassTag](
- jssc: JavaStreamingContext,
- kafkaParams: JMap[String, String],
- topics: JSet[String],
- fromOffsets: JMap[TopicAndPartition, JLong],
- messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => V): DStream[V] = {
-
- val currentFromOffsets = if (!fromOffsets.isEmpty) {
- val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic)
- if (topicsFromOffsets != topics.asScala.toSet) {
- throw new IllegalStateException(
- s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " +
- s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}")
- }
- Map(fromOffsets.asScala.mapValues { _.longValue() }.toSeq: _*)
- } else {
- val kc = new KafkaCluster(Map(kafkaParams.asScala.toSeq: _*))
- KafkaUtils.getFromOffsets(
- kc, Map(kafkaParams.asScala.toSeq: _*), Set(topics.asScala.toSeq: _*))
- }
-
- KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, V](
- jssc.ssc,
- Map(kafkaParams.asScala.toSeq: _*),
- Map(currentFromOffsets.toSeq: _*),
- messageHandler)
- }
-
- def createOffsetRange(topic: String, partition: JInt, fromOffset: JLong, untilOffset: JLong
- ): OffsetRange = OffsetRange.create(topic, partition, fromOffset, untilOffset)
-
- def createTopicAndPartition(topic: String, partition: JInt): TopicAndPartition =
- TopicAndPartition(topic, partition)
-
- def createBroker(host: String, port: JInt): Broker = Broker(host, port)
-
- def offsetRangesOfKafkaRDD(rdd: RDD[_]): JList[OffsetRange] = {
- val parentRDDs = rdd.getNarrowAncestors
- val kafkaRDDs = parentRDDs.filter(rdd => rdd.isInstanceOf[KafkaRDD[_, _, _, _, _]])
-
- require(
- kafkaRDDs.length == 1,
- "Cannot get offset ranges, as there may be multiple Kafka RDDs or no Kafka RDD associated" +
- "with this RDD, please call this method only on a Kafka RDD.")
-
- val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]]
- kafkaRDD.offsetRanges.toSeq.asJava
- }
-}
-
-private object KafkaUtilsPythonHelper {
- private var initialized = false
-
- def initialize(): Unit = {
- SerDeUtil.initialize()
- synchronized {
- if (!initialized) {
- new PythonMessageAndMetadataPickler().register()
- initialized = true
- }
- }
- }
-
- initialize()
-
- def picklerIterator(iter: Iterator[Any]): Iterator[Array[Byte]] = {
- new SerDeUtil.AutoBatchedPickler(iter)
- }
-
- case class PythonMessageAndMetadata(
- topic: String,
- partition: JInt,
- offset: JLong,
- key: Array[Byte],
- message: Array[Byte])
-
- class PythonMessageAndMetadataPickler extends IObjectPickler {
- private val module = "pyspark.streaming.kafka"
-
- def register(): Unit = {
- Pickler.registerCustomPickler(classOf[PythonMessageAndMetadata], this)
- Pickler.registerCustomPickler(this.getClass, this)
- }
-
- def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
- if (obj == this) {
- out.write(Opcodes.GLOBAL)
- out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(StandardCharsets.UTF_8))
- } else {
- pickler.save(this)
- val msgAndMetaData = obj.asInstanceOf[PythonMessageAndMetadata]
- out.write(Opcodes.MARK)
- pickler.save(msgAndMetaData.topic)
- pickler.save(msgAndMetaData.partition)
- pickler.save(msgAndMetaData.offset)
- pickler.save(msgAndMetaData.key)
- pickler.save(msgAndMetaData.message)
- out.write(Opcodes.TUPLE)
- out.write(Opcodes.REDUCE)
- }
- }
- }
-}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
deleted file mode 100644
index d9b856e469..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import kafka.common.TopicAndPartition
-
-/**
- * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
- * offset ranges in RDDs generated by the direct Kafka DStream (see
- * [[KafkaUtils.createDirectStream()]]).
- * {{{
- * KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
- * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- * ...
- * }
- * }}}
- */
-trait HasOffsetRanges {
- def offsetRanges: Array[OffsetRange]
-}
-
-/**
- * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
- * can be created with `OffsetRange.create()`.
- * @param topic Kafka topic name
- * @param partition Kafka partition id
- * @param fromOffset Inclusive starting offset
- * @param untilOffset Exclusive ending offset
- */
-final class OffsetRange private(
- val topic: String,
- val partition: Int,
- val fromOffset: Long,
- val untilOffset: Long) extends Serializable {
- import OffsetRange.OffsetRangeTuple
-
- /** Kafka TopicAndPartition object, for convenience */
- def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition)
-
- /** Number of messages this OffsetRange refers to */
- def count(): Long = untilOffset - fromOffset
-
- override def equals(obj: Any): Boolean = obj match {
- case that: OffsetRange =>
- this.topic == that.topic &&
- this.partition == that.partition &&
- this.fromOffset == that.fromOffset &&
- this.untilOffset == that.untilOffset
- case _ => false
- }
-
- override def hashCode(): Int = {
- toTuple.hashCode()
- }
-
- override def toString(): String = {
- s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
- }
-
- /** this is to avoid ClassNotFoundException during checkpoint restore */
- private[streaming]
- def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset)
-}
-
-/**
- * Companion object that provides methods to create instances of [[OffsetRange]].
- */
-object OffsetRange {
- def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
- new OffsetRange(topic, partition, fromOffset, untilOffset)
-
- def create(
- topicAndPartition: TopicAndPartition,
- fromOffset: Long,
- untilOffset: Long): OffsetRange =
- new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
-
- def apply(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =
- new OffsetRange(topic, partition, fromOffset, untilOffset)
-
- def apply(
- topicAndPartition: TopicAndPartition,
- fromOffset: Long,
- untilOffset: Long): OffsetRange =
- new OffsetRange(topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
-
- /** this is to avoid ClassNotFoundException during checkpoint restore */
- private[kafka]
- type OffsetRangeTuple = (String, Int, Long, Long)
-
- private[kafka]
- def apply(t: OffsetRangeTuple) =
- new OffsetRange(t._1, t._2, t._3, t._4)
-}
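
A small hedged sketch of constructing a range and reading back its fields; the topic name and offsets are placeholders.

    import org.apache.spark.streaming.kafka.OffsetRange

    // fromOffset is inclusive and untilOffset is exclusive, so count() is their difference.
    val range = OffsetRange.create("pageviews", 0, 100L, 250L)
    assert(range.count() == 150L)
    println(range)  // OffsetRange(topic: 'pageviews', partition: 0, range: [100 -> 250])
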
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
deleted file mode 100644
index 39abe3c3e2..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
-
-import scala.collection.{mutable, Map}
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.common.TopicAndPartition
-import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
-import kafka.message.MessageAndMetadata
-import kafka.serializer.Decoder
-import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.storage.{StorageLevel, StreamBlockId}
-import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
-import org.apache.spark.util.ThreadUtils
-
-/**
- * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
- * It is turned off by default and will be enabled when
- * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
- * is that this receiver manages topic-partition/offset itself and updates the offset information
- * after data is reliably stored in the write-ahead log. Offsets will only be updated when data is
- * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
- *
- * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
- * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
- * will not take effect.
- */
-private[streaming]
-class ReliableKafkaReceiver[
- K: ClassTag,
- V: ClassTag,
- U <: Decoder[_]: ClassTag,
- T <: Decoder[_]: ClassTag](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel)
- extends Receiver[(K, V)](storageLevel) with Logging {
-
- private val groupId = kafkaParams("group.id")
- private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
- private def conf = SparkEnv.get.conf
-
- /** High level consumer to connect to Kafka. */
- private var consumerConnector: ConsumerConnector = null
-
- /** zkClient to connect to Zookeeper to commit the offsets. */
- private var zkClient: ZkClient = null
-
- /**
- * A HashMap to manage the offset for each topic/partition. This HashMap is only accessed in
- * synchronized blocks, so a mutable HashMap will not run into concurrency issues.
- */
- private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
-
- /** A concurrent HashMap to store the stream block id and related offset snapshot. */
- private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
-
- /**
- * Manage the BlockGenerator in receiver itself for better managing block store and offset
- * commit.
- */
- private var blockGenerator: BlockGenerator = null
-
- /** Thread pool running the handlers for receiving messages from multiple topics and partitions. */
- private var messageHandlerThreadPool: ThreadPoolExecutor = null
-
- override def onStart(): Unit = {
- logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
-
- // Initialize the topic-partition / offset hash map.
- topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
-
- // Initialize the stream block id / offset snapshot hash map.
- blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
-
- // Initialize the block generator for storing Kafka message.
- blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
-
- if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
- logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
- "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
- }
-
- val props = new Properties()
- kafkaParams.foreach(param => props.put(param._1, param._2))
- // Manually set "auto.commit.enable" to "false", even if the user explicitly set it to true;
- // we have to make sure this property is false to turn off the auto commit mechanism in
- // Kafka.
- props.setProperty(AUTO_OFFSET_COMMIT, "false")
-
- val consumerConfig = new ConsumerConfig(props)
-
- assert(!consumerConfig.autoCommitEnable)
-
- logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
- consumerConnector = Consumer.create(consumerConfig)
- logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
-
- zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
- consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
-
- messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
- topics.values.sum, "KafkaMessageHandler")
-
- blockGenerator.start()
-
- val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
- .asInstanceOf[Decoder[K]]
-
- val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
- .newInstance(consumerConfig.props)
- .asInstanceOf[Decoder[V]]
-
- val topicMessageStreams = consumerConnector.createMessageStreams(
- topics, keyDecoder, valueDecoder)
-
- topicMessageStreams.values.foreach { streams =>
- streams.foreach { stream =>
- messageHandlerThreadPool.submit(new MessageHandler(stream))
- }
- }
- }
-
- override def onStop(): Unit = {
- if (messageHandlerThreadPool != null) {
- messageHandlerThreadPool.shutdown()
- messageHandlerThreadPool = null
- }
-
- if (consumerConnector != null) {
- consumerConnector.shutdown()
- consumerConnector = null
- }
-
- if (zkClient != null) {
- zkClient.close()
- zkClient = null
- }
-
- if (blockGenerator != null) {
- blockGenerator.stop()
- blockGenerator = null
- }
-
- if (topicPartitionOffsetMap != null) {
- topicPartitionOffsetMap.clear()
- topicPartitionOffsetMap = null
- }
-
- if (blockOffsetMap != null) {
- blockOffsetMap.clear()
- blockOffsetMap = null
- }
- }
-
- /** Store a Kafka message and the associated metadata as a tuple. */
- private def storeMessageAndMetadata(
- msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
- val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
- val data = (msgAndMetadata.key, msgAndMetadata.message)
- val metadata = (topicAndPartition, msgAndMetadata.offset)
- blockGenerator.addDataWithCallback(data, metadata)
- }
-
- /** Update stored offset */
- private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
- topicPartitionOffsetMap.put(topicAndPartition, offset)
- }
-
- /**
- * Remember the current offsets for each topic and partition. This is called when a block is
- * generated.
- */
- private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
- // Get a snapshot of current offset map and store with related block id.
- val offsetSnapshot = topicPartitionOffsetMap.toMap
- blockOffsetMap.put(blockId, offsetSnapshot)
- topicPartitionOffsetMap.clear()
- }
-
- /**
- * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
- * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
- */
- private def storeBlockAndCommitOffset(
- blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
- var count = 0
- var pushed = false
- var exception: Exception = null
- while (!pushed && count <= 3) {
- try {
- store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
- pushed = true
- } catch {
- case ex: Exception =>
- count += 1
- exception = ex
- }
- }
- if (pushed) {
- Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
- blockOffsetMap.remove(blockId)
- } else {
- stop("Error while storing block into Spark", exception)
- }
- }
-
- /**
- * Commit the offsets of Kafka topics/partitions; the commit mechanism follows Kafka 0.8.x's
- * metadata schema in Zookeeper.
- */
- private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
- if (zkClient == null) {
- val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
- stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
- return
- }
-
- for ((topicAndPart, offset) <- offsetMap) {
- try {
- val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
- val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
-
- ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
- } catch {
- case e: Exception =>
- logWarning(s"Exception during commit offset $offset for topic" +
- s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
- }
-
- logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
- s"partition ${topicAndPart.partition}")
- }
- }
-
- /** Class to handle received Kafka messages. */
- private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
- override def run(): Unit = {
- while (!isStopped) {
- try {
- val streamIterator = stream.iterator()
- while (streamIterator.hasNext) {
- storeMessageAndMetadata(streamIterator.next)
- }
- } catch {
- case e: Exception =>
- reportError("Error handling message", e)
- }
- }
- }
- }
-
- /** Class to handle blocks generated by the block generator. */
- private final class GeneratedBlockHandler extends BlockGeneratorListener {
-
- def onAddData(data: Any, metadata: Any): Unit = {
- // Update the offset of the data that was added to the generator
- if (metadata != null) {
- val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
- updateOffset(topicAndPartition, offset)
- }
- }
-
- def onGenerateBlock(blockId: StreamBlockId): Unit = {
- // Remember the offsets of topics/partitions when a block has been generated
- rememberBlockOffsets(blockId)
- }
-
- def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
- // Store block and commit the blocks offset
- storeBlockAndCommitOffset(blockId, arrayBuffer)
- }
-
- def onError(message: String, throwable: Throwable): Unit = {
- reportError(message, throwable)
- }
- }
-}
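
Per the class comment above, this receiver is used by the receiver-based createStream when the write-ahead log is enabled; a hedged configuration sketch, with placeholder ZooKeeper quorum, group id, topic, and checkpoint directory.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Enabling the receiver write-ahead log selects ReliableKafkaReceiver, which commits
    // offsets only after blocks have been reliably stored.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ReliableReceiverExample")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("/tmp/wal-checkpoint")  // placeholder directory backing the WAL

    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",  // placeholder quorum
      "group.id" -> "reliable-example-group")
    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("pageviews" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
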
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
deleted file mode 100644
index 2e5ab0fb3b..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Kafka receiver for spark streaming.
- */
-package org.apache.spark.streaming.kafka;
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
deleted file mode 100644
index 47c5187f87..0000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Kafka receiver for spark streaming.
- */
-package object kafka
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
deleted file mode 100644
index fa6b0dbc8c..0000000000
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.Tuple2;
-
-import kafka.common.TopicAndPartition;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class JavaDirectKafkaStreamSuite implements Serializable {
- private transient JavaStreamingContext ssc = null;
- private transient KafkaTestUtils kafkaTestUtils = null;
-
- @Before
- public void setUp() {
- kafkaTestUtils = new KafkaTestUtils();
- kafkaTestUtils.setup();
- SparkConf sparkConf = new SparkConf()
- .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
- ssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(200));
- }
-
- @After
- public void tearDown() {
- if (ssc != null) {
- ssc.stop();
- ssc = null;
- }
-
- if (kafkaTestUtils != null) {
- kafkaTestUtils.teardown();
- kafkaTestUtils = null;
- }
- }
-
- @Test
- public void testKafkaStream() throws InterruptedException {
- final String topic1 = "topic1";
- final String topic2 = "topic2";
- // hold a reference to the current offset ranges, so it can be used downstream
- final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
-
- String[] topic1data = createTopicAndSendData(topic1);
- String[] topic2data = createTopicAndSendData(topic2);
-
- Set<String> sent = new HashSet<>();
- sent.addAll(Arrays.asList(topic1data));
- sent.addAll(Arrays.asList(topic2data));
-
- Map<String, String> kafkaParams = new HashMap<>();
- kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
- kafkaParams.put("auto.offset.reset", "smallest");
-
- JavaDStream<String> stream1 = KafkaUtils.createDirectStream(
- ssc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- kafkaParams,
- topicToSet(topic1)
- ).transformToPair(
- // Make sure you can get offset ranges from the rdd
- new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
- @Override
- public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
- OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- offsetRanges.set(offsets);
- Assert.assertEquals(topic1, offsets[0].topic());
- return rdd;
- }
- }
- ).map(
- new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> kv) {
- return kv._2();
- }
- }
- );
-
- JavaDStream<String> stream2 = KafkaUtils.createDirectStream(
- ssc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- String.class,
- kafkaParams,
- topicOffsetToMap(topic2, 0L),
- new Function<MessageAndMetadata<String, String>, String>() {
- @Override
- public String call(MessageAndMetadata<String, String> msgAndMd) {
- return msgAndMd.message();
- }
- }
- );
- JavaDStream<String> unifiedStream = stream1.union(stream2);
-
- final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
- unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- result.addAll(rdd.collect());
- for (OffsetRange o : offsetRanges.get()) {
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
- );
- }
- }
- }
- );
- ssc.start();
- long startTime = System.currentTimeMillis();
- boolean matches = false;
- while (!matches && System.currentTimeMillis() - startTime < 20000) {
- matches = sent.size() == result.size();
- Thread.sleep(50);
- }
- Assert.assertEquals(sent, result);
- ssc.stop();
- }
-
- private static Set<String> topicToSet(String topic) {
- Set<String> topicSet = new HashSet<>();
- topicSet.add(topic);
- return topicSet;
- }
-
- private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
- Map<TopicAndPartition, Long> topicMap = new HashMap<>();
- topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
- return topicMap;
- }
-
- private String[] createTopicAndSendData(String topic) {
- String[] data = { topic + "-1", topic + "-2", topic + "-3"};
- kafkaTestUtils.createTopic(topic, 1);
- kafkaTestUtils.sendMessages(topic, data);
- return data;
- }
-}
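
The removed Java suite above exercises the direct (receiver-less) stream and reads each batch's offset ranges from the underlying RDD. A minimal Scala sketch of that pattern against the 0.8 integration follows; the broker address, topic name and object name are illustrative placeholders, not values from the suite.

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    object DirectStreamSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[4]").setAppName("DirectStreamSketch")
        val ssc = new StreamingContext(conf, Milliseconds(200))
        val kafkaParams = Map(
          "metadata.broker.list" -> "localhost:9092",  // placeholder broker address
          "auto.offset.reset" -> "smallest")

        // Hold a reference to the current offset ranges, so they can be used downstream.
        var offsetRanges = Array.empty[OffsetRange]

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("topic1"))

        stream.transform { rdd =>
          // The RDDs of a direct stream carry their Kafka offset ranges.
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd
        }.map(_._2).foreachRDD { rdd =>
          rdd.collect().foreach(println)
          offsetRanges.foreach { o =>
            println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }
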
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
deleted file mode 100644
index c41b6297b0..0000000000
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-
-import scala.Tuple2;
-
-import kafka.common.TopicAndPartition;
-import kafka.message.MessageAndMetadata;
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-
-public class JavaKafkaRDDSuite implements Serializable {
- private transient JavaSparkContext sc = null;
- private transient KafkaTestUtils kafkaTestUtils = null;
-
- @Before
- public void setUp() {
- kafkaTestUtils = new KafkaTestUtils();
- kafkaTestUtils.setup();
- SparkConf sparkConf = new SparkConf()
- .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
- sc = new JavaSparkContext(sparkConf);
- }
-
- @After
- public void tearDown() {
- if (sc != null) {
- sc.stop();
- sc = null;
- }
-
- if (kafkaTestUtils != null) {
- kafkaTestUtils.teardown();
- kafkaTestUtils = null;
- }
- }
-
- @Test
- public void testKafkaRDD() throws InterruptedException {
- String topic1 = "topic1";
- String topic2 = "topic2";
-
- createTopicAndSendData(topic1);
- createTopicAndSendData(topic2);
-
- Map<String, String> kafkaParams = new HashMap<>();
- kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
-
- OffsetRange[] offsetRanges = {
- OffsetRange.create(topic1, 0, 0, 1),
- OffsetRange.create(topic2, 0, 0, 1)
- };
-
- Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
- Map<TopicAndPartition, Broker> leaders = new HashMap<>();
- String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
- Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
- leaders.put(new TopicAndPartition(topic1, 0), broker);
- leaders.put(new TopicAndPartition(topic2, 0), broker);
-
- JavaRDD<String> rdd1 = KafkaUtils.createRDD(
- sc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- kafkaParams,
- offsetRanges
- ).map(
- new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> kv) {
- return kv._2();
- }
- }
- );
-
- JavaRDD<String> rdd2 = KafkaUtils.createRDD(
- sc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- String.class,
- kafkaParams,
- offsetRanges,
- emptyLeaders,
- new Function<MessageAndMetadata<String, String>, String>() {
- @Override
- public String call(MessageAndMetadata<String, String> msgAndMd) {
- return msgAndMd.message();
- }
- }
- );
-
- JavaRDD<String> rdd3 = KafkaUtils.createRDD(
- sc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- String.class,
- kafkaParams,
- offsetRanges,
- leaders,
- new Function<MessageAndMetadata<String, String>, String>() {
- @Override
- public String call(MessageAndMetadata<String, String> msgAndMd) {
- return msgAndMd.message();
- }
- }
- );
-
- // just making sure the java user apis work; the scala tests handle logic corner cases
- long count1 = rdd1.count();
- long count2 = rdd2.count();
- long count3 = rdd3.count();
- Assert.assertTrue(count1 > 0);
- Assert.assertEquals(count1, count2);
- Assert.assertEquals(count1, count3);
- }
-
- private String[] createTopicAndSendData(String topic) {
- String[] data = { topic + "-1", topic + "-2", topic + "-3"};
- kafkaTestUtils.createTopic(topic, 1);
- kafkaTestUtils.sendMessages(topic, data);
- return data;
- }
-}
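
The removed JavaKafkaRDDSuite above covers the batch path: building an RDD from explicit offset ranges. A minimal Scala sketch follows; the broker address, topic name and the offset range [0, 100) are placeholders for illustration.

    import kafka.serializer.StringDecoder

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    object KafkaRDDSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[4]").setAppName("KafkaRDDSketch"))
        val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")  // placeholder broker

        // One RDD partition is created per offset range.
        val offsetRanges = Array(OffsetRange("topic1", 0, 0, 100))
        val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
          sc, kafkaParams, offsetRanges)

        println(s"read ${rdd.count()} messages")
        sc.stop()
      }
    }
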
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
deleted file mode 100644
index 868df64e8c..0000000000
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka;
-
-import java.io.Serializable;
-import java.util.*;
-
-import scala.Tuple2;
-
-import kafka.serializer.StringDecoder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.VoidFunction;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
-public class JavaKafkaStreamSuite implements Serializable {
- private transient JavaStreamingContext ssc = null;
- private transient Random random = new Random();
- private transient KafkaTestUtils kafkaTestUtils = null;
-
- @Before
- public void setUp() {
- kafkaTestUtils = new KafkaTestUtils();
- kafkaTestUtils.setup();
- SparkConf sparkConf = new SparkConf()
- .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
- ssc = new JavaStreamingContext(sparkConf, new Duration(500));
- }
-
- @After
- public void tearDown() {
- if (ssc != null) {
- ssc.stop();
- ssc = null;
- }
-
- if (kafkaTestUtils != null) {
- kafkaTestUtils.teardown();
- kafkaTestUtils = null;
- }
- }
-
- @Test
- public void testKafkaStream() throws InterruptedException {
- String topic = "topic1";
- Map<String, Integer> topics = new HashMap<>();
- topics.put(topic, 1);
-
- Map<String, Integer> sent = new HashMap<>();
- sent.put("a", 5);
- sent.put("b", 3);
- sent.put("c", 10);
-
- kafkaTestUtils.createTopic(topic, 1);
- kafkaTestUtils.sendMessages(topic, sent);
-
- Map<String, String> kafkaParams = new HashMap<>();
- kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
- kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
- kafkaParams.put("auto.offset.reset", "smallest");
-
- JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
- String.class,
- String.class,
- StringDecoder.class,
- StringDecoder.class,
- kafkaParams,
- topics,
- StorageLevel.MEMORY_ONLY_SER());
-
- final Map<String, Long> result = Collections.synchronizedMap(new HashMap<String, Long>());
-
- JavaDStream<String> words = stream.map(
- new Function<Tuple2<String, String>, String>() {
- @Override
- public String call(Tuple2<String, String> tuple2) {
- return tuple2._2();
- }
- }
- );
-
- words.countByValue().foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
- @Override
- public void call(JavaPairRDD<String, Long> rdd) {
- List<Tuple2<String, Long>> ret = rdd.collect();
- for (Tuple2<String, Long> r : ret) {
- if (result.containsKey(r._1())) {
- result.put(r._1(), result.get(r._1()) + r._2());
- } else {
- result.put(r._1(), r._2());
- }
- }
- }
- }
- );
-
- ssc.start();
-
- long startTime = System.currentTimeMillis();
- boolean sizeMatches = false;
- while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
- sizeMatches = sent.size() == result.size();
- Thread.sleep(200);
- }
- Assert.assertEquals(sent.size(), result.size());
- for (Map.Entry<String, Integer> e : sent.entrySet()) {
- Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
- }
- }
-}
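
The removed JavaKafkaStreamSuite above covers the receiver-based path, which connects through ZooKeeper and tracks offsets per consumer group. A minimal Scala sketch is below; the ZooKeeper address, group id and topic are placeholders.

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ReceiverStreamSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setMaster("local[4]").setAppName("ReceiverStreamSketch"),
          Milliseconds(500))
        val kafkaParams = Map(
          "zookeeper.connect" -> "localhost:2181",  // placeholder ZooKeeper address
          "group.id" -> "sketch-consumer",          // placeholder consumer group
          "auto.offset.reset" -> "smallest")

        val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Map("topic1" -> 1), StorageLevel.MEMORY_ONLY)
        stream.map(_._2).countByValue().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
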
diff --git a/external/kafka/src/test/resources/log4j.properties b/external/kafka/src/test/resources/log4j.properties
deleted file mode 100644
index fd51f8faf5..0000000000
--- a/external/kafka/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark_project.jetty=WARN
-
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
deleted file mode 100644
index cb782d27fe..0000000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ /dev/null
@@ -1,531 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.File
-import java.util.Arrays
-import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.StringDecoder
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.scheduler.rate.RateEstimator
-import org.apache.spark.util.Utils
-
-class DirectKafkaStreamSuite
- extends SparkFunSuite
- with BeforeAndAfter
- with BeforeAndAfterAll
- with Eventually
- with Logging {
- val sparkConf = new SparkConf()
- .setMaster("local[4]")
- .setAppName(this.getClass.getSimpleName)
-
- private var sc: SparkContext = _
- private var ssc: StreamingContext = _
- private var testDir: File = _
-
- private var kafkaTestUtils: KafkaTestUtils = _
-
- override def beforeAll {
- kafkaTestUtils = new KafkaTestUtils
- kafkaTestUtils.setup()
- }
-
- override def afterAll {
- if (kafkaTestUtils != null) {
- kafkaTestUtils.teardown()
- kafkaTestUtils = null
- }
- }
-
- after {
- if (ssc != null) {
- ssc.stop()
- sc = null
- }
- if (sc != null) {
- sc.stop()
- }
- if (testDir != null) {
- Utils.deleteRecursively(testDir)
- }
- }
-
-
- test("basic stream receiving with multiple topics and smallest starting offset") {
- val topics = Set("basic1", "basic2", "basic3")
- val data = Map("a" -> 7, "b" -> 9)
- topics.foreach { t =>
- kafkaTestUtils.createTopic(t)
- kafkaTestUtils.sendMessages(t, data)
- }
- val totalSent = data.values.sum * topics.size
- val kafkaParams = Map(
- "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "auto.offset.reset" -> "smallest"
- )
-
- ssc = new StreamingContext(sparkConf, Milliseconds(200))
- val stream = withClue("Error creating direct stream") {
- KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics)
- }
-
- val allReceived = new ConcurrentLinkedQueue[(String, String)]()
-
- // hold a reference to the current offset ranges, so they can be used downstream
- var offsetRanges = Array[OffsetRange]()
-
- stream.transform { rdd =>
- // Get the offset ranges in the RDD
- offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
- rdd
- }.foreachRDD { rdd =>
- for (o <- offsetRanges) {
- logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
- }
- val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
- // For each partition, get size of the range in the partition,
- // and the number of items in the partition
- val off = offsetRanges(i)
- val all = iter.toSeq
- val partSize = all.size
- val rangeSize = off.untilOffset - off.fromOffset
- Iterator((partSize, rangeSize))
- }.collect
-
- // Verify that the number of elements in each partition
- // matches the corresponding offset range
- collected.foreach { case (partSize, rangeSize) =>
- assert(partSize === rangeSize, "offset ranges are wrong")
- }
- }
- stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
- ssc.start()
- eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
- assert(allReceived.size === totalSent,
- "didn't get expected number of messages, messages:\n" +
- allReceived.asScala.mkString("\n"))
- }
- ssc.stop()
- }
-
- test("receiving from largest starting offset") {
- val topic = "largest"
- val topicPartition = TopicAndPartition(topic, 0)
- val data = Map("a" -> 10)
- kafkaTestUtils.createTopic(topic)
- val kafkaParams = Map(
- "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "auto.offset.reset" -> "largest"
- )
- val kc = new KafkaCluster(kafkaParams)
- def getLatestOffset(): Long = {
- kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
- }
-
- // Send some initial messages before starting context
- kafkaTestUtils.sendMessages(topic, data)
- eventually(timeout(10 seconds), interval(20 milliseconds)) {
- assert(getLatestOffset() > 3)
- }
- val offsetBeforeStart = getLatestOffset()
-
- // Setup context and kafka stream with largest offset
- ssc = new StreamingContext(sparkConf, Milliseconds(200))
- val stream = withClue("Error creating direct stream") {
- KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, Set(topic))
- }
- assert(
- stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
- .fromOffsets(topicPartition) >= offsetBeforeStart,
- "Start offset not from latest"
- )
-
- val collectedData = new ConcurrentLinkedQueue[String]()
- stream.map { _._2 }.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
- ssc.start()
- val newData = Map("b" -> 10)
- kafkaTestUtils.sendMessages(topic, newData)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- assert(collectedData.contains("b"))
- }
- assert(!collectedData.contains("a"))
- }
-
-
- test("creating stream by offset") {
- val topic = "offset"
- val topicPartition = TopicAndPartition(topic, 0)
- val data = Map("a" -> 10)
- kafkaTestUtils.createTopic(topic)
- val kafkaParams = Map(
- "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "auto.offset.reset" -> "largest"
- )
- val kc = new KafkaCluster(kafkaParams)
- def getLatestOffset(): Long = {
- kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
- }
-
- // Send some initial messages before starting context
- kafkaTestUtils.sendMessages(topic, data)
- eventually(timeout(10 seconds), interval(20 milliseconds)) {
- assert(getLatestOffset() >= 10)
- }
- val offsetBeforeStart = getLatestOffset()
-
- // Setup context and kafka stream with largest offset
- ssc = new StreamingContext(sparkConf, Milliseconds(200))
- val stream = withClue("Error creating direct stream") {
- KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
- ssc, kafkaParams, Map(topicPartition -> 11L),
- (m: MessageAndMetadata[String, String]) => m.message())
- }
- assert(
- stream.asInstanceOf[DirectKafkaInputDStream[_, _, _, _, _]]
- .fromOffsets(topicPartition) >= offsetBeforeStart,
- "Start offset not from latest"
- )
-
- val collectedData = new ConcurrentLinkedQueue[String]()
- stream.foreachRDD { rdd => collectedData.addAll(Arrays.asList(rdd.collect(): _*)) }
- ssc.start()
- val newData = Map("b" -> 10)
- kafkaTestUtils.sendMessages(topic, newData)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- assert(collectedData.contains("b"))
- }
- assert(!collectedData.contains("a"))
- }
-
- // Test to verify that offset ranges can be recovered from checkpoints
- test("offset recovery") {
- val topic = "recovery"
- kafkaTestUtils.createTopic(topic)
- testDir = Utils.createTempDir()
-
- val kafkaParams = Map(
- "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "auto.offset.reset" -> "smallest"
- )
-
- // Send data to Kafka and wait for it to be received
- def sendDataAndWaitForReceive(data: Seq[Int]) {
- val strings = data.map { _.toString}
- kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
- }
- }
-
- // Setup the streaming context
- ssc = new StreamingContext(sparkConf, Milliseconds(100))
- val kafkaStream = withClue("Error creating direct stream") {
- KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, Set(topic))
- }
- val keyedStream = kafkaStream.map { v => "key" -> v._2.toInt }
- val stateStream = keyedStream.updateStateByKey { (values: Seq[Int], state: Option[Int]) =>
- Some(values.sum + state.getOrElse(0))
- }
- ssc.checkpoint(testDir.getAbsolutePath)
-
- // This is to collect the raw data received from Kafka
- kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
- val data = rdd.map { _._2 }.collect()
- DirectKafkaStreamSuite.collectedData.addAll(Arrays.asList(data: _*))
- }
-
- // This is to ensure all the data is eventually received exactly once
- stateStream.foreachRDD { (rdd: RDD[(String, Int)]) =>
- rdd.collect().headOption.foreach { x => DirectKafkaStreamSuite.total = x._2 }
- }
- ssc.start()
-
- // Send some data and wait for them to be received
- for (i <- (1 to 10).grouped(4)) {
- sendDataAndWaitForReceive(i)
- }
-
- ssc.stop()
-
- // Verify that offset ranges were generated
- // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
- // collect the offset ranges after stopping. Otherwise, because new RDDs keep being generated
- // before stopping, we may not get the latest RDDs, and "recoveredOffsetRanges" would then
- // contain entries that are not in "offsetRangesAfterStop".
- val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
- assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
- assert(
- offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
- "starting offset not zero"
- )
-
- logInfo("====== RESTARTING ========")
-
- // Recover context from checkpoints
- ssc = new StreamingContext(testDir.getAbsolutePath)
- val recoveredStream = ssc.graph.getInputStreams().head.asInstanceOf[DStream[(String, String)]]
-
- // Verify offset ranges have been recovered
- val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
- assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
- val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
- assert(
- recoveredOffsetRanges.forall { or =>
- earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
- },
- "Recovered ranges are not the same as the ones generated\n" +
- s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
- s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
- )
- // Restart context, give more data and verify the total at the end
- // If the total is right, it means each record has been received exactly once
- ssc.start()
- sendDataAndWaitForReceive(11 to 20)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- assert(DirectKafkaStreamSuite.total === (1 to 20).sum)
- }
- ssc.stop()
- }
-
- test("Direct Kafka stream report input information") {
- val topic = "report-test"
- val data = Map("a" -> 7, "b" -> 9)
- kafkaTestUtils.createTopic(topic)
- kafkaTestUtils.sendMessages(topic, data)
-
- val totalSent = data.values.sum
- val kafkaParams = Map(
- "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "auto.offset.reset" -> "smallest"
- )
-
- import DirectKafkaStreamSuite._
- ssc = new StreamingContext(sparkConf, Milliseconds(200))
- val collector = new InputInfoCollector
- ssc.addStreamingListener(collector)
-
- val stream = withClue("Error creating direct stream") {
- KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, Set(topic))
- }
-
- val allReceived = new ConcurrentLinkedQueue[(String, String)]
-
- stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
- ssc.start()
- eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
- assert(allReceived.size === totalSent,
- "didn't get expected number of messages, messages:\n" +
- allReceived.asScala.mkString("\n"))
-
- // Verify that the record counts collected by the StreamingListener match the total sent.
- assert(collector.numRecordsSubmitted.get() === totalSent)
- assert(collector.numRecordsStarted.get() === totalSent)
- assert(collector.numRecordsCompleted.get() === totalSent)
- }
- ssc.stop()
- }
-
- test("maxMessagesPerPartition with backpressure disabled") {
- val topic = "maxMessagesPerPartition"
- val kafkaStream = getDirectKafkaStream(topic, None)
-
- val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L)
- assert(kafkaStream.maxMessagesPerPartition(input).get ==
- Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
- }
-
- test("maxMessagesPerPartition with no lag") {
- val topic = "maxMessagesPerPartition"
- val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
- val kafkaStream = getDirectKafkaStream(topic, rateController)
-
- val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
- assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
- }
-
- test("maxMessagesPerPartition respects max rate") {
- val topic = "maxMessagesPerPartition"
- val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
- val kafkaStream = getDirectKafkaStream(topic, rateController)
-
- val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L)
- assert(kafkaStream.maxMessagesPerPartition(input).get ==
- Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
- }
-
- test("using rate controller") {
- val topic = "backpressure"
- val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
- kafkaTestUtils.createTopic(topic, 2)
- val kafkaParams = Map(
- "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "auto.offset.reset" -> "smallest"
- )
-
- val batchIntervalMilliseconds = 100
- val estimator = new ConstantEstimator(100)
- val messages = Map("foo" -> 200)
- kafkaTestUtils.sendMessages(topic, messages)
-
- val sparkConf = new SparkConf()
- // Safe, even with streaming, because we're using the direct API.
- // Using 1 core is useful to make the test more predictable.
- .setMaster("local[1]")
- .setAppName(this.getClass.getSimpleName)
- .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
- // Setup the streaming context
- ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
-
- val kafkaStream = withClue("Error creating direct stream") {
- val kc = new KafkaCluster(kafkaParams)
- val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
- val m = kc.getEarliestLeaderOffsets(topicPartitions)
- .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
-
- new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
- ssc, kafkaParams, m, messageHandler) {
- override protected[streaming] val rateController =
- Some(new DirectKafkaRateController(id, estimator))
- }
- }
-
- val collectedData = new ConcurrentLinkedQueue[Array[String]]()
-
- // Used for assertion failure messages.
- def dataToString: String =
- collectedData.asScala.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
-
- // This is to collect the raw data received from Kafka
- kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
- val data = rdd.map { _._2 }.collect()
- collectedData.add(data)
- }
-
- ssc.start()
-
- // Try different rate limits.
- // Wait for arrays of data to appear matching the rate.
- Seq(100, 50, 20).foreach { rate =>
- collectedData.clear() // Empty this buffer on each pass.
- estimator.updateRate(rate) // Set a new rate.
- // Expect blocks of data equal to "rate", scaled by the interval length in secs.
- val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
- eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
- // Assert that rate estimator values are used to determine maxMessagesPerPartition.
- // Funky "-" in message makes the complete assertion message read better.
- assert(collectedData.asScala.exists(_.size == expectedSize),
- s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
- }
- }
-
- ssc.stop()
- }
-
- /** Get the generated offset ranges from the DirectKafkaStream */
- private def getOffsetRanges[K, V](
- kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
- kafkaStream.generatedRDDs.mapValues { rdd =>
- rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
- }.toSeq.sortBy { _._1 }
- }
-
- private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
- val batchIntervalMilliseconds = 100
-
- val sparkConf = new SparkConf()
- .setMaster("local[1]")
- .setAppName(this.getClass.getSimpleName)
- .set("spark.streaming.kafka.maxRatePerPartition", "100")
-
- // Setup the streaming context
- ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
-
- val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
- val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
- new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
- ssc, Map[String, String](), earliestOffsets, messageHandler) {
- override protected[streaming] val rateController = mockRateController
- }
- }
-}
-
-object DirectKafkaStreamSuite {
- val collectedData = new ConcurrentLinkedQueue[String]()
- @volatile var total = -1L
-
- class InputInfoCollector extends StreamingListener {
- val numRecordsSubmitted = new AtomicLong(0L)
- val numRecordsStarted = new AtomicLong(0L)
- val numRecordsCompleted = new AtomicLong(0L)
-
- override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
- numRecordsSubmitted.addAndGet(batchSubmitted.batchInfo.numRecords)
- }
-
- override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
- numRecordsStarted.addAndGet(batchStarted.batchInfo.numRecords)
- }
-
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
- numRecordsCompleted.addAndGet(batchCompleted.batchInfo.numRecords)
- }
- }
-}
-
-private[streaming] class ConstantEstimator(@volatile private var rate: Long)
- extends RateEstimator {
-
- def updateRate(newRate: Long): Unit = {
- rate = newRate
- }
-
- def compute(
- time: Long,
- elements: Long,
- processingDelay: Long,
- schedulingDelay: Long): Option[Double] = Some(rate)
-}
-
-private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
- extends RateController(id, estimator) {
- override def publish(rate: Long): Unit = ()
- override def getLatestRate(): Long = rate
-}
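
Among other things, the removed DirectKafkaStreamSuite above tests creating a direct stream from explicit per-partition offsets with a message handler. A minimal sketch of that call follows; the broker address, topic and the starting offset of 11 are illustrative placeholders.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object FromOffsetsSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setMaster("local[4]").setAppName("FromOffsetsSketch"),
          Milliseconds(200))
        val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")  // placeholder broker

        // Start consuming partition 0 of "topic1" at offset 11 and keep only the message body.
        val fromOffsets = Map(TopicAndPartition("topic1", 0) -> 11L)
        val stream =
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
            ssc, kafkaParams, fromOffsets,
            (m: MessageAndMetadata[String, String]) => m.message())

        stream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
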
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
deleted file mode 100644
index d66830cbac..0000000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-
-import kafka.common.TopicAndPartition
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark.SparkFunSuite
-
-class KafkaClusterSuite extends SparkFunSuite with BeforeAndAfterAll {
- private val topic = "kcsuitetopic" + Random.nextInt(10000)
- private val topicAndPartition = TopicAndPartition(topic, 0)
- private var kc: KafkaCluster = null
-
- private var kafkaTestUtils: KafkaTestUtils = _
-
- override def beforeAll() {
- kafkaTestUtils = new KafkaTestUtils
- kafkaTestUtils.setup()
-
- kafkaTestUtils.createTopic(topic)
- kafkaTestUtils.sendMessages(topic, Map("a" -> 1))
- kc = new KafkaCluster(Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress))
- }
-
- override def afterAll() {
- if (kafkaTestUtils != null) {
- kafkaTestUtils.teardown()
- kafkaTestUtils = null
- }
- }
-
- test("metadata apis") {
- val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
- val leaderAddress = s"${leader._1}:${leader._2}"
- assert(leaderAddress === kafkaTestUtils.brokerAddress, "didn't get leader")
-
- val parts = kc.getPartitions(Set(topic)).right.get
- assert(parts(topicAndPartition), "didn't get partitions")
-
- val err = kc.getPartitions(Set(topic + "BAD"))
- assert(err.isLeft, "getPartitions for a nonexistent topic should be an error")
- }
-
- test("leader offset apis") {
- val earliest = kc.getEarliestLeaderOffsets(Set(topicAndPartition)).right.get
- assert(earliest(topicAndPartition).offset === 0, "didn't get earliest")
-
- val latest = kc.getLatestLeaderOffsets(Set(topicAndPartition)).right.get
- assert(latest(topicAndPartition).offset === 1, "didn't get latest")
- }
-
- test("consumer offset apis") {
- val group = "kcsuitegroup" + Random.nextInt(10000)
-
- val offset = Random.nextInt(10000)
-
- val set = kc.setConsumerOffsets(group, Map(topicAndPartition -> offset))
- assert(set.isRight, "didn't set consumer offsets")
-
- val get = kc.getConsumerOffsets(group, Set(topicAndPartition)).right.get
- assert(get(topicAndPartition) === offset, "didn't get consumer offsets")
- }
-}
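
The removed KafkaClusterSuite above exercises KafkaCluster's metadata and offset helpers. A minimal sketch of those calls follows; it declares the same org.apache.spark.streaming.kafka package the suite lives in so KafkaCluster is accessible, and the broker address, topic and group id are placeholders.

    package org.apache.spark.streaming.kafka

    import kafka.common.TopicAndPartition

    object KafkaClusterSketch {
      def main(args: Array[String]): Unit = {
        val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
        val tp = TopicAndPartition("topic1", 0)

        // Leader offsets: the earliest and latest offsets available on the broker.
        val earliest = kc.getEarliestLeaderOffsets(Set(tp)).right.get(tp).offset
        val latest = kc.getLatestLeaderOffsets(Set(tp)).right.get(tp).offset
        println(s"earliest=$earliest latest=$latest")

        // Consumer offsets: commit a position for a group and read it back.
        val group = "sketch-group"  // placeholder consumer group
        kc.setConsumerOffsets(group, Map(tp -> latest)).left.foreach { err =>
          sys.error(err.mkString("\n"))
        }
        val committed = kc.getConsumerOffsets(group, Set(tp)).right.get(tp)
        println(s"committed=$committed")
      }
    }
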
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
deleted file mode 100644
index 5e539c1d79..0000000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.util.Random
-
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
-import kafka.serializer.StringDecoder
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.spark._
-
-class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
-
- private var kafkaTestUtils: KafkaTestUtils = _
-
- private val sparkConf = new SparkConf().setMaster("local[4]")
- .setAppName(this.getClass.getSimpleName)
- private var sc: SparkContext = _
-
- override def beforeAll {
- sc = new SparkContext(sparkConf)
- kafkaTestUtils = new KafkaTestUtils
- kafkaTestUtils.setup()
- }
-
- override def afterAll {
- if (sc != null) {
- sc.stop
- sc = null
- }
-
- if (kafkaTestUtils != null) {
- kafkaTestUtils.teardown()
- kafkaTestUtils = null
- }
- }
-
- test("basic usage") {
- val topic = s"topicbasic-${Random.nextInt}"
- kafkaTestUtils.createTopic(topic)
- val messages = Array("the", "quick", "brown", "fox")
- kafkaTestUtils.sendMessages(topic, messages)
-
- val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt}")
-
- val offsetRanges = Array(OffsetRange(topic, 0, 0, messages.size))
-
- val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
- sc, kafkaParams, offsetRanges)
-
- val received = rdd.map(_._2).collect.toSet
- assert(received === messages.toSet)
-
- // size-related method optimizations return sane results
- assert(rdd.count === messages.size)
- assert(rdd.countApprox(0).getFinalValue.mean === messages.size)
- assert(!rdd.isEmpty)
- assert(rdd.take(1).size === 1)
- assert(rdd.take(1).head._2 === messages.head)
- assert(rdd.take(messages.size + 10).size === messages.size)
-
- val emptyRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
- sc, kafkaParams, Array(OffsetRange(topic, 0, 0, 0)))
-
- assert(emptyRdd.isEmpty)
-
- // invalid offset ranges throw exceptions
- val badRanges = Array(OffsetRange(topic, 0, 0, messages.size + 1))
- intercept[SparkException] {
- KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
- sc, kafkaParams, badRanges)
- }
- }
-
- test("iterator boundary conditions") {
- // the idea is to find e.g. off-by-one errors between what kafka has available and the rdd
- val topic = s"topicboundary-${Random.nextInt}"
- val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
- kafkaTestUtils.createTopic(topic)
-
- val kafkaParams = Map("metadata.broker.list" -> kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt}")
-
- val kc = new KafkaCluster(kafkaParams)
-
- // this is the "lots of messages" case
- kafkaTestUtils.sendMessages(topic, sent)
- val sentCount = sent.values.sum
-
- // rdd defined from leaders after sending messages, should get the number sent
- val rdd = getRdd(kc, Set(topic))
-
- assert(rdd.isDefined)
-
- val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
- val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
-
- assert(rangeCount === sentCount, "offset range didn't include all sent messages")
- assert(rdd.get.count === sentCount, "didn't get all sent messages")
-
- val rangesMap = ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap
-
- // make sure consumer offsets are committed before the next getRdd call
- kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
- err => throw new Exception(err.mkString("\n")),
- _ => ()
- )
-
- // this is the "0 messages" case
- val rdd2 = getRdd(kc, Set(topic))
- // shouldn't get anything, since message is sent after rdd was defined
- val sentOnlyOne = Map("d" -> 1)
-
- kafkaTestUtils.sendMessages(topic, sentOnlyOne)
-
- assert(rdd2.isDefined)
- assert(rdd2.get.count === 0, "got messages when there shouldn't be any")
-
- // this is the "exactly 1 message" case, namely the single message from sentOnlyOne above
- val rdd3 = getRdd(kc, Set(topic))
- // send lots of messages after rdd was defined, they shouldn't show up
- kafkaTestUtils.sendMessages(topic, Map("extra" -> 22))
-
- assert(rdd3.isDefined)
- assert(rdd3.get.count === sentOnlyOne.values.sum, "didn't get exactly one message")
-
- }
-
- // get an rdd from the committed consumer offsets until the latest leader offsets
- private def getRdd(kc: KafkaCluster, topics: Set[String]) = {
- val groupId = kc.kafkaParams("group.id")
- def consumerOffsets(topicPartitions: Set[TopicAndPartition]) = {
- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
- kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
- offs.map(kv => kv._1 -> kv._2.offset)
- }
- )
- }
- kc.getPartitions(topics).right.toOption.flatMap { topicPartitions =>
- consumerOffsets(topicPartitions).flatMap { from =>
- kc.getLatestLeaderOffsets(topicPartitions).right.toOption.map { until =>
- val offsetRanges = from.map { case (tp: TopicAndPartition, fromOffset: Long) =>
- OffsetRange(tp.topic, tp.partition, fromOffset, until(tp).offset)
- }.toArray
-
- val leaders = until.map { case (tp: TopicAndPartition, lo: KafkaCluster.LeaderOffset) =>
- tp -> Broker(lo.host, lo.port)
- }.toMap
-
- KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String](
- sc, kc.kafkaParams, offsetRanges, leaders,
- (mmd: MessageAndMetadata[String, String]) => s"${mmd.offset} ${mmd.message}")
- }
- }
- }
- }
-}
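
The iterator boundary test in the removed KafkaRDDSuite above also shows the commit step: after processing a batch, the "until" offsets of its ranges are stored as the group's consumer offsets before the next RDD is defined. A small standalone sketch of that step follows; it assumes the same package so KafkaCluster is accessible, and the helper name is hypothetical.

    package org.apache.spark.streaming.kafka

    import kafka.common.TopicAndPartition

    import org.apache.spark.rdd.RDD

    object OffsetCommitSketch {
      /** Commit the untilOffsets of a processed Kafka RDD for the given consumer group. */
      def commit(kc: KafkaCluster, groupId: String, rdd: RDD[_]): Unit = {
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val untilOffsets = ranges
          .map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset)
          .toMap
        kc.setConsumerOffsets(groupId, untilOffsets).fold(
          err => throw new Exception(err.mkString("\n")),
          _ => ())
      }
    }
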
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
deleted file mode 100644
index 6a35ac14a8..0000000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Random
-
-import kafka.serializer.StringDecoder
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-
-class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfterAll {
- private var ssc: StreamingContext = _
- private var kafkaTestUtils: KafkaTestUtils = _
-
- override def beforeAll(): Unit = {
- kafkaTestUtils = new KafkaTestUtils
- kafkaTestUtils.setup()
- }
-
- override def afterAll(): Unit = {
- if (ssc != null) {
- ssc.stop()
- ssc = null
- }
-
- if (kafkaTestUtils != null) {
- kafkaTestUtils.teardown()
- kafkaTestUtils = null
- }
- }
-
- test("Kafka input stream") {
- val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
- ssc = new StreamingContext(sparkConf, Milliseconds(500))
- val topic = "topic1"
- val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
- kafkaTestUtils.createTopic(topic)
- kafkaTestUtils.sendMessages(topic, sent)
-
- val kafkaParams = Map("zookeeper.connect" -> kafkaTestUtils.zkAddress,
- "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
- "auto.offset.reset" -> "smallest")
-
- val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
- val result = new mutable.HashMap[String, Long]()
- stream.map(_._2).countByValue().foreachRDD { r =>
- r.collect().foreach { kv =>
- result.synchronized {
- val count = result.getOrElseUpdate(kv._1, 0) + kv._2
- result.put(kv._1, count)
- }
- }
- }
-
- ssc.start()
-
- eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
- assert(result.synchronized { sent === result })
- }
- }
-}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
deleted file mode 100644
index 7b9aee39ff..0000000000
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.io.File
-
-import scala.collection.mutable
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.util.Random
-
-import kafka.serializer.StringDecoder
-import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.Eventually
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.util.Utils
-
-class ReliableKafkaStreamSuite extends SparkFunSuite
- with BeforeAndAfterAll with BeforeAndAfter with Eventually {
-
- private val sparkConf = new SparkConf()
- .setMaster("local[4]")
- .setAppName(this.getClass.getSimpleName)
- .set("spark.streaming.receiver.writeAheadLog.enable", "true")
- private val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
-
- private var kafkaTestUtils: KafkaTestUtils = _
-
- private var groupId: String = _
- private var kafkaParams: Map[String, String] = _
- private var ssc: StreamingContext = _
- private var tempDirectory: File = null
-
- override def beforeAll(): Unit = {
- kafkaTestUtils = new KafkaTestUtils
- kafkaTestUtils.setup()
-
- groupId = s"test-consumer-${Random.nextInt(10000)}"
- kafkaParams = Map(
- "zookeeper.connect" -> kafkaTestUtils.zkAddress,
- "group.id" -> groupId,
- "auto.offset.reset" -> "smallest"
- )
-
- tempDirectory = Utils.createTempDir()
- }
-
- override def afterAll(): Unit = {
- Utils.deleteRecursively(tempDirectory)
-
- if (kafkaTestUtils != null) {
- kafkaTestUtils.teardown()
- kafkaTestUtils = null
- }
- }
-
- before {
- ssc = new StreamingContext(sparkConf, Milliseconds(500))
- ssc.checkpoint(tempDirectory.getAbsolutePath)
- }
-
- after {
- if (ssc != null) {
- ssc.stop()
- ssc = null
- }
- }
-
- test("Reliable Kafka input stream with single topic") {
- val topic = "test-topic"
- kafkaTestUtils.createTopic(topic)
- kafkaTestUtils.sendMessages(topic, data)
-
- // Verify that no offset has been committed for this group/topic/partition before starting.
- assert(getCommitOffset(groupId, topic, 0) === None)
-
- val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
- val result = new mutable.HashMap[String, Long]()
- stream.map { case (k, v) => v }.foreachRDD { r =>
- val ret = r.collect()
- ret.foreach { v =>
- val count = result.getOrElseUpdate(v, 0) + 1
- result.put(v, count)
- }
- }
- ssc.start()
-
- eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
- // A basic end-to-end verification for ReliableKafkaReceiver.
- // Verify that the number of received messages equals the number of sent messages.
- assert(data.size === result.size)
- // Verify that each key's received count matches the data that was sent.
- data.keys.foreach { k => assert(data(k) === result(k).toInt) }
- // Verify that the committed offset equals the total number of messages.
- assert(getCommitOffset(groupId, topic, 0) === Some(29L))
- }
- }
-
- test("Reliable Kafka input stream with multiple topics") {
- val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
- topics.foreach { case (t, _) =>
- kafkaTestUtils.createTopic(t)
- kafkaTestUtils.sendMessages(t, data)
- }
-
- // Before starting, verify that no offsets have been committed for any group/topic/partition.
- topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
-
- // Consume all the data sent to the broker, which will potentially commit the offsets internally.
- val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
- ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
- stream.foreachRDD(_ => Unit)
- ssc.start()
-
- eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
- // Verify that the committed offset for each topic equals the expected value.
- topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
- }
- }
-
-
- /** Get the committed partition offset from ZooKeeper. */
- private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
- val topicDirs = new ZKGroupTopicDirs(groupId, topic)
- val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
- ZkUtils.readDataMaybeNull(kafkaTestUtils.zookeeperClient, zkPath)._1.map(_.toLong)
- }
-}
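
The getCommitOffset helper at the end of the removed ReliableKafkaStreamSuite reads a group's committed offset for a topic/partition directly from ZooKeeper. A standalone sketch of that lookup follows; the ZooKeeper address, timeouts, group and topic are placeholders, and the ZkClient construction with ZKStringSerializer is an assumption about the Kafka 0.8 client on the classpath.

    import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
    import org.I0Itec.zkclient.ZkClient

    object CommittedOffsetSketch {
      /** Read the committed offset for group/topic/partition from ZooKeeper, if any. */
      def committedOffset(zkAddress: String, groupId: String, topic: String,
          partition: Int): Option[Long] = {
        val zkClient = new ZkClient(zkAddress, 6000, 6000, ZKStringSerializer)  // assumed timeouts
        try {
          val topicDirs = new ZKGroupTopicDirs(groupId, topic)
          val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
          // readDataMaybeNull returns (Option[String], Stat); the data is the offset as a string.
          ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
        } finally {
          zkClient.close()
        }
      }

      def main(args: Array[String]): Unit = {
        println(committedOffset("localhost:2181", "test-consumer", "topic1", 0))
      }
    }
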