aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-09 15:18:05 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-09 15:18:05 -0800
commit99a5fc498acf3de14d754f8dda0df6bb81dd9595 (patch)
treec1596614463f91cc524dc7d75081db6877a0b3ae /streaming
parent4cc223b4785c9da39c4a35d2adb7339dfa8e47e6 (diff)
downloadspark-99a5fc498acf3de14d754f8dda0df6bb81dd9595.tar.gz
spark-99a5fc498acf3de14d754f8dda0df6bb81dd9595.tar.bz2
spark-99a5fc498acf3de14d754f8dda0df6bb81dd9595.zip
Added an initial spark job to ensure worker nodes are initialized.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala7
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala2
2 files changed, 7 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index e4152f3a61..b54f53b203 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -4,6 +4,7 @@ import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import spark.Logging
import spark.SparkEnv
+import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
@@ -138,8 +139,12 @@ class NetworkInputTracker(
}
iterator.next().start()
}
+ // Run the dummy Spark job to ensure that all slaves have registered.
+ // This avoids all the receivers to be scheduled on the same node.
+ //ssc.sparkContext.makeRDD(1 to 100, 100).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
+
// Distribute the receivers and start them
- ssc.sc.runJob(tempRDD, startReceiver)
+ ssc.sparkContext.runJob(tempRDD, startReceiver)
}
/** Stops the receivers. */
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index c442210004..0eb9c7b81e 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -95,7 +95,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
-
+ Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333));
val client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver);