diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-01-20 13:55:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-01-20 13:55:41 -0800 |
commit | b7d74a602f622d8e105b349bd6d17ba42e7668dc (patch) | |
tree | 118deb532942513693e60f851dde638d7fa818cd /external/zeromq/src/test/scala | |
parent | 944fdadf77523570f6b33544ad0b388031498952 (diff) | |
download | spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.gz spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.bz2 spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.zip |
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:
1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'external/zeromq/src/test/scala')
-rw-r--r-- | external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 35d2e62c68..bac2679cab 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite { // tests the API, does not actually test data receiving val test1: ReceiverInputDStream[String] = - ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null) val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null) val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream( ssc, publishUrl, subscribe, bytesToObjects, - StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) + StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy) + val test4: ReceiverInputDStream[String] = + ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy) - // TODO: Actually test data receiving + // TODO: Actually test data receiving. A real test needs the native ZeroMQ library ssc.stop() } } |