diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-28 13:58:09 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-28 13:58:09 -0700 |
commit | 1d84964bf80f4e69e54d62286c3861c2362342d0 (patch) | |
tree | 416f1576d794f1e02162fd94c784f3e9c014fa60 /streaming/src/test | |
parent | f735884414a15c0c07df60068ee11f9da47eff77 (diff) | |
download | spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.gz spark-1d84964bf80f4e69e54d62286c3861c2362342d0.tar.bz2 spark-1d84964bf80f4e69e54d62286c3861c2362342d0.zip |
[SPARK-1633][Streaming] Java API unit test and example for custom streaming receiver in Java
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #558 from tdas/more-fixes and squashes the following commits:
c0c84e6 [Tathagata Das] Removing extra println()
d8a8cf4 [Tathagata Das] More tweaks to make unit test work in Jenkins.
b7caa98 [Tathagata Das] More tweaks.
d337367 [Tathagata Das] More tweaks
22d6f2d [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
40a961b [Tathagata Das] Modified java test to reduce flakiness.
9410ca6 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
86d9147 [Tathagata Das] scala style fix
2f3d7b1 [Tathagata Das] Added Scala custom receiver example.
d677611 [Tathagata Das] Merge remote-tracking branch 'apache/master' into more-fixes
bec3fc2 [Tathagata Das] Added license.
51d6514 [Tathagata Das] Fixed docs on receiver.
81aafa0 [Tathagata Das] Added Java test for Receiver API, and added JavaCustomReceiver example.
Diffstat (limited to 'streaming/src/test')
5 files changed, 150 insertions, 4 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java new file mode 100644 index 0000000000..1b0787fe69 --- /dev/null +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.receiver.Receiver; +import org.apache.spark.api.java.function.Function; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.io.Serializable; +import java.net.ConnectException; +import java.net.Socket; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class JavaReceiverAPISuite implements Serializable { + + @Before + public void setUp() { + System.clearProperty("spark.streaming.clock"); + } + + @After + public void tearDown() { + System.clearProperty("spark.streaming.clock"); + } + + @Test + public void testReceiver() throws InterruptedException { + TestServer server = new TestServer(0); + server.start(); + + final AtomicLong dataCounter = new AtomicLong(0); + + try { + JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "test", new Duration(200)); + JavaReceiverInputDStream<String> input = + ssc.receiverStream(new JavaSocketReceiver("localhost", server.port())); + JavaDStream<String> mapped = input.map(new Function<String, String>() { + @Override + public String call(String v1) throws Exception { + return v1 + "."; + } + }); + mapped.foreachRDD(new Function<JavaRDD<String>, Void>() { + @Override + public Void call(JavaRDD<String> rdd) throws Exception { + long count = rdd.count(); + dataCounter.addAndGet(count); + return null; + } + }); + + ssc.start(); + long startTime = System.currentTimeMillis(); + long timeout = 10000; + + Thread.sleep(200); + for (int i = 0; i < 6; i++) { + server.send("" + i + "\n"); // \n to make sure these are separate lines + Thread.sleep(100); + } + while (dataCounter.get() == 0 && System.currentTimeMillis() - startTime < timeout) { + Thread.sleep(100); + } + ssc.stop(); + assertTrue(dataCounter.get() > 0); + } finally { + server.stop(); + } + } +} + +class JavaSocketReceiver extends Receiver<String> { + + String host = null; + int port = -1; + + public JavaSocketReceiver(String host_ , int port_) { + super(StorageLevel.MEMORY_AND_DISK()); + host = host_; + port = port_; + } + + @Override + public void onStart() { + new Thread() { + @Override public void run() { + receive(); + } + }.start(); + } + + @Override + public void onStop() { + } + + private void receive() { + Socket socket = null; + try { + socket = new Socket(host, port); + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + String userInput; + while ((userInput = in.readLine()) != null) { + store(userInput); + } + in.close(); + socket.close(); + } catch(ConnectException ce) { + ce.printStackTrace(); + restart("Could not connect", ce); + } catch(Throwable t) { + t.printStackTrace(); + restart("Error receiving data", t); + } + } +} + diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 33f6df8f88..c0ea0491c3 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -26,6 +26,7 @@ import org.apache.spark.streaming._ import java.util.ArrayList import collection.JavaConversions._ import org.apache.spark.api.java.JavaRDDLike +import org.apache.spark.streaming.dstream.DStream /** Exposes streaming test functionality in a Java-friendly way. */ trait JavaTestBase extends TestSuiteBase { @@ -51,8 +52,7 @@ trait JavaTestBase extends TestSuiteBase { * [[org.apache.spark.streaming.TestOutputStream]]. **/ def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( - dstream: JavaDStreamLike[T, This, R]) = - { + dstream: JavaDStreamLike[T, This, R]) = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index b55b7834c9..3fa254065c 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -49,7 +49,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) - val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.socketTextStream( + "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 3e2b25af84..ee0bc8b7d6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -165,7 +165,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w ssc = new StreamingContext(sc, Milliseconds(100)) var runningCount = 0 TestReceiver.counter.set(1) - val input = ssc.networkStream(new TestReceiver) + val input = ssc.receiverStream(new TestReceiver) input.count.foreachRDD(rdd => { val count = rdd.first() runningCount += count.toInt diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 4f63fd3782..8036f77c97 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -155,6 +155,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { def afterFunction() { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") + System.clearProperty("spark.streaming.clock") } before(beforeFunction) |