author    zsxwing <zsxwing@gmail.com>    2015-05-23 02:11:17 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>    2015-05-23 02:11:17 -0700
commit    ad0badba1450295982738934da2cc121cde18213
tree      58e2abb36dc3ba6cd2a4be855b3d9600d8267805
parent    a40bca0111de45763c3ef4270afb2185c16b8f95
[SPARK-7777][Streaming] Handle the case when there is no block in a batch
In the old implementation, if a batch has no block, `areWALRecordHandlesPresent` will be `true` and it will return a `WriteAheadLogBackedBlockRDD`. This PR handles this case by returning either a `WriteAheadLogBackedBlockRDD` or a `BlockRDD` according to the configuration.

Author: zsxwing <zsxwing@gmail.com>

Closes #6372 from zsxwing/SPARK-7777 and squashes the following commits:

788f895 [zsxwing] Handle the case when there is no block in a batch
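The heart of the fix: `forall` over an empty collection is vacuously true, so a batch with no blocks previously always took the WAL branch regardless of configuration. The corrected decision can be modeled with a self-contained sketch (simplified stand-ins, not the real Spark classes from the diff below):

    // Minimal model of the corrected decision logic. BlockInfo and the two
    // result values are simplified stand-ins for the real Spark classes.
    case class BlockInfo(walRecordHandle: Option[String])

    sealed trait BatchRDD
    case object WALBackedRDD extends BatchRDD
    case object PlainBlockRDD extends BatchRDD

    def rddForBatch(blockInfos: Seq[BlockInfo], walEnabled: Boolean): BatchRDD =
      if (blockInfos.nonEmpty) {
        // Non-empty batch: the RDD kind follows the blocks' WAL record handles.
        if (blockInfos.forall(_.walRecordHandle.nonEmpty)) WALBackedRDD
        else PlainBlockRDD
      } else {
        // Empty batch (the SPARK-7777 case): forall is vacuously true, so the
        // old code always produced a WAL-backed RDD here; now the receiver WAL
        // configuration decides.
        if (walEnabled) WALBackedRDD else PlainBlockRDD
      }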
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala | 47
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 31
2 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 5cfe43a1ce..e4ff05e12f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -73,27 +73,38 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
- // Are WAL record handles present with all the blocks
- val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+ if (blockInfos.nonEmpty) {
+ // Are WAL record handles present with all the blocks
+ val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
- if (areWALRecordHandlesPresent) {
- // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
- val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
- val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
- new WriteAheadLogBackedBlockRDD[T](
- ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
- } else {
- // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
- // then that is unexpected and log a warning accordingly.
- if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
- if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
- logError("Some blocks do not have Write Ahead Log information; " +
- "this is unexpected and data may not be recoverable after driver failures")
- } else {
- logWarning("Some blocks have Write Ahead Log information; this is unexpected")
+ if (areWALRecordHandlesPresent) {
+ // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
+ val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+ val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
+ new WriteAheadLogBackedBlockRDD[T](
+ ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
+ } else {
+ // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
+ // others then that is unexpected and log a warning accordingly.
+ if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
+ if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+ logError("Some blocks do not have Write Ahead Log information; " +
+ "this is unexpected and data may not be recoverable after driver failures")
+ } else {
+ logWarning("Some blocks have Write Ahead Log information; this is unexpected")
+ }
}
+ new BlockRDD[T](ssc.sc, blockIds)
+ }
+ } else {
+ // If no block is ready now, create a WriteAheadLogBackedBlockRDD or a BlockRDD
+ // according to the configuration
+ if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+ new WriteAheadLogBackedBlockRDD[T](
+ ssc.sparkContext, Array.empty, Array.empty, Array.empty)
+ } else {
+ new BlockRDD[T](ssc.sc, Array.empty)
}
- new BlockRDD[T](ssc.sc, blockIds)
}
}
}
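Which branch an empty batch takes is controlled by `WriteAheadLogUtils.enableReceiverLog(ssc.conf)`, which reads the receiver write-ahead-log flag (`spark.streaming.receiver.writeAheadLog.enable` in the Spark 1.x configuration docs). A driver that wants WAL-backed, recoverable RDDs would enable it roughly like this (a sketch; the app name, master, and checkpoint directory are placeholders, and the receiver WAL also requires checkpointing to be set up):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("wal-example")  // placeholder app name
      .setMaster("local[2]")      // placeholder master
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/checkpoint")  // placeholder; the receiver WAL needs a checkpoint dir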
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 93e6b0cd7c..0122514f93 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
import org.apache.spark.util.{ManualClock, Utils}
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
+import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -105,6 +106,36 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
+ test("socket input stream - no block in a batch") {
+ withTestServer(new TestServer()) { testServer =>
+ testServer.start()
+
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ ssc.addStreamingListener(ssc.progressListener)
+
+ val batchCounter = new BatchCounter(ssc)
+ val networkStream = ssc.socketTextStream(
+ "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(networkStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(batchDuration.milliseconds)
+
+ // Make sure the first batch is finished
+ if (!batchCounter.waitUntilBatchesCompleted(1, 30000)) {
+ fail("Timeout: cannot finish all batches in 30 seconds")
+ }
+
+ networkStream.generatedRDDs.foreach { case (_, rdd) =>
+ assert(!rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
+ }
+ }
+ }
+ }
+
test("binary records stream") {
var testDir: File = null
try {
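The new test leaves the receiver WAL disabled, so after one empty batch every RDD in `generatedRDDs` must be a plain `BlockRDD` rather than a `WriteAheadLogBackedBlockRDD`. A complementary check for the WAL-enabled path (hypothetical, not part of this patch) follows directly from the model sketched after the commit message:

    // Hypothetical follow-up using the rddForBatch model from above: with the
    // flag on, even an empty batch should yield a WAL-backed RDD.
    assert(rddForBatch(Seq.empty, walEnabled = true) == WALBackedRDD)
    assert(rddForBatch(Seq.empty, walEnabled = false) == PlainBlockRDD)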