about | summary | refs | log | tree | commit | diff
path: root/streaming
diff options
context:
space:
mode:
author: zsxwing <zsxwing@gmail.com> 2015-06-05 12:46:02 -0700
committer: Tathagata Das <tathagata.das1565@gmail.com> 2015-06-05 12:46:02 -0700
commit: 4f16d3fe2e260a716b5b4e4005cb6229386440ed (patch)
tree: 9d4a79e53d46d51d89e9cb5daf0bfc0fa255fc04 /streaming
parent: 3f80bc841ab155925fb0530eef5927990f4a5793 (diff)
download: spark-4f16d3fe2e260a716b5b4e4005cb6229386440ed.tar.gz
          spark-4f16d3fe2e260a716b5b4e4005cb6229386440ed.tar.bz2
          spark-4f16d3fe2e260a716b5b4e4005cb6229386440ed.zip
[SPARK-8112] [STREAMING] Fix the negative event count issue
Author: zsxwing <zsxwing@gmail.com>

Closes #6659 from zsxwing/SPARK-8112 and squashes the following commits:

a5d7da6 [zsxwing] Address comments
d255b6e [zsxwing] Fix the negative event count issue
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala  4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala  2
5 files changed, 10 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index e4ff05e12f..e76e7eb0de 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Register the input blocks information into InputInfoTracker
- val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
+ val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
if (blockInfos.nonEmpty) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 92938379b9..8be732b64e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -138,8 +138,8 @@ private[streaming] class ReceiverSupervisorImpl(
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
- case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
- case _ => -1
+ case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
+ case _ => None
}
val time = System.currentTimeMillis
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
index a72efccf2f..7c0db8a863 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala
@@ -23,7 +23,9 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.{Time, StreamingContext}
/** To track the information of input stream at specified batch time. */
-private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long)
+private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
+ require(numRecords >= 0, "numRecords must not be negative")
+}
/**
* This class manages all the input streams as well as their input data statistics. The information
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
index dc11e84f29..656ac80df8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
@@ -24,11 +24,13 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
- numRecords: Long,
+ numRecords: Option[Long],
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
) {
+ require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")
+
@volatile private var _isBlockIdValid = true
def blockId: StreamBlockId = blockStoreResult.blockId
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 6f0ee774cb..be305b5e0d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -224,7 +224,7 @@ class ReceivedBlockTrackerSuite
/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
- List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
+ List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
}