aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-14 00:05:37 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-14 00:05:37 -0800
commit980250b1ee0cdba9cf06ea87c790a2d504bbf03e (patch)
tree547a023f4a672ab84299762d71e40d19dc2cf92f /external
parent055be5c6940a82e2d7fa39f968a434643a1eb1e2 (diff)
parentf8bd828c7ccf1ff69bc35bf95d07183cb35a7c72 (diff)
downloadspark-980250b1ee0cdba9cf06ea87c790a2d504bbf03e.tar.gz
spark-980250b1ee0cdba9cf06ea87c790a2d504bbf03e.tar.bz2
spark-980250b1ee0cdba9cf06ea87c790a2d504bbf03e.zip
Merge pull request #416 from tdas/filestream-fix
Removed unnecessary DStream operations and updated docs Removed StreamingContext.registerInputStream and registerOutputStream - they were useless. InputDStream has been made to register itself, and just registering a DStream as output stream cause RDD objects to be created but the RDDs will not be computed at all.. Also made DStream.register() private[streaming] for the same reasons. Updated docs, specially added package documentation for streaming package. Also, changed NetworkWordCount's input storage level to use MEMORY_ONLY, replication on the local machine causes warning messages (as replication fails) which is scary for a new user trying out his/her first example.
Diffstat (limited to 'external')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala1
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala2
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala4
-rw-r--r--external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala4
-rw-r--r--external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala4
5 files changed, 4 insertions, 11 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index d53b66dd46..654ba451e7 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -37,7 +37,6 @@ object FlumeUtils {
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
- ssc.registerInputStream(inputStream)
inputStream
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 2e8e9fac45..8bc43972ab 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -43,7 +43,7 @@ class FlumeStreamSuite extends TestSuiteBase {
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
- ssc.registerOutputStream(outputStream)
+ outputStream.register()
ssc.start()
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 37c03be4e7..15a2daa008 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -71,9 +71,7 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): DStream[(K, V)] = {
- val inputStream = new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
+ new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
}
/**
diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
index 3636e46bb8..1b09ee5dc8 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala
@@ -37,9 +37,7 @@ object MQTTUtils {
topic: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String] = {
- val inputStream = new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
+ new MQTTInputDStream[String](ssc, brokerUrl, topic, storageLevel)
}
/**
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
index b8bae7b6d3..e8433b7e9f 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala
@@ -41,9 +41,7 @@ object TwitterUtils {
filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
- val inputStream = new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
- ssc.registerInputStream(inputStream)
- inputStream
+ new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)
}
/**