diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-09 15:18:05 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-09 15:18:05 -0800 |
commit | 99a5fc498acf3de14d754f8dda0df6bb81dd9595 (patch) | |
tree | c1596614463f91cc524dc7d75081db6877a0b3ae /streaming/src | |
parent | 4cc223b4785c9da39c4a35d2adb7339dfa8e47e6 (diff) | |
download | spark-99a5fc498acf3de14d754f8dda0df6bb81dd9595.tar.gz spark-99a5fc498acf3de14d754f8dda0df6bb81dd9595.tar.bz2 spark-99a5fc498acf3de14d754f8dda0df6bb81dd9595.zip |
Added an initial spark job to ensure worker nodes are initialized.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 7 | ||||
-rw-r--r-- | streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 2 |
2 files changed, 7 insertions, 2 deletions
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index e4152f3a61..b54f53b203 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -4,6 +4,7 @@ import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver} import spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError} import spark.Logging import spark.SparkEnv +import spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.Queue @@ -138,8 +139,12 @@ class NetworkInputTracker( } iterator.next().start() } + // Run the dummy Spark job to ensure that all slaves have registered. + // This avoids all the receivers to be scheduled on the same node. + //ssc.sparkContext.makeRDD(1 to 100, 100).map(x => (x, 1)).reduceByKey(_ + _, 20).collect() + // Distribute the receivers and start them - ssc.sc.runJob(tempRDD, startReceiver) + ssc.sparkContext.runJob(tempRDD, startReceiver) } /** Stops the receivers. */ diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index c442210004..0eb9c7b81e 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -95,7 +95,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) - + Thread.sleep(1000) val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333)); val client = SpecificRequestor.getClient( classOf[AvroSourceProtocol], transceiver); |