path: root/streaming/src/test
author     Tathagata Das <tathagata.das1565@gmail.com>    2014-04-21 19:04:49 -0700
committer  Patrick Wendell <pwendell@gmail.com>           2014-04-21 19:04:49 -0700
commit     04c37b6f749dc2418cc28c89964cdc687dfcbd51 (patch)
tree       ba434fee57cba6fe201e83ad9c049fded5e09bc0 /streaming/src/test
parent     5a5b3346c79abb659260284fed0ace51942f3193 (diff)
[SPARK-1332] Improve Spark Streaming's Network Receiver and InputDStream API [WIP]
The current Network Receiver API makes it slightly complicated to write a new receiver, as one needs to create an instance of BlockGenerator as shown in SocketReceiver https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala#L51

Exposing the BlockGenerator interface has made it harder to improve the receiving process. The API of NetworkReceiver (which was not a very stable API anyway) needs to be changed if we are to ensure future stability.

Additionally, functions like streamingContext.socketStream that create input streams return DStream objects. That makes it hard to expose functionality (say, rate limits) unique to input dstreams. They should return InputDStream or NetworkInputDStream. This is not yet implemented; this PR is blocked on the graceful shutdown PR #247.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #300 from tdas/network-receiver-api and squashes the following commits:

ea27b38 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3a4777c [Tathagata Das] Renamed NetworkInputDStream to ReceiverInputDStream, and ActorReceiver related stuff.
838dd39 [Tathagata Das] Added more events to the StreamingListener to report errors and stopped receivers.
a75c7a6 [Tathagata Das] Addressed some PR comments and fixed other issues.
91bfa72 [Tathagata Das] Fixed bugs.
8533094 [Tathagata Das] Scala style fixes.
028bde6 [Tathagata Das] Further refactored receiver to allow restarting of a receiver.
43f5290 [Tathagata Das] Made functions that create input streams return InputDStream and NetworkInputDStream, for both Scala and Java.
2c94579 [Tathagata Das] Fixed graceful shutdown by removing interrupts on receiving thread.
9e37a0b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into network-receiver-api
3223e95 [Tathagata Das] Refactored the code that runs the NetworkReceiver into further classes and traits to make them more testable.
a36cc48 [Tathagata Das] Refactored the NetworkReceiver API for future stability.
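As the tests below exercise, the refactored API lets a receiver be written without touching BlockGenerator: one extends Receiver, starts its work in onStart(), and hands records to the framework with store(). A minimal sketch of such a receiver under the new API (the LineReceiver name and the host/port parameters are illustrative, not part of this patch):

import java.net.Socket
import scala.io.Source

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A custom receiver against the refactored API: no BlockGenerator needed;
// records go to Spark via store(), failures via restart()/reportError().
class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Receive on a dedicated thread; onStart() itself must not block.
    new Thread("LineReceiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // Nothing to do: the receiving thread checks isStopped() and exits on its own.
  }

  private def receive() {
    try {
      val socket = new Socket(host, port)
      val lines = Source.fromInputStream(socket.getInputStream).getLines()
      while (!isStopped && lines.hasNext) {
        store(lines.next())   // push one record to Spark
      }
      socket.close()
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}

Such a receiver is attached with ssc.receiverStream(new LineReceiver(host, port)), which, per the return-type change in this patch, yields a ReceiverInputDStream rather than a plain DStream (JavaReceiverInputDStream on the Java side).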
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java               9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala       19
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala   249
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala   42
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala  84
5 files changed, 359 insertions, 44 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index a0b1bbc34f..f9bfb9b744 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming;
+import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import org.junit.Assert;
@@ -36,10 +37,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaDStreamLike;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
@@ -1668,7 +1665,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
// InputStream functionality is deferred to the existing Scala tests.
@Test
public void testSocketTextStream() {
- JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
+ JavaReceiverInputDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1701,6 +1698,6 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
@Test
public void testRawSocketStream() {
- JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
+ JavaReceiverInputDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 952511d411..46b7f63b65 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -36,10 +36,9 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.NetworkReceiver
-import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
+import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -207,7 +206,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// set up the network stream using the test receiver
val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.networkStream[Int](testReceiver)
+ val networkStream = ssc.receiverStream[Int](testReceiver)
val countStream = networkStream.count
val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
val outputStream = new TestOutputStream(countStream, outputBuffer)
@@ -301,7 +300,7 @@ object TestServer {
}
/** This is an actor for testing actor input stream */
-class TestActor(port: Int) extends Actor with Receiver {
+class TestActor(port: Int) extends Actor with ActorHelper {
def bytesToString(byteString: ByteString) = byteString.utf8String
@@ -309,24 +308,22 @@ class TestActor(port: Int) extends Actor with Receiver {
def receive = {
case IO.Read(socket, bytes) =>
- pushBlock(bytesToString(bytes))
+ store(bytesToString(bytes))
}
}
/** This is a receiver to test multiple threads inserting data using block generator */
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
- extends NetworkReceiver[Int] {
+ extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {
lazy val executorPool = Executors.newFixedThreadPool(numThreads)
- lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
lazy val finishCount = new AtomicInteger(0)
- protected def onStart() {
- blockGenerator.start()
+ def onStart() {
(1 to numThreads).map(threadId => {
val runnable = new Runnable {
def run() {
(1 to numRecordsPerThread).foreach(i =>
- blockGenerator += (threadId * numRecordsPerThread + i) )
+ store(threadId * numRecordsPerThread + i) )
if (finishCount.incrementAndGet == numThreads) {
MultiThreadTestReceiver.haveAllThreadsFinished = true
}
@@ -337,7 +334,7 @@ class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
})
}
- protected def onStop() {
+ def onStop() {
executorPool.shutdown()
}
}
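The TestActor change above captures the actor-receiver rename in this patch: the old receivers.Receiver mixin becomes receiver.ActorHelper, and pushBlock(...) becomes store(...). A minimal sketch of an actor-based receiver under the new names (EchoActor is an illustrative name, not part of this patch):

import akka.actor.Actor

import org.apache.spark.streaming.receiver.ActorHelper

// Forwards every String message this actor receives into Spark Streaming.
class EchoActor extends Actor with ActorHelper {
  def receive = {
    case s: String => store(s)   // replaces the old pushBlock(s)
  }
}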
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
new file mode 100644
index 0000000000..5c0415ad14
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver, ReceiverSupervisor}
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+/** Test suite for testing the network receiver behavior */
+class NetworkReceiverSuite extends FunSuite with Timeouts {
+
+ test("network receiver life cycle") {
+
+ val receiver = new FakeReceiver
+ val executor = new FakeReceiverSupervisor(receiver)
+
+ assert(executor.isAllEmpty)
+
+ // Thread that runs the executor
+ val executingThread = new Thread() {
+ override def run() {
+ executor.start()
+ executor.awaitTermination()
+ }
+ }
+
+ // Start the receiver
+ executingThread.start()
+
+ // Verify that the executing thread does not terminate on its own, i.e., join() times out
+ intercept[Exception] {
+ failAfter(200 millis) {
+ executingThread.join()
+ }
+ }
+
+ // Verify that receiver was started
+ assert(receiver.onStartCalled)
+ assert(executor.isReceiverStarted)
+ assert(receiver.isStarted)
+ assert(!receiver.isStopped())
+ assert(receiver.otherThread.isAlive)
+ eventually(timeout(100 millis), interval(10 millis)) {
+ assert(receiver.receiving)
+ }
+
+ // Verify whether the data stored by the receiver was sent to the executor
+ val byteBuffer = ByteBuffer.allocate(100)
+ val arrayBuffer = new ArrayBuffer[Int]()
+ val iterator = arrayBuffer.iterator
+ receiver.store(1)
+ receiver.store(byteBuffer)
+ receiver.store(arrayBuffer)
+ receiver.store(iterator)
+ assert(executor.singles.size === 1)
+ assert(executor.singles.head === 1)
+ assert(executor.byteBuffers.size === 1)
+ assert(executor.byteBuffers.head.eq(byteBuffer))
+ assert(executor.iterators.size === 1)
+ assert(executor.iterators.head.eq(iterator))
+ assert(executor.arrayBuffers.size === 1)
+ assert(executor.arrayBuffers.head.eq(arrayBuffer))
+
+ // Verify whether the exceptions reported by the receiver were sent to the executor
+ val exception = new Exception
+ receiver.reportError("Error", exception)
+ assert(executor.errors.size === 1)
+ assert(executor.errors.head.eq(exception))
+
+ // Verify restarting actually stops and starts the receiver
+ receiver.restart("restarting", null, 100)
+ assert(receiver.isStopped)
+ assert(receiver.onStopCalled)
+ eventually(timeout(1000 millis), interval(100 millis)) {
+ assert(receiver.onStartCalled)
+ assert(executor.isReceiverStarted)
+ assert(receiver.isStarted)
+ assert(!receiver.isStopped)
+ assert(receiver.receiving)
+ }
+
+ // Verify that stopping actually stops the thread
+ failAfter(100 millis) {
+ receiver.stop("test")
+ assert(receiver.isStopped)
+ assert(!receiver.otherThread.isAlive)
+
+ // The thread that started the executor should complete
+ // as stop() stops everything
+ executingThread.join()
+ }
+ }
+
+ test("block generator") {
+ val blockGeneratorListener = new FakeBlockGeneratorListener
+ val blockInterval = 200
+ val conf = new SparkConf().set("spark.streaming.blockInterval", blockInterval.toString)
+ val blockGenerator = new BlockGenerator(blockGeneratorListener, 1, conf)
+ val expectedBlocks = 5
+ val waitTime = expectedBlocks * blockInterval + (blockInterval / 2)
+ val generatedData = new ArrayBuffer[Int]
+
+ // Generate blocks
+ val startTime = System.currentTimeMillis()
+ blockGenerator.start()
+ var count = 0
+ while(System.currentTimeMillis - startTime < waitTime) {
+ blockGenerator += count
+ generatedData += count
+ count += 1
+ Thread.sleep(10)
+ }
+ blockGenerator.stop()
+
+ val recordedData = blockGeneratorListener.arrayBuffers.flatten
+ assert(blockGeneratorListener.arrayBuffers.size > 0)
+ assert(recordedData.toSet === generatedData.toSet)
+ }
+
+ /**
+ * An implementation of Receiver that is used for testing a receiver's life cycle.
+ */
+ class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+ var otherThread: Thread = null
+ var receiving = false
+ var onStartCalled = false
+ var onStopCalled = false
+
+ def onStart() {
+ otherThread = new Thread() {
+ override def run() {
+ receiving = true
+ while(!isStopped()) {
+ Thread.sleep(10)
+ }
+ }
+ }
+ onStartCalled = true
+ otherThread.start()
+
+ }
+
+ def onStop() {
+ onStopCalled = true
+ otherThread.join()
+ }
+
+ def reset() {
+ receiving = false
+ onStartCalled = false
+ onStopCalled = false
+ }
+ }
+
+ /**
+ * An implementation of ReceiverSupervisor used for testing a Receiver.
+ * Instead of storing the data in the BlockManager, it stores all the data in a local buffer
+ * that can be used for verifying that the data has been forwarded correctly.
+ */
+ class FakeReceiverSupervisor(receiver: FakeReceiver)
+ extends ReceiverSupervisor(receiver, new SparkConf()) {
+ val singles = new ArrayBuffer[Any]
+ val byteBuffers = new ArrayBuffer[ByteBuffer]
+ val iterators = new ArrayBuffer[Iterator[_]]
+ val arrayBuffers = new ArrayBuffer[ArrayBuffer[_]]
+ val errors = new ArrayBuffer[Throwable]
+
+ /** Check if all data structures are clean */
+ def isAllEmpty = {
+ singles.isEmpty && byteBuffers.isEmpty && iterators.isEmpty &&
+ arrayBuffers.isEmpty && errors.isEmpty
+ }
+
+ def pushSingle(data: Any) {
+ singles += data
+ }
+
+ def pushBytes(
+ bytes: ByteBuffer,
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ byteBuffers += bytes
+ }
+
+ def pushIterator(
+ iterator: Iterator[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ iterators += iterator
+ }
+
+ def pushArrayBuffer(
+ arrayBuffer: ArrayBuffer[_],
+ optionalMetadata: Option[Any],
+ optionalBlockId: Option[StreamBlockId]
+ ) {
+ arrayBuffers += arrayBuffer
+ }
+
+ def reportError(message: String, throwable: Throwable) {
+ errors += throwable
+ }
+ }
+
+ /**
+ * An implementation of BlockGeneratorListener that is used to test the BlockGenerator.
+ */
+ class FakeBlockGeneratorListener(pushDelay: Long = 0) extends BlockGeneratorListener {
+ // buffer of data received as ArrayBuffers
+ val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
+ val errors = new ArrayBuffer[Throwable]
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
+ val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
+ arrayBuffers += bufferOfInts
+ Thread.sleep(pushDelay)   // simulate a slow block consumer when pushDelay > 0
+ }
+
+ def onError(message: String, throwable: Throwable) {
+ errors += throwable
+ }
+ }
+}
+
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index ad5367ab94..6d14b1f785 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.{DStream, NetworkReceiver}
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.{MetadataCleaner, Utils}
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.concurrent.Timeouts
@@ -181,15 +182,15 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
conf.set("spark.cleaner.ttl", "3600")
sc = new SparkContext(conf)
for (i <- 1 to 4) {
- logInfo("==================================")
- ssc = new StreamingContext(sc, batchDuration)
+ logInfo("==================================\n\n\n")
+ ssc = new StreamingContext(sc, Milliseconds(100))
var runningCount = 0
TestReceiver.counter.set(1)
val input = ssc.networkStream(new TestReceiver)
input.count.foreachRDD(rdd => {
val count = rdd.first()
- logInfo("Count = " + count)
runningCount += count.toInt
+ logInfo("Count = " + count + ", Running count = " + runningCount)
})
ssc.start()
ssc.awaitTermination(500)
@@ -216,12 +217,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
ssc.start()
}
- // test whether waitForStop() exits after give amount of time
+ // test whether awaitTermination() exits after the given amount of time
failAfter(1000 millis) {
ssc.awaitTermination(500)
}
- // test whether waitForStop() does not exit if not time is given
+ // test whether awaitTermination() does not exit if no time is given
val exception = intercept[Exception] {
failAfter(1000 millis) {
ssc.awaitTermination()
@@ -276,23 +277,26 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
class TestException(msg: String) extends Exception(msg)
/** Custom receiver for testing whether all data received by a receiver gets processed or not */
-class TestReceiver extends NetworkReceiver[Int] {
- protected lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY)
- protected def onStart() {
- blockGenerator.start()
- logInfo("BlockGenerator started on thread " + receivingThread)
- try {
- while(true) {
- blockGenerator += TestReceiver.counter.getAndIncrement
- Thread.sleep(0)
+class TestReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) with Logging {
+
+ var receivingThreadOption: Option[Thread] = None
+
+ def onStart() {
+ val thread = new Thread() {
+ override def run() {
+ logInfo("Receiving started")
+ while (!isStopped) {
+ store(TestReceiver.counter.getAndIncrement)
+ }
+ logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
}
- } finally {
- logInfo("Receiving stopped at count value of " + TestReceiver.counter.get())
}
+ receivingThreadOption = Some(thread)
+ thread.start()
}
- protected def onStop() {
- blockGenerator.stop()
+ def onStop() {
+ // no cleanup to be done; the receiving thread should stop on its own
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 9e0f2c900e..542c697ae3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,10 +17,19 @@
package org.apache.spark.streaming
-import org.apache.spark.streaming.scheduler._
import scala.collection.mutable.ArrayBuffer
-import org.scalatest.matchers.ShouldMatchers
+import scala.concurrent.Future
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler._
+
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+import org.apache.spark.Logging
class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
@@ -32,7 +41,7 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
override def batchDuration = Milliseconds(100)
override def actuallyWait = true
- test("basic BatchInfo generation") {
+ test("batch info reporting") {
val ssc = setupStreams(input, operation)
val collector = new BatchInfoCollector
ssc.addStreamingListener(collector)
@@ -54,6 +63,31 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
}
+ test("receiver info reporting") {
+ val ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ val inputStream = ssc.networkStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count)
+
+ val collector = new ReceiverInfoCollector
+ ssc.addStreamingListener(collector)
+
+ ssc.start()
+ try {
+ eventually(timeout(1000 millis), interval(20 millis)) {
+ collector.startedReceiverInfo should have size 1
+ collector.startedReceiverInfo(0).streamId should equal (0)
+ collector.stoppedReceiverStreamIds should have size 1
+ collector.stoppedReceiverStreamIds(0) should equal (0)
+ collector.receiverErrors should have size 1
+ collector.receiverErrors(0)._1 should equal (0)
+ collector.receiverErrors(0)._2 should include ("report error")
+ collector.receiverErrors(0)._3 should include ("report exception")
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
for(i <- 1 until seq.size) {
@@ -61,12 +95,46 @@ class StreamingListenerSuite extends TestSuiteBase with ShouldMatchers {
}
true
}
+}
+
+/** Listener that collects information on processed batches */
+class BatchInfoCollector extends StreamingListener {
+ val batchInfos = new ArrayBuffer[BatchInfo]
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+ batchInfos += batchCompleted.batchInfo
+ }
+}
+
+/** Listener that collects information on started, errored, and stopped receivers */
+class ReceiverInfoCollector extends StreamingListener {
+ val startedReceiverInfo = new ArrayBuffer[ReceiverInfo]
+ val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
+ val receiverErrors = new ArrayBuffer[(Int, String, String)]()
+
+ override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
+ startedReceiverInfo += receiverStarted.receiverInfo
+ }
+
+ override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
+ stoppedReceiverStreamIds += receiverStopped.streamId
+ }
+
+ override def onReceiverError(receiverError: StreamingListenerReceiverError) {
+ receiverErrors += ((receiverError.streamId, receiverError.message, receiverError.error))
+ }
+}
- /** Listener that collects information on processed batches */
- class BatchInfoCollector extends StreamingListener {
- val batchInfos = new ArrayBuffer[BatchInfo]
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
- batchInfos += batchCompleted.batchInfo
+class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
+ def onStart() {
+ Future {
+ logInfo("Started receiver and sleeping")
+ Thread.sleep(10)
+ logInfo("Reporting error and sleeping")
+ reportError("test report error", new Exception("test report exception"))
+ Thread.sleep(10)
+ logInfo("Stopping")
+ stop("test stop error")
}
}
+ def onStop() { }
}
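The receiver events asserted above travel over the same StreamingListener bus as batch events, so they can be consumed outside of tests as well. A minimal sketch, assuming an already-created StreamingContext named ssc (ReceiverEventLogger is an illustrative name):

import org.apache.spark.streaming.scheduler._

// Logs the receiver lifecycle events introduced by this patch.
class ReceiverEventLogger extends StreamingListener {
  override def onReceiverStarted(started: StreamingListenerReceiverStarted) {
    println("Receiver started on stream " + started.receiverInfo.streamId)
  }
  override def onReceiverError(error: StreamingListenerReceiverError) {
    println("Receiver error on stream " + error.streamId + ": " + error.message)
  }
  override def onReceiverStopped(stopped: StreamingListenerReceiverStopped) {
    println("Receiver stopped on stream " + stopped.streamId)
  }
}

// Attach before ssc.start():
// ssc.addStreamingListener(new ReceiverEventLogger)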