aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java/org/apache
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-28 13:58:09 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2014-04-28 13:58:09 -0700
commit1d84964bf80f4e69e54d62286c3861c2362342d0 (patch)
tree416f1576d794f1e02162fd94c784f3e9c014fa60 /streaming/src/test/java/org/apache
parentf735884414a15c0c07df60068ee11f9da47eff77 (diff)
downloadspark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.gz
spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.bz2
spark-1d84964bf80f4e69e54d62286c3861c2362342d0.zip
[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #558 from tdas/more-fixes and squashes the following commits: c0c84e6 [Tathagata Das] Removing extra println() d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins. b7caa98 [Tathagata Das] More tweaks. d337367 [Tathagata Das] More tweaks 22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes 40a961b [Tathagata Das] Modified java test to reduce flakiness. 9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes 86d9147 [Tathagata Das] scala style fix 2f3d7b1 [Tathagata Das] Added Scala custom receiver example. d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes bec3fc2 [Tathagata Das] Added license. 51d6514 [Tathagata Das] Fixed docs on receiver. 81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.
Diffstat (limited to 'streaming/src/test/java/org/apache')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java144
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala4
2 files changed, 146 insertions, 2 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
new file mode 100644
index 0000000000..1b0787fe69
--- /dev/null
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.apache.spark.api.java.function.Function;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class JavaReceiverAPISuite implements Serializable {
+
+ @Before
+ public void setUp() {
+ System.clearProperty("spark.streaming.clock");
+ }
+
+ @After
+ public void tearDown() {
+ System.clearProperty("spark.streaming.clock");
+ }
+
+ @Test
+ public void testReceiver() throws InterruptedException {
+ TestServer server = new TestServer(0);
+ server.start();
+
+ final AtomicLong dataCounter = new AtomicLong(0);
+
+ try {
+ JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200));
+ JavaReceiverInputDStream<String> input =
+ ssc.receiverStream(new JavaSocketReceiver("localhost", server.port()));
+ JavaDStream<String> mapped = input.map(new Function<String, String>() {
+ @Override
+ public String call(String v1) throws Exception {
+ return v1 + ".";
+ }
+ });
+ mapped.foreachRDD(new Function<JavaRDD<String>, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd) throws Exception {
+ long count = rdd.count();
+ dataCounter.addAndGet(count);
+ return null;
+ }
+ });
+
+ ssc.start();
+ long startTime = System.currentTimeMillis();
+ long timeout = 10000;
+
+ Thread.sleep(200);
+ for (int i = 0; i < 6; i++) {
+ server.send("" + i + "\n"); // \n to make sure these are separate lines
+ Thread.sleep(100);
+ }
+ while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) {
+ Thread.sleep(100);
+ }
+ ssc.stop();
+ assertTrue(dataCounter.get() > 0);
+ } finally {
+ server.stop();
+ }
+ }
+}
+
+class JavaSocketReceiver extends Receiver<String> {
+
+ String host = null;
+ int port = -1;
+
+ public JavaSocketReceiver(String host_ , int port_) {
+ super(StorageLevel.MEMORY_AND_DISK());
+ host = host_;
+ port = port_;
+ }
+
+ @Override
+ public void onStart() {
+ new Thread() {
+ @Override public void run() {
+ receive();
+ }
+ }.start();
+ }
+
+ @Override
+ public void onStop() {
+ }
+
+ private void receive() {
+ Socket socket = null;
+ try {
+ socket = new Socket(host, port);
+ BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ String userInput;
+ while ((userInput = in.readLine()) != null) {
+ store(userInput);
+ }
+ in.close();
+ socket.close();
+ } catch(ConnectException ce) {
+ ce.printStackTrace();
+ restart("Could not connect", ce);
+ } catch(Throwable t) {
+ t.printStackTrace();
+ restart("Error receiving data", t);
+ }
+ }
+}
+
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 33f6df8f88..c0ea0491c3 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.streaming._
import java.util.ArrayList
import collection.JavaConversions._
import org.apache.spark.api.java.JavaRDDLike
+import org.apache.spark.streaming.dstream.DStream
/** Exposes streaming test functionality in a Java-friendly way. */
trait JavaTestBase extends TestSuiteBase {
@@ -51,8 +52,7 @@ trait JavaTestBase extends TestSuiteBase {
* [[org.apache.spark.streaming.TestOutputStream]].
**/
def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
- dstream: JavaDStreamLike[T, This, R]) =
- {
+ dstream: JavaDStreamLike[T, This, R]) = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStreamWithPartitions(dstream.dstream)