Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java             |   9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala      |  19
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala   | 249
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  |  42
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala |  84
5 files changed, 359 insertions(+), 44 deletions(-)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index a0b1bbc34f..f9bfb9b744 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.junit.Assert;
@@ -36,10 +37,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1668,7 +1665,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// InputStream functionality is deferred to the existing Scala tests.
@Test
public void testSocketTextStream() {
- JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
+ JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1701,6 +1698,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testRawSocketStream() {
- JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
+ JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
}
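
Note: the two Java test fixes above track an API change in this patch: receiver-based sources now return the more specific JavaReceiverInputDStream rather than a plain JavaDStream. As a minimal sketch (not part of the patch; host and port are illustrative), the Scala side exposes the analogous typed return:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

object SocketStreamSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "SocketStreamSketch", Seconds(1))
    // socketTextStream now returns ReceiverInputDStream[String] rather than
    // DStream[String], so receiver-specific behavior is visible in the type.
    val lines: ReceiverInputDStream[String] =
      ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}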
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 952511d411..46b7f63b65 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -36,10 +36,9 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.NetworkReceiver
-import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
+import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -207,7 +206,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// set up the network stream using the test receiver
val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.networkStream[Int](testReceiver)
+ val networkStream = ssc.receiverStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
@@ -301,7 +300,7 @@ object TestServer {
}
/** This is an actor for testing actor input stream */
-class TestActor(port: Int) extends Actor with Receiver {
+class TestActor(port: Int) extends Actor with ActorHelper {
def bytesToString(byteString: ByteString) = byteString.utf8String
@@ -309,24 +308,22 @@ class TestActor(port: Int) extends Actor with Receiver {
def receive = {
case IO.Read(socket, bytes) =>
- pushBlock(bytesToString(bytes))
+ store(bytesToString(bytes))
}
}
/** This is a receiver to test multiple threads inserting data using block generator */
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
- extends NetworkReceiver[Int] {
+ extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
lazy val executorPool = Executors.newFixedThreadPool(numThreads)
- lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
lazy val finishCount = new AtomicInteger(0)
- protected def onStart() {
- blockGenerator.start()
+ def onStart() {
(1 to numThreads).map(threadId => {
val runnable = new Runnable {
def run() {
(1 to numRecordsPerThread).foreach(i =>
- blockGenerator += (threadId * numRecordsPerThread + i) )
+ store(threadId * numRecordsPerThread + i) )
if (finishCount.incrementAndGet == numThreads) {
MultiThreadTestReceiver.haveAllThreadsFinished = true
}
@@ -337,7 +334,7 @@ class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
})
}
- protected def onStop() {
+ def onStop() {
executorPool.shutdown()
}
}
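
Note: the InputStreamsSuite changes above migrate test receivers from the old NetworkReceiver/BlockGenerator idiom to the new Receiver API, where store() hands records to the framework and block generation happens internally. A minimal custom receiver sketch against that API (class name, host, and port are illustrative):

import java.net.Socket
import scala.io.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  def onStart() {
    // onStart() must return quickly, so receive on a dedicated thread.
    new Thread("LineReceiver") {
      override def run() {
        val socket = new Socket(host, port)
        val lines = Source.fromInputStream(socket.getInputStream).getLines()
        // store() replaces the old `blockGenerator += record` idiom.
        while (!isStopped && lines.hasNext) {
          store(lines.next())
        }
        socket.close()
      }
    }.start()
  }

  def onStop() {
    // The receiving thread polls isStopped, so no explicit cleanup is needed.
  }
}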
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
new file mode 100644
index 0000000000..5c0415ad14
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver, ReceiverSupervisor}
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+/** Test suite for testing the network receiver behavior */
+class NetworkReceiverSuite extends FunSuite with Timeouts {
+
+ test("network receiver life cycle") {
+
+ val receiver = new FakeReceiver
+ val executor = new FakeReceiverSupervisor(receiver)
+
+ assert(executor.isAllEmpty)
+
+ // Thread that runs the executor
+ val executingThread = new Thread() {
+ override def run() {
+ executor.start()
+ executor.awaitTermination()
+ }
+ }
+
+ // Start the receiver
+ executingThread.start()
+
+ // Verify that the executor thread keeps running (join should time out)
+ intercept[Exception] {
+ failAfter(200 millis) {
+ executingThread.join()
+ }
+ }
+
+ // Verify that the receiver was started
+ assert(receiver.onStartCalled)
+ assert(executor.isReceiverStarted)
+ assert(receiver.isStarted)
+ assert(!receiver.isStopped())
+ assert(receiver.otherThread.isAlive)
+ eventually(timeout(100 millis), interval(10 millis)) {
+ assert(receiver.receiving)
+ }
+
+ // Verify whether the data stored by the receiver was sent to the executor
+ val byteBuffer = ByteBuffer.allocate(100)
+ val arrayBuffer = new ArrayBuffer[Int]()
+ val iterator = arrayBuffer.iterator
+ receiver.store(1)
+ receiver.store(byteBuffer)
+ receiver.store(arrayBuffer)
+ receiver.store(iterator)
+ assert(executor.singles.size === 1)
+ assert(executor.singles.head === 1)
+ assert(executor.byteBuffers.size === 1)
+ assert(executor.byteBuffers.head.eq(byteBuffer))
+ assert(executor.iterators.size === 1)
+ assert(executor.iterators.head.eq(iterator))
+ assert(executor.arrayBuffers.size === 1)
+ assert(executor.arrayBuffers.head.eq(arrayBuffer))
+
+ // Verify whether the exceptions reported by the receiver were sent to the executor
+ val exception = new Exception
+ receiver.reportError("Error", exception)
+ assert(executor.errors.size === 1)
+ assert(executor.errors.head.eq(exception))
+
+ // Verify restarting actually stops and starts the receiver
+ receiver.restart("restarting", null, 100)
+ assert(receiver.isStopped)
+ assert(receiver.onStopCalled)
+ eventually(timeout(1000 millis), interval(100 millis)) {
+ assert(receiver.onStartCalled)
+ assert(executor.isReceiverStarted)
+ assert(receiver.isStarted)
+ assert(!receiver.isStopped)
+ assert(receiver.receiving)
+ }
+
+ // Verify that stopping actually stops the thread
+ failAfter(100 millis) {
+ receiver.stop("test")
+ assert(receiver.isStopped)
+ assert(!receiver.otherThread.isAlive)
+
+ // The thread that started the executor should complete
+ // as stop() stops everything
+ executingThread.join()
+ }
+ }
+
+ test("block generator") {
+ val blockGeneratorListener = new FakeBlockGeneratorListener
+ val blockInterval = 200
+ val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+ val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+ val expectedBlocks = 5
+ val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+ val generatedData = new ArrayBuffer[Int]
+
+ // Generate blocks
+ val startTime = System.currentTimeMillis()
+ blockGenerator.start()
+ var count = 0
+ while(System.currentTimeMillis - startTime < waitTime) {
+ blockGenerator += count
+ generatedData += count
+ count += 1
+ Thread.sleep(10)
+ }
+ blockGenerator.stop()
+
+ val recordedData = blockGeneratorListener.arrayBuffers.flatten
+ assert(blockGeneratorListener.arrayBuffers.size > 0)
+ assert(recordedData.toSet === generatedData.toSet)
+ }
+
+ /**
+ * An implementation of Receiver that is used for testing a receiver's life cycle.
+ */
+ class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+ var otherThread: Thread = null
+ var receiving = false
+ var onStartCalled = false
+ var onStopCalled = false
+
+ def onStart() {
+ otherThread = new Thread() {
+ override def run() {
+ receiving = true
+ while(!isStopped()) {
+ Thread.sleep(10)
+ }
+ }
+ }
+ onStartCalled = true
+ otherThread.start()
+
+ }
+
+ def onStop() {
+ onStopCalled = true
+ otherThread.join()
+ }
+
+ def reset() {
+ receiving = false
+ onStartCalled = false
+ onStopCalled = false
+ }
+ }
+
+ /**
+ * An implementation of ReceiverSupervisor used for testing a Receiver.
+ * Instead of storing the data in the BlockManager, it stores all the data in a local buffer
+ * that can be used for verifying that the data has been forwarded correctly.
+ */
+ class FakeReceiverSupervisor(receiver: FakeReceiver)
+ extends ReceiverSupervisor(receiver, new SparkConf()) {
+ val singles = new ArrayBuffer[Any]
+ val byteBuffers = new ArrayBuffer[ByteBuffer]
+ val iterators = new ArrayBuffer[Iterator[_]]
+ val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
+ val errors = new ArrayBuffer[Throwable]
+
+ /** Check if all data structures are clean */
+ def isAllEmpty = {
+ singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
+ arrayBuffers.isEmpty && errors.isEmpty
+ }
+
+ def pushSingle(data: Any) {
+ singles += data
+ }
+
+ def pushBytes(
+ bytes: ByteBuffer,
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ byteBuffers += bytes
+ }
+
+ def pushIterator(
+ iterator: Iterator[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ iterators += iterator
+ }
+
+ def pushArrayBuffer(
+ arrayBuffer: ArrayBuffer[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ arrayBuffers += arrayBuffer
+ }
+
+ def reportError(message: String, throwable: Throwable) {
+ errors += throwable
+ }
+ }
+
+ /**
+ * An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
+ */
+ class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
+ // buffer of data received as ArrayBuffers
+ val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
+ val errors = new ArrayBuffer[Throwable]
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+ val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
+ arrayBuffers += bufferOfInts
+ Thread.sleep(0)
+ }
+
+ def onError(message: String, throwable: Throwable) {
+ errors += throwable
+ }
+ }
+}
+
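
Note: in the block-generator test above, the timing works out as waitTime = expectedBlocks * blockInterval + blockInterval / 2 = 5 * 200 + 100 = 1100 ms, so roughly five ~200 ms blocks should be produced before stop(). A standalone sketch of driving BlockGenerator directly through this patch's listener interface (the object name and counts are illustrative):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener}

object BlockGeneratorSketch {
  def main(args: Array[String]) {
    val blocks = new ArrayBuffer[Seq[Int]]
    val listener = new BlockGeneratorListener {
      def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
        // Each callback delivers one interval's worth of buffered records.
        blocks += arrayBuffer.map(_.asInstanceOf[Int])
      }
      def onError(message: String, throwable: Throwable) { }
    }
    val conf = new SparkConf().set("spark.streaming.blockInterval", "200")
    val generator = new BlockGenerator(listener, 1, conf)  // 1 = receiver id, as in the suite above
    generator.start()
    (1 to 100).foreach { i => generator += i; Thread.sleep(10) }
    generator.stop()
    println("blocks generated: " + blocks.size)
  }
}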
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index ad5367ab94..6d14b1f785 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.{DStream, NetworkReceiver}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.{MetadataCleaner, Utils}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
@@ -181,15 +182,15 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
conf.set("spark.cleaner.ttl", "3600")
sc = new SparkContext(conf)
for (i <- 1 to 4) {
- logInfo("==================================")
- ssc = new StreamingContext(sc, batchDuration)
+ logInfo("==================================\n\n\n")
+ ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
TestReceiver.counter.set(1)
val input = ssc.networkStream(new TestReceiver)
input.count.foreachRDD(rdd => {
val count = rdd.first()
- logInfo("Count = " + count)
runningCount += count.toInt
+ logInfo("Count = " + count + ", Running count = " + runningCount)
})
ssc.start()
ssc.awaitTermination(500)
@@ -216,12 +217,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc.start()
}
- // test whether waitForStop() exits after give amount of time
+ // test whether awaitTermination() exits after the given amount of time
failAfter(1000 millis) {
ssc.awaitTermination(500)
}
- // test whether waitForStop() does not exit if not time is given
+ // test whether awaitTermination() does not exit if no time is given
val exception = intercept[Exception] {
failAfter(1000 millis) {
ssc.awaitTermination()
@@ -276,23 +277,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
class TestException(msg: String) extends Exception(msg)
/** Custom receiver for testing whether all data received by a receiver gets processed or not */
-class TestReceiver extends NetworkReceiver[Int] {
- protected lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
- protected def onStart() {
- blockGenerator.start()
- logInfo("BlockGenerator started on thread " + receivingThread)
- try {
- while(true) {
- blockGenerator += TestReceiver.counter.getAndIncrement
- Thread.sleep(0)
+class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+
+ var receivingThreadOption: Option[Thread] = None
+
+ def onStart() {
+ val thread = new Thread() {
+ override def run() {
+ logInfo("Receiving started")
+ while (!isStopped) {
+ store(TestReceiver.counter.getAndIncrement)
+ }
+ logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
}
- } finally {
- logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
}
+ receivingThreadOption = Some(thread)
+ thread.start()
}
- protected def onStop() {
- blockGenerator.stop()
+ def onStop() {
+ // no cleanup to be done, the receiving thread should stop on its own
}
}
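
Note: the reworked test above pins down the awaitTermination() semantics. As a sketch, assuming a started StreamingContext named ssc:

// Returns after at most ~500 ms even if the context is still running.
ssc.awaitTermination(500)

// With no timeout, blocks until the context is stopped from another thread.
ssc.awaitTermination()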
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 9e0f2c900e..542c697ae3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,10 +17,19 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.scheduler._
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.matchers.ShouldMatchers
+import scala.concurrent.Future
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler._
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.apache.spark.Logging
class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
@@ -32,7 +41,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
override def batchDuration = Milliseconds(100)
override def actuallyWait = true
- test("basic BatchInfo generation") {
+ test("batch info reporting") {
val ssc = setupStreams(input, operation)
val collector = new BatchInfoCollector
ssc.addStreamingListener(collector)
@@ -54,6 +63,31 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
}
+ test("receiver info reporting") {
+ val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count)
+
+ val collector = new ReceiverInfoCollector
+ ssc.addStreamingListener(collector)
+
+ ssc.start()
+ try {
+ eventually(timeout(1000 millis), interval(20 millis)) {
+ collector.startedReceiverInfo should have size 1
+ collector.startedReceiverInfo(0).streamId should equal (0)
+ collector.stoppedReceiverStreamIds should have size 1
+ collector.stoppedReceiverStreamIds(0) should equal (0)
+ collector.receiverErrors should have size 1
+ collector.receiverErrors(0)._1 should equal (0)
+ collector.receiverErrors(0)._2 should include ("report error")
+ collector.receiverErrors(0)._3 should include ("report exception")
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
for(i <- 1 until seq.size) {
@@ -61,12 +95,46 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
}
true
}
+}
+
+/** Listener that collects information on processed batches */
+class BatchInfoCollector extends StreamingListener {
+ val batchInfos = new ArrayBuffer[BatchInfo]
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+ batchInfos += batchCompleted.batchInfo
+ }
+}
+
+/** Listener that collects information on receivers that started, stopped, or reported errors */
+class ReceiverInfoCollector extends StreamingListener {
+ val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
+ val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
+ val receiverErrors = new ArrayBuffer[(Int, String, String)]()
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
+ startedReceiverInfo += receiverStarted.receiverInfo
+ }
+
+ override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
+ stoppedReceiverStreamIds += receiverStopped.streamId
+ }
+
+ override def onReceiverError(receiverError: StreamingListenerReceiverError) {
+ receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error))
+ }
+}
- /** Listener that collects information on processed batches */
- class BatchInfoCollector extends StreamingListener {
- val batchInfos = new ArrayBuffer[BatchInfo]
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
- batchInfos += batchCompleted.batchInfo
+class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
+ def onStart() {
+ Future {
+ logInfo("Started receiver and sleeping")
+ Thread.sleep(10)
+ logInfo("Reporting error and sleeping")
+ reportError("test report error", new Exception("test report exception"))
+ Thread.sleep(10)
+ logInfo("Stopping")
+ stop("test stop error")
}
}
+ def onStop() { }
}
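
Note: the receiver-reporting path tested above flows through StreamingListener callbacks. A minimal sketch of a listener wired the same way as ReceiverInfoCollector (class name and log text are illustrative):

import org.apache.spark.streaming.scheduler._

class ReceiverLogger extends StreamingListener {
  override def onReceiverStarted(started: StreamingListenerReceiverStarted) {
    println("receiver started for stream " + started.receiverInfo.streamId)
  }
  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped) {
    println("receiver stopped for stream " + stopped.streamId)
  }
  override def onReceiverError(error: StreamingListenerReceiverError) {
    println("receiver error on stream " + error.streamId + ": " + error.message)
  }
}

// usage: ssc.addStreamingListener(new ReceiverLogger)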