aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala55
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala8
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala8
3 files changed, 60 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 0316b6862f..55765dc906 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -27,9 +27,38 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
- /** Called when a new block needs to be pushed */
+ /**
+ * Called after a data item is added into the BlockGenerator. The data addition and this
+ * callback are synchronized with the block generation and its associated callback,
+ * so block generation waits for the active data addition+callback to complete. This is useful
+ * for updating metadata on successful buffering of a data item, specifically metadata
+ * that will be useful when a block is generated. Any long blocking operation in this callback
+ * will hurt the throughput.
+ */
+ def onAddData(data: Any, metadata: Any)
+
+ /**
+ * Called when a new block of data is generated by the block generator. The block generation
+ * and this callback are synchronized with the data addition and its associated callback, so
+ * the data addition waits for the block generation+callback to complete. This is useful
+ * for updating metadata when a block has been generated, specifically metadata that will
+ * be useful when the block has been successfully stored. Any long blocking operation in this
+ * callback will hurt the throughput.
+ */
+ def onGenerateBlock(blockId: StreamBlockId)
+
+ /**
+ * Called when a new block is ready to be pushed. Callers are supposed to store the block into
+ * Spark in this method. Internally this is called from a single
+ * thread that is not synchronized with any other callbacks. Hence it is okay to do long
+ * blocking operations in this callback.
+ */
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
- /** Called when an error has occurred in BlockGenerator */
+
+ /**
+ * Called when an error has occurred in the BlockGenerator. Can be called from many places,
+ * so it is better not to do any long blocking operation in this callback.
+ */
def onError(message: String, throwable: Throwable)
}
@@ -80,9 +109,20 @@ private[streaming] class BlockGenerator(
* Push a single data item into the buffer. All received data items
* will be periodically pushed into BlockManager.
*/
- def += (data: Any): Unit = synchronized {
+ def addData (data: Any): Unit = synchronized {
+ waitToPush()
+ currentBuffer += data
+ }
+
+ /**
+ * Push a single data item into the buffer. After buffering the data, the
+ * `BlockGeneratorListener.onAddData` callback will be called. All received data items
+ * will be periodically pushed into BlockManager.
+ */
+ def addDataWithCallback(data: Any, metadata: Any) = synchronized {
waitToPush()
currentBuffer += data
+ listener.onAddData(data, metadata)
}
/** Change the buffer to which single records are added to. */
@@ -93,14 +133,15 @@ private[streaming] class BlockGenerator(
if (newBlockBuffer.size > 0) {
val blockId = StreamBlockId(receiverId, time - blockInterval)
val newBlock = new Block(blockId, newBlockBuffer)
+ listener.onGenerateBlock(blockId)
blocksForPushing.put(newBlock) // put is blocking when queue is full
logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
}
} catch {
case ie: InterruptedException =>
logInfo("Block updating timer thread was interrupted")
- case t: Throwable =>
- reportError("Error in block updating thread", t)
+ case e: Exception =>
+ reportError("Error in block updating thread", e)
}
}
@@ -126,8 +167,8 @@ private[streaming] class BlockGenerator(
} catch {
case ie: InterruptedException =>
logInfo("Block pushing thread was interrupted")
- case t: Throwable =>
- reportError("Error in block pushing thread", t)
+ case e: Exception =>
+ reportError("Error in block pushing thread", e)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5360412330..3b1233e86c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -27,10 +27,10 @@ import akka.actor.{Actor, Props}
import akka.pattern.ask
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.WriteAheadLogFileSegment
import org.apache.spark.util.{AkkaUtils, Utils}
/**
@@ -99,6 +99,10 @@ private[streaming] class ReceiverSupervisorImpl(
/** Divides received data records into data blocks for pushing in BlockManager. */
private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+ def onAddData(data: Any, metadata: Any): Unit = { }
+
+ def onGenerateBlock(blockId: StreamBlockId): Unit = { }
+
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
@@ -110,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl(
/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
- blockGenerator += (data)
+ blockGenerator.addData(data)
}
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 0f6a9489db..e26c0c6859 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
blockGenerator.start()
var count = 0
while(System.currentTimeMillis - startTime < waitTime) {
- blockGenerator += count
+ blockGenerator.addData(count)
generatedData += count
count += 1
Thread.sleep(10)
@@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
blockGenerator.start()
var count = 0
while(System.currentTimeMillis - startTime < waitTime) {
- blockGenerator += count
+ blockGenerator.addData(count)
generatedData += count
count += 1
Thread.sleep(1)
@@ -299,6 +299,10 @@ class ReceiverSuite extends FunSuite with Timeouts {
val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
val errors = new ArrayBuffer[Throwable]
+ def onAddData(data: Any, metadata: Any) { }
+
+ def onGenerateBlock(blockId: StreamBlockId) { }
+
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
arrayBuffers += bufferOfInts