author     jerryshao <saisai.shao@intel.com>            2014-11-14 14:33:37 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-11-14 14:33:37 -0800
commit     5930f64bf0d2516304b21bd49eac361a54caabdd (patch)
tree       4fc481c652e4c553e8c6ae4a3f87ed329908c73c /streaming/src
parent     0cbdb01e1c817e71c4f80de05c4e5bb11510b368 (diff)
[SPARK-4062][Streaming] Add ReliableKafkaReceiver in Spark Streaming Kafka connector
Add ReliableKafkaReceiver in the Kafka connector to prevent data loss when the WAL in Spark Streaming is enabled. Details and the design doc can be found in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).

Author: jerryshao <saisai.shao@intel.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #2991 from jerryshao/kafka-refactor and squashes the following commits:

5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refactored KafkaStreamSuiteBased to eliminate KafkaTestUtils and made Java more robust.
fab14c7 [Tathagata Das] minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver
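For context on the reliability path this patch targets: it only takes effect when the receiver write-ahead log is turned on. A minimal sketch of the intended setup follows, using the spark.streaming.receiver.writeAheadLog.enable key and the Spark 1.2-era KafkaUtils.createStream API; the master, checkpoint path, ZooKeeper quorum, group id, and topic map are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ReliableKafkaExample")
      .setMaster("local[2]") // a receiver needs at least two cores locally
      // With the WAL on, the connector picks ReliableKafkaReceiver, which
      // commits offsets to ZooKeeper only after a block is safely stored.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("/tmp/spark-checkpoints") // the WAL lives under the checkpoint dir

    // Placeholder ZooKeeper quorum, consumer group, and topic -> thread-count map.
    val stream = KafkaUtils.createStream(
      ssc, "zk-host:2181", "example-group", Map("events" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}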
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala         | 55
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala |  8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala                   |  8
3 files changed, 60 insertions(+), 11 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 0316b6862f..55765dc906 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -27,9 +27,38 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
- /** Called when a new block needs to be pushed */
+ /**
+ * Called after a data item is added into the BlockGenerator. The data addition and this
+ * callback are synchronized with the block generation and its associated callback,
+ * so block generation waits for the active data addition+callback to complete. This is useful
+ * for updating metadata on successful buffering of a data item, specifically metadata that
+ * will be useful when a block is generated. Any long blocking operation in this callback
+ * will hurt the throughput.
+ */
+ def onAddData(data: Any, metadata: Any)
+
+ /**
+ * Called when a new block of data is generated by the block generator. The block generation
+ * and this callback are synchronized with the data addition and its associated callback, so
+ * the data addition waits for the block generation+callback to complete. This is useful
+ * for updating metadata when a block has been generated, specifically metadata that will
+ * be useful when the block has been successfully stored. Any long blocking operation in this
+ * callback will hurt the throughput.
+ */
+ def onGenerateBlock(blockId: StreamBlockId)
+
+ /**
+ * Called when a new block is ready to be pushed. Callers are supposed to store the block into
+ * Spark in this method. Internally this is called from a single thread that is not
+ * synchronized with any other callbacks. Hence it is okay to do long blocking
+ * operations in this callback.
+ */
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
- /** Called when an error has occurred in BlockGenerator */
+
+ /**
+ * Called when an error has occurred in the BlockGenerator. Can be called from many
+ * places, so it is better not to do any long blocking operation in this callback.
+ */
def onError(message: String, throwable: Throwable)
}
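The callback contract documented in the trait above is what lets a receiver tie per-record metadata to the block that eventually contains it: onAddData and onGenerateBlock run under the generator's lock, while onPushBlock runs on the dedicated pushing thread. A rough sketch of such a listener follows; the class and field names are illustrative, not the actual ReliableKafkaReceiver code.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StreamBlockId

private[streaming] class MetadataTrackingListener extends BlockGeneratorListener {
  // Metadata for records buffered since the last block boundary.
  private val pending = new ArrayBuffer[Any]
  // Metadata snapshots keyed by the block holding those records; guarded by its
  // own lock because onGenerateBlock and onPushBlock run on different threads.
  private val metadataByBlock = new mutable.HashMap[StreamBlockId, Seq[Any]]

  // Runs under the generator's lock, so each record's metadata is
  // attributed to exactly one block.
  def onAddData(data: Any, metadata: Any): Unit = {
    pending += metadata
  }

  // Also under the generator's lock: snapshot and reset at the block boundary.
  def onGenerateBlock(blockId: StreamBlockId): Unit = metadataByBlock.synchronized {
    metadataByBlock(blockId) = pending.toSeq
    pending.clear()
  }

  // Runs on the single pushing thread; long blocking work is fine here.
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit = {
    // Store the block into Spark first; once that succeeds, the metadata
    // (e.g. Kafka offsets) recorded for this block can be committed.
    metadataByBlock.synchronized { metadataByBlock.remove(blockId) }
  }

  // Keep this cheap: it can be invoked from several threads.
  def onError(message: String, throwable: Throwable): Unit = { }
}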
@@ -80,9 +109,20 @@ private[streaming] class BlockGenerator(
* Push a single data item into the buffer. All received data items
* will be periodically pushed into BlockManager.
*/
- def += (data: Any): Unit = synchronized {
+ def addData(data: Any): Unit = synchronized {
+ waitToPush()
+ currentBuffer += data
+ }
+
+ /**
+ * Push a single data item into the buffer. After buffering the data, the
+ * `BlockGeneratorListener.onAddData` callback will be called. All received data items
+ * will be periodically pushed into BlockManager.
+ */
+ def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
waitToPush()
currentBuffer += data
+ listener.onAddData(data, metadata)
}
/** Change the buffer to which single records are added to. */
@@ -93,14 +133,15 @@ private[streaming] class BlockGenerator(
if (newBlockBuffer.size > 0) {
val blockId = StreamBlockId(receiverId, time - blockInterval)
val newBlock = new Block(blockId, newBlockBuffer)
+ listener.onGenerateBlock(blockId)
blocksForPushing.put(newBlock) // put is blocking when queue is full
logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
}
} catch {
case ie: InterruptedException =>
logInfo("Block updating timer thread was interrupted")
- case t: Throwable =>
- reportError("Error in block updating thread", t)
+ case e: Exception =>
+ reportError("Error in block updating thread", e)
}
}
@@ -126,8 +167,8 @@ private[streaming] class BlockGenerator(
} catch {
case ie: InterruptedException =>
logInfo("Block pushing thread was interrupted")
- case t: Throwable =>
- reportError("Error in block pushing thread", t)
+ case e: Exception =>
+ reportError("Error in block pushing thread", e)
}
}
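For a call-site view of the two entry points added above, here is a hypothetical receiver-side helper; the class, method names, and the Kafka-style topic/partition/offset metadata triple are illustrative.

import org.apache.spark.streaming.receiver.BlockGenerator

private[streaming] class BufferingExample(blockGenerator: BlockGenerator) {

  // Without metadata: the old `+=` behaviour (pushSingle now routes here).
  def bufferPlain(record: Array[Byte]): Unit = {
    blockGenerator.addData(record)
  }

  // With metadata: also fires listener.onAddData(record, metadata) under the
  // generator's lock, keeping the metadata attached to the right block. For
  // Kafka, the metadata would be the record's topic/partition/offset.
  def bufferWithOrigin(record: Array[Byte], topic: String,
      partition: Int, offset: Long): Unit = {
    blockGenerator.addDataWithCallback(record, (topic, partition, offset))
  }
}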
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5360412330..3b1233e86c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -27,10 +27,10 @@ import akka.actor.{Actor, Props}
import akka.pattern.ask
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.WriteAheadLogFileSegment
import org.apache.spark.util.{AkkaUtils, Utils}
/**
@@ -99,6 +99,10 @@ private[streaming] class ReceiverSupervisorImpl(
/** Divides received data records into data blocks for pushing in BlockManager. */
private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+ def onAddData(data: Any, metadata: Any): Unit = { }
+
+ def onGenerateBlock(blockId: StreamBlockId): Unit = { }
+
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
@@ -110,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl(
/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
- blockGenerator += (data)
+ blockGenerator.addData(data)
}
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 0f6a9489db..e26c0c6859 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
blockGenerator.start()
var count = 0
while(System.currentTimeMillis - startTime < waitTime) {
- blockGenerator += count
+ blockGenerator.addData(count)
generatedData += count
count += 1
Thread.sleep(10)
@@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
blockGenerator.start()
var count = 0
while(System.currentTimeMillis - startTime < waitTime) {
- blockGenerator += count
+ blockGenerator.addData(count)
generatedData += count
count += 1
Thread.sleep(1)
@@ -299,6 +299,10 @@ class ReceiverSuite extends FunSuite with Timeouts {
val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
val errors = new ArrayBuffer[Throwable]
+ def onAddData(data: Any, metadata: Any) { }
+
+ def onGenerateBlock(blockId: StreamBlockId) { }
+
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
arrayBuffers += bufferOfInts