aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorfrreiss <frreiss@us.ibm.com>2016-10-26 17:33:08 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-10-26 17:33:08 -0700
commit5b27598ff50cb08e7570fade458da0a3d4d4eabc (patch)
treecb1aa8d34585bf459168bd3e5a323637fe686877 /sql
parenta76846cfb1c2d6c8f4d647426030b59de20d9433 (diff)
downloadspark-5b27598ff50cb08e7570fade458da0a3d4d4eabc.tar.gz
spark-5b27598ff50cb08e7570fade458da0a3d4d4eabc.tar.bz2
spark-5b27598ff50cb08e7570fade458da0a3d4d4eabc.zip
[SPARK-16963][STREAMING][SQL] Changes to Source trait and related implementation classes
## What changes were proposed in this pull request? This PR contains changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Summary of changes: * Added a method `commit(end: Offset)` that tells the Source that is OK to discard all offsets up `end`, inclusive. * Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream"; as opposed to "all data present in the Source's buffer". * Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`. * Added a `lastCommittedOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code. * The scheduler in `StreamExecution.scala` now calls `commit` on its stream sources after marking each batch as complete in its checkpoint. * `MemoryStream` now cleans committed batches out of its internal buffer. * `TextSocketSource` now cleans committed batches from its internal buffer. ## How was this patch tested? Existing regression tests already exercise the new code. Author: frreiss <frreiss@us.ibm.com> Closes #14553 from frreiss/fred-16963.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala22
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala32
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala47
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala72
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala8
6 files changed, 154 insertions, 36 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 115edf7ab2..a392b82999 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -176,6 +176,15 @@ class FileStreamSource(
override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
+ /**
+ * Informs the source that Spark has completed processing all data for offsets less than or
+ * equal to `end` and will only request offsets greater than `end` in the future.
+ */
+ override def commit(end: Offset): Unit = {
+ // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
+ // and the value of the maxFileAge parameter.
+ }
+
override def stop() {}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 971147840d..f3bd5bfe23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -30,16 +30,30 @@ trait Source {
/** Returns the schema of the data from this source */
def schema: StructType
- /** Returns the maximum available offset for this source. */
+ /**
+ * Returns the maximum available offset for this source.
+ * Returns `None` if this source has never received any data.
+ */
def getOffset: Option[Offset]
/**
- * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
- * the batch should begin with the first available record. This method must always return the
- * same data for a particular `start` and `end` pair.
+ * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
+ * then the batch should begin with the first record. This method must always return the
+ * same data for a particular `start` and `end` pair; even after the Source has been restarted
+ * on a different node.
+ *
+ * Higher layers will always call this method with a value of `start` greater than or equal
+ * to the last value passed to `commit` and a value of `end` less than or equal to the
+ * last value returned by `getOffset`
*/
def getBatch(start: Option[Offset], end: Offset): DataFrame
+ /**
+ * Informs the source that Spark has completed processing all data for offsets less than or
+ * equal to `end` and will only request offsets greater than `end` in the future.
+ */
+ def commit(end: Offset) : Unit = {}
+
/** Stop this source and free any resources it has allocated. */
def stop(): Unit
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ba8cf808e3..37af1a550a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -73,6 +73,9 @@ class StreamExecution(
/**
* Tracks how much data we have processed and committed to the sink or state store from each
* input source.
+ * Only the scheduler thread should modify this field, and only in atomic steps.
+ * Other threads should make a shallow copy if they are going to access this field more than
+ * once, since the field's value may change at any time.
*/
@volatile
var committedOffsets = new StreamProgress
@@ -80,6 +83,9 @@ class StreamExecution(
/**
* Tracks the offsets that are available to be processed, but have not yet be committed to the
* sink.
+ * Only the scheduler thread should modify this field, and only in atomic steps.
+ * Other threads should make a shallow copy if they are going to access this field more than
+ * once, since the field's value may change at any time.
*/
@volatile
private var availableOffsets = new StreamProgress
@@ -337,17 +343,27 @@ class StreamExecution(
}
if (hasNewData) {
reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
- assert(
- offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+ assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId.")
+ // NOTE: The following code is correct because runBatches() processes exactly one
+ // batch at a time. If we add pipeline parallelism (multiple batches in flight at
+ // the same time), this cleanup logic will need to change.
+
+ // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
+ // sources to discard data from the previous batch.
+ val prevBatchOff = offsetLog.get(currentBatchId - 1)
+ if (prevBatchOff.isDefined) {
+ prevBatchOff.get.toStreamProgress(sources).foreach {
+ case (src, off) => src.commit(off)
+ }
+ }
+
// Now that we have logged the new batch, no further processing will happen for
- // the previous batch, and it is safe to discard the old metadata.
- // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
- // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
- // flight at the same time), this cleanup logic will need to change.
- offsetLog.purge(currentBatchId)
+ // the batch before the previous batch, and it is safe to discard the old metadata.
+ // Note that purge is exclusive, i.e. it purges everything before the target ID.
+ offsetLog.purge(currentBatchId - 1)
}
} else {
awaitBatchLock.lock()
@@ -455,7 +471,7 @@ class StreamExecution(
/**
* Blocks the current thread until processing for data from the given `source` has reached at
- * least the given `Offset`. This method is indented for use primarily when writing tests.
+ * least the given `Offset`. This method is intended for use primarily when writing tests.
*/
private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
def notDone = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 788fcd0361..48d9791faf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
protected val logicalPlan = StreamingExecutionRelation(this)
protected val output = logicalPlan.output
+ /**
+ * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+ * Stored in a ListBuffer to facilitate removing committed batches.
+ */
@GuardedBy("this")
- protected val batches = new ArrayBuffer[Dataset[A]]
+ protected val batches = new ListBuffer[Dataset[A]]
@GuardedBy("this")
protected var currentOffset: LongOffset = new LongOffset(-1)
+ /**
+ * Last offset that was discarded, or -1 if no commits have occurred. Note that the value
+ * -1 is used in calculations below and isn't just an arbitrary constant.
+ */
+ @GuardedBy("this")
+ protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+
def schema: StructType = encoder.schema
def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -85,21 +96,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
override def getOffset: Option[Offset] = synchronized {
- if (batches.isEmpty) {
+ if (currentOffset.offset == -1) {
None
} else {
Some(currentOffset)
}
}
- /**
- * Returns the data that is between the offsets (`start`, `end`].
- */
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
val startOrdinal =
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
- val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
+
+ // Internal buffer only holds the batches after lastCommittedOffset.
+ val newBlocks = synchronized {
+ val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+ val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+ batches.slice(sliceStart, sliceEnd)
+ }
logDebug(
s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -111,11 +126,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
}
+ override def commit(end: Offset): Unit = synchronized {
+ end match {
+ case newOffset: LongOffset =>
+ val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+ if (offsetDiff < 0) {
+ sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+ }
+
+ batches.trimStart(offsetDiff)
+ lastOffsetCommitted = newOffset
+ case _ =>
+ sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+ "an instance of this class")
+ }
+ }
+
override def stop() {}
def reset(): Unit = synchronized {
batches.clear()
currentOffset = new LongOffset(-1)
+ lastOffsetCommitted = new LongOffset(-1)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index fb15239f9a..c662e7c6bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -24,14 +24,15 @@ import java.text.SimpleDateFormat
import java.util.Calendar
import javax.annotation.concurrent.GuardedBy
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql._
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
object TextSocketSource {
val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -53,8 +54,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
@GuardedBy("this")
private var readThread: Thread = null
+ /**
+ * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+ * Stored in a ListBuffer to facilitate removing committed batches.
+ */
+ @GuardedBy("this")
+ protected val batches = new ListBuffer[(String, Timestamp)]
+
+ @GuardedBy("this")
+ protected var currentOffset: LongOffset = new LongOffset(-1)
+
@GuardedBy("this")
- private var lines = new ArrayBuffer[(String, Timestamp)]
+ protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
initialize()
@@ -74,10 +85,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
return
}
TextSocketSource.this.synchronized {
- lines += ((line,
+ val newData = (line,
Timestamp.valueOf(
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
- ))
+ )
+ currentOffset = currentOffset + 1
+ batches.append(newData)
}
}
} catch {
@@ -92,21 +105,54 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
else TextSocketSource.SCHEMA_REGULAR
- /** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = synchronized {
- if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
+ if (currentOffset.offset == -1) {
+ None
+ } else {
+ Some(currentOffset)
+ }
}
/** Returns the data that is between the offsets (`start`, `end`]. */
override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
- val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0)
- val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
- val data = synchronized { lines.slice(startIdx, endIdx) }
+ val startOrdinal =
+ start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+ val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+
+ // Internal buffer only holds the batches after lastOffsetCommitted
+ val rawList = synchronized {
+ val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+ val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+ batches.slice(sliceStart, sliceEnd)
+ }
+
import sqlContext.implicits._
+ val rawBatch = sqlContext.createDataset(rawList)
+
+ // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
+ // if requested.
if (includeTimestamp) {
- data.toDF("value", "timestamp")
+ rawBatch.toDF("value", "timestamp")
+ } else {
+ // Strip out timestamp
+ rawBatch.select("_1").toDF("value")
+ }
+ }
+
+ override def commit(end: Offset): Unit = synchronized {
+ if (end.isInstanceOf[LongOffset]) {
+ val newOffset = end.asInstanceOf[LongOffset]
+ val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+ if (offsetDiff < 0) {
+ sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+ }
+
+ batches.trimStart(offsetDiff)
+ lastOffsetCommitted = newOffset
} else {
- data.map(_._1).toDF("value")
+ sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
+ s"originate with an instance of this class")
}
}
@@ -141,7 +187,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
logWarning("The socket source should not be used for production applications! " +
- "It does not support recovery and stores state indefinitely.")
+ "It does not support recovery.")
if (!parameters.contains("host")) {
throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 92020be978..dad410486e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -252,8 +252,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map(6 / _)
- // Run 3 batches, and then assert that only 1 metadata file is left at the end
- // since the first 2 should have been purged.
+ // Run 3 batches, and then assert that only 2 metadata files is are at the end
+ // since the first should have been purged.
testStream(mapped)(
AddData(inputData, 1, 2),
CheckAnswer(6, 3),
@@ -262,11 +262,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AddData(inputData, 4, 6),
CheckAnswer(6, 3, 6, 3, 1, 1),
- AssertOnQuery("metadata log should contain only one file") { q =>
+ AssertOnQuery("metadata log should contain only two files") { q =>
val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475
- assert(toTest.size == 1 && toTest.head == "2")
+ assert(toTest.size == 2 && toTest.head == "1")
true
}
)