From b93c97d79b42a06b48d2a8d98beccc636442541e Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 18 May 2015 14:33:33 -0700 Subject: [SPARK-7501] [STREAMING] DAG visualization: show DStream operations This is similar to #5999, but for streaming. Roughly 200 lines are tests. One thing to note here is that we already do some kind of scoping thing for call sites, so this patch adds the new RDD operation scoping logic in the same place. Also, this patch adds a `try finally` block to set the relevant variables in a safer way. tdas zsxwing ------------------------ **Before** -------------------------- **After** Author: Andrew Or Closes #6034 from andrewor14/dag-viz-streaming and squashes the following commits: 932a64a [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming e685df9 [Andrew Or] Rename createRDDWith 84d0656 [Andrew Or] Review feedback 697c086 [Andrew Or] Fix tests 53b9936 [Andrew Or] Set scopes for foreachRDD properly 1881802 [Andrew Or] Refactor DStream scope names again af4ba8d [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming fd07d22 [Andrew Or] Make MQTT lower case f6de871 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming 0ca1801 [Andrew Or] Remove a few unnecessary withScopes on aliases fa4e5fb [Andrew Or] Pass in input stream name rather than defining it from within 1af0b0e [Andrew Or] Fix style 074c00b [Andrew Or] Review comments d25a324 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming e4a93ac [Andrew Or] Fix tests? 25416dc [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming 9113183 [Andrew Or] Add tests for DStream scopes b3806ab [Andrew Or] Fix test bb80bbb [Andrew Or] Fix MIMA? 5c30360 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming 5703939 [Andrew Or] Rename operations that create InputDStreams 7c4513d [Andrew Or] Group RDDs by DStream operations and batches bf0ab6e [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming 05c2676 [Andrew Or] Wrap many more methods in withScope c121047 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-streaming 65ef3e9 [Andrew Or] Fix NPE a0d3263 [Andrew Or] Scope streaming operations instead of RDD operations --- .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 3 +++ .../org/apache/spark/streaming/kafka/KafkaUtils.scala | 17 ++++++++++------- .../apache/spark/streaming/mqtt/MQTTInputDStream.scala | 3 ++- 3 files changed, 15 insertions(+), 8 deletions(-) (limited to 'external') diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 6715aede79..060c2f23ed 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -65,6 +65,9 @@ class DirectKafkaInputDStream[ val maxRetries = context.sparkContext.getConf.getInt( "spark.streaming.kafka.maxRetries", 1) + // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]") + private[streaming] override def name: String = s"Kafka direct stream [$id]" + protected[streaming] override val checkpointData = new DirectKafkaInputDStreamCheckpointData diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index d7cf500577..8be2707528 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -189,7 +189,7 @@ object KafkaUtils { sc: SparkContext, kafkaParams: Map[String, String], offsetRanges: Array[OffsetRange] - ): RDD[(K, V)] = { + ): RDD[(K, V)] = sc.withScope { val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message) val leaders = leadersForRanges(kafkaParams, offsetRanges) new KafkaRDD[K, V, KD, VD, (K, V)](sc, kafkaParams, offsetRanges, leaders, messageHandler) @@ -224,7 +224,7 @@ object KafkaUtils { offsetRanges: Array[OffsetRange], leaders: Map[TopicAndPartition, Broker], messageHandler: MessageAndMetadata[K, V] => R - ): RDD[R] = { + ): RDD[R] = sc.withScope { val leaderMap = if (leaders.isEmpty) { leadersForRanges(kafkaParams, offsetRanges) } else { @@ -233,7 +233,8 @@ object KafkaUtils { case (tp: TopicAndPartition, Broker(host, port)) => (tp, (host, port)) }.toMap } - new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, messageHandler) + val cleanedHandler = sc.clean(messageHandler) + new KafkaRDD[K, V, KD, VD, R](sc, kafkaParams, offsetRanges, leaderMap, cleanedHandler) } /** @@ -256,7 +257,7 @@ object KafkaUtils { valueDecoderClass: Class[VD], kafkaParams: JMap[String, String], offsetRanges: Array[OffsetRange] - ): JavaPairRDD[K, V] = { + ): JavaPairRDD[K, V] = jsc.sc.withScope { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) @@ -294,7 +295,7 @@ object KafkaUtils { offsetRanges: Array[OffsetRange], leaders: JMap[TopicAndPartition, Broker], messageHandler: JFunction[MessageAndMetadata[K, V], R] - ): JavaRDD[R] = { + ): JavaRDD[R] = jsc.sc.withScope { implicit val keyCmt: ClassTag[K] = ClassTag(keyClass) implicit val valueCmt: ClassTag[V] = ClassTag(valueClass) implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) @@ -348,8 +349,9 @@ object KafkaUtils { fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R ): InputDStream[R] = { + val cleanedHandler = ssc.sc.clean(messageHandler) new DirectKafkaInputDStream[K, V, KD, VD, R]( - ssc, kafkaParams, fromOffsets, messageHandler) + ssc, kafkaParams, fromOffsets, cleanedHandler) } /** @@ -469,11 +471,12 @@ object KafkaUtils { implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) + val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _) createDirectStream[K, V, KD, VD, R]( jssc.ssc, Map(kafkaParams.toSeq: _*), Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*), - messageHandler.call _ + cleanedHandler ) } diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala index 3c0ef94cb0..40f5f18547 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala @@ -35,7 +35,6 @@ import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence -import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ @@ -57,6 +56,8 @@ class MQTTInputDStream( storageLevel: StorageLevel ) extends ReceiverInputDStream[String](ssc_) { + private[streaming] override def name: String = s"MQTT stream [$id]" + def getReceiver(): Receiver[String] = { new MQTTReceiver(brokerUrl, topic, storageLevel) } -- cgit v1.2.3