aboutsummaryrefslogtreecommitdiff
path: root/external/mqtt/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-07 01:56:15 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-07 01:56:15 -0800
commitaa99f226a691ddcb4442d60f4cd4908f434cc4ce (patch)
tree33a1614e3d5ee7a050776e3601ba8c7430b573f8 /external/mqtt/src/test
parent3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (diff)
downloadspark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.gz
spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.bz2
spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.zip
Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.
Diffstat (limited to 'external/mqtt/src/test')
-rw-r--r--external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java6
-rw-r--r--external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala4
2 files changed, 4 insertions, 6 deletions
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index 3ddb4d084f..44743aaecf 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -29,11 +28,10 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
public void testMQTTStream() {
String brokerUrl = "abc";
String topic = "def";
- MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
- JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
+ JavaDStream<String> test1 = MQTTUtils.createStream(ssc, brokerUrl, topic);
+ JavaDStream<String> test2 = MQTTUtils.createStream(ssc, brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
index ab6542918b..fcc159e85a 100644
--- a/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
+++ b/external/mqtt/src/test/scala/org/apache/spark/streaming/mqtt/MQTTStreamSuite.scala
@@ -28,8 +28,8 @@ class MQTTStreamSuite extends TestSuiteBase {
val topic = "def"
// tests the API, does not actually test data receiving
- val test1 = ssc.mqttStream(brokerUrl, topic)
- val test2 = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test1 = MQTTUtils.createStream(ssc, brokerUrl, topic)
+ val test2 = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_AND_DISK_SER_2)
// TODO: Actually test receiving data
}