diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-13 23:23:46 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-13 23:23:46 -0800 |
commit | 4e497db8f3826cf5142b2165a08d02c6f3c2cd90 (patch) | |
tree | 9ec25e86ccf8986035215e51f7b0e1ba1b96dad6 /external | |
parent | 1233b3de01be1ff57910786f5f3e2e2a23e228ab (diff) | |
download | spark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.tar.gz spark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.tar.bz2 spark-4e497db8f3826cf5142b2165a08d02c6f3c2cd90.zip |
Removed StreamingContext.registerInputStream and registerOutputStream - they were useless as InputDStream has been made to register itself. Also made DStream.register() private[streaming] - not useful to expose the confusing function. Updated a lot of documentation.
Diffstat (limited to 'external')
5 files changed, 4 insertions, 11 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index d53b66dd46..654ba451e7 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -37,7 +37,6 @@ object FlumeUtils { storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[SparkFlumeEvent] = { val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel) - ssc.registerInputStream(inputStream) inputStream } diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 2e8e9fac45..8bc43972ab 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -43,7 +43,7 @@ class FlumeStreamSuite extends TestSuiteBase { val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] with SynchronizedBuffer[Seq[SparkFlumeEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) - ssc.registerOutputStream(outputStream) + outputStream.register() ssc.start() val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 37c03be4e7..15a2daa008 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -71,9 +71,7 @@ object KafkaUtils { topics: Map[String, Int], storageLevel: StorageLevel ): DStream[(K, V)] = { - val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) - ssc.registerInputStream(inputStream) - inputStream + new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel) } /** diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index 3636e46bb8..1b09ee5dc8 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -37,9 +37,7 @@ object MQTTUtils { topic: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[String] = { - val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) - ssc.registerInputStream(inputStream) - inputStream + new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel) } /** diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index b8bae7b6d3..e8433b7e9f 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -41,9 +41,7 @@ object TwitterUtils { filters: Seq[String] = Nil, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[Status] = { - val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) - ssc.registerInputStream(inputStream) - inputStream + new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel) } /** |