Diffstat (limited to 'external')
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala |  3
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala              | 17
-rw-r--r-- external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala          |  3
3 files changed, 15 insertions(+), 8 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 6715aede79..060c2f23ed 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -65,6 +65,9 @@ class DirectKafkaInputDStream[
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)
+ // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
+ private[streaming] override def name: String = s"Kafka direct stream [$id]"
+
protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData
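The override above hooks into InputDStream's private[streaming] name method, which the Streaming UI uses to label each input stream. A minimal sketch of the same pattern for a custom input stream, assuming the class is placed under the org.apache.spark.streaming package so the private[streaming] member is visible; the class and package names are illustrative, not part of this patch:

package org.apache.spark.streaming.custom

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

class MyCustomInputDStream[T: ClassTag](ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  // Same convention as the patch: a human-readable label plus the stream id,
  // rendered on the Streaming tab as e.g. "My custom stream [0]"
  private[streaming] override def name: String = s"My custom stream [$id]"

  override def start(): Unit = {}
  override def stop(): Unit = {}
  override def compute(validTime: Time): Option[RDD[T]] = None
}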
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index d7cf500577..8be2707528 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -189,7 +189,7 @@ object KafkaUtils {
sc: SparkContext,
kafkaParams: Map[String, String],
offsetRanges: Array[OffsetRange]
- ): RDD[(K, V)] = {
+ ): RDD[(K, V)] = sc.withScope {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val leaders = leadersForRanges(kafkaParams, offsetRanges)
new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler)
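Wrapping the method body in sc.withScope groups every RDD it creates under the method's name in the web UI's DAG visualization; SparkContext.withScope is private[spark], which is why the change has to live inside KafkaUtils itself. A hedged usage sketch of the patched createRDD, where the broker address, topic, and offsets are placeholders:

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object CreateRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("kafka-rdd").setMaster("local[2]"))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // placeholder broker
    val offsetRanges = Array(OffsetRange("test-topic", 0, 0L, 100L))  // placeholder offsets

    // After this patch, the KafkaRDD built here shows up under a
    // "createRDD" scope in the DAG visualization instead of floating free.
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    println(rdd.count())
    sc.stop()
  }
}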
@@ -224,7 +224,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange],
leaders: Map[TopicAndPartition, Broker],
messageHandler: MessageAndMetadata[K, V] => R
- ): RDD[R] = {
+ ): RDD[R] = sc.withScope {
val leaderMap = if (leaders.isEmpty) {
leadersForRanges(kafkaParams, offsetRanges)
} else {
@@ -233,7 +233,8 @@ object KafkaUtils {
case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port))
}.toMap
}
- new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler)
+ val cleanedHandler = sc.clean(messageHandler)
+ new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler)
}
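The new sc.clean call runs Spark's ClosureCleaner over the user-supplied messageHandler: it verifies the function is serializable and nulls out unused outer references, so a bad closure fails at the call site rather than later, when a task is shipped to an executor. A minimal sketch of the same pattern, assuming the helper lives under the org.apache.spark package (SparkContext.clean is private[spark]); all names here are illustrative:

package org.apache.spark.custom

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CleaningExample {
  // Mirrors what the patch does in KafkaUtils: clean the handler once,
  // eagerly, before wiring it into anything long-lived.
  def mapWithHandler[T, U: ClassTag](
      sc: SparkContext,
      rdd: RDD[T],
      handler: T => U): RDD[U] = {
    // Throws SparkException immediately if `handler` drags in a
    // non-serializable enclosing object it does not actually use.
    val cleanedHandler = sc.clean(handler)
    rdd.map(cleanedHandler)
  }
}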
/**
@@ -256,7 +257,7 @@ object KafkaUtils {
valueDecoderClass: Class[VD],
kafkaParams: JMap[String, String],
offsetRanges: Array[OffsetRange]
- ): JavaPairRDD[K, V] = {
+ ): JavaPairRDD[K, V] = jsc.sc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -294,7 +295,7 @@ object KafkaUtils {
offsetRanges: Array[OffsetRange],
leaders: JMap[TopicAndPartition, Broker],
messageHandler: JFunction[MessageAndMetadata[K, V], R]
- ): JavaRDD[R] = {
+ ): JavaRDD[R] = jsc.sc.withScope {
implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
@@ -348,8 +349,9 @@ object KafkaUtils {
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
+ val cleanedHandler = ssc.sc.clean(messageHandler)
new DirectKafkaInputDStream[K, V, KD, VD, R](
- ssc, kafkaParams, fromOffsets, messageHandler)
+ ssc, kafkaParams, fromOffsets, cleanedHandler)
}
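With the handler cleaned inside createDirectStream, a closure that accidentally captures a non-serializable object now fails fast at stream construction. A hedged usage sketch of this overload; the broker, topic, and starting offsets are placeholders:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("direct-kafka").setMaster("local[2]"), Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") // placeholder
    val fromOffsets = Map(TopicAndPartition("test-topic", 0) -> 0L)   // placeholder

    // This handler is closure-cleaned before the DStream is constructed.
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}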
/**
@@ -469,11 +471,12 @@ object KafkaUtils {
implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
+ val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
createDirectStream[K, V, KD, VD, R](
jssc.ssc,
Map(kafkaParams.toSeq: _*),
Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
- messageHandler.call _
+ cleanedHandler
)
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index 3c0ef94cb0..40f5f18547 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
@@ -35,7 +35,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
-import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
@@ -57,6 +56,8 @@ class MQTTInputDStream(
storageLevel: StorageLevel
) extends ReceiverInputDStream[String](ssc_) {
+ private[streaming] override def name: String = s"MQTT stream [$id]"
+
def getReceiver(): Receiver[String] = {
new MQTTReceiver(brokerUrl, topic, storageLevel)
}
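The MQTT change mirrors the Kafka one: the receiver stream now reports a readable label to the Streaming UI instead of a name derived from the class. A hedged sketch of creating such a stream through MQTTUtils, with a placeholder broker URL and topic:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

object MqttNameExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("mqtt-name").setMaster("local[2]"), Seconds(5))
    // After the patch, this input is listed as "MQTT stream [0]"
    // on the Streaming tab.
    val lines = MQTTUtils.createStream(
      ssc, "tcp://localhost:1883", "sensors/temperature", StorageLevel.MEMORY_ONLY)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}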