aboutsummaryrefslogtreecommitdiff
path: root/external/zeromq/src/test/scala
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-20 13:55:41 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-01-20 13:55:41 -0800
commitb7d74a602f622d8e105b349bd6d17ba42e7668dc (patch)
tree118deb532942513693e60f851dde638d7fa818cd /external/zeromq/src/test/scala
parent944fdadf77523570f6b33544ad0b388031498952 (diff)
downloadspark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.gz
spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.bz2
spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.zip
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes: 1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream 2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream" 3. Update the ActorWordCount example and add the JavaActorWordCount example 4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly Author: Shixiong Zhu <shixiong@databricks.com> Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'external/zeromq/src/test/scala')
-rw-r--r--external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala16
1 files changed, 12 insertions, 4 deletions
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 35d2e62c68..bac2679cab 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite {
// tests the API, does not actually test data receiving
val test1: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+ StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
+ val test4: ReceiverInputDStream[String] =
+ ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects,
+ StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
- // TODO: Actually test data receiving
+ // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
ssc.stop()
}
}