Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala  |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala            | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala   | 32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala            | 47
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala            | 72
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala         |  8
6 files changed, 154 insertions(+), 36 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 115edf7ab2..a392b82999 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -176,6 +176,15 @@ class FileStreamSource(
override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
+ /**
+ * Informs the source that Spark has completed processing all data for offsets less than or
+ * equal to `end` and will only request offsets greater than `end` in the future.
+ */
+ override def commit(end: Offset): Unit = {
+ // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
+ // and the value of the maxFileAge parameter.
+ }
+
override def stop() {}
}
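The no-op above leans on the file source's existing age-based cleanup rather than on `commit`. As a point of reference, here is a standalone sketch of that kind of timestamp/maxFileAge pruning; it is not Spark's actual FileStreamSource code, and the `FileEntry` type, `purgeOld` helper, and millisecond values are hypothetical stand-ins.

    import scala.collection.mutable

    // Hypothetical sketch of age-based cleanup: drop entries whose timestamp is more than
    // maxAgeMs older than the newest file seen. Not Spark's actual FileStreamSource logic.
    object FileAgePurgeSketch {
      final case class FileEntry(path: String, timestampMs: Long)

      def purgeOld(seen: mutable.LinkedHashMap[String, FileEntry], maxAgeMs: Long): Unit = {
        if (seen.nonEmpty) {
          val newest = seen.valuesIterator.map(_.timestampMs).max
          val cutoff = newest - maxAgeMs
          // Collect the expired keys first, then remove, to avoid mutating while iterating.
          val expired = seen.collect { case (path, e) if e.timestampMs < cutoff => path }
          expired.foreach(seen.remove)
        }
      }

      def main(args: Array[String]): Unit = {
        val seen = mutable.LinkedHashMap(
          "old.txt" -> FileEntry("old.txt", 1000L),
          "mid.txt" -> FileEntry("mid.txt", 6000L),
          "new.txt" -> FileEntry("new.txt", 9000L))
        purgeOld(seen, maxAgeMs = 5000L)
        println(seen.keys.mkString(", "))   // prints: mid.txt, new.txt
      }
    }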
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 971147840d..f3bd5bfe23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -30,16 +30,30 @@ trait Source {
/** Returns the schema of the data from this source */
def schema: StructType
- /** Returns the maximum available offset for this source. */
+ /**
+ * Returns the maximum available offset for this source.
+ * Returns `None` if this source has never received any data.
+ */
def getOffset: Option[Offset]
/**
- * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
- * the batch should begin with the first available record. This method must always return the
- * same data for a particular `start` and `end` pair.
+ * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
+ * the batch should begin with the first available record. This method must always return
+ * the same data for a particular `start` and `end` pair, even after the Source has been
+ * restarted on a different node.
+ *
+ * Higher layers will always call this method with a value of `start` greater than or equal
+ * to the last value passed to `commit` and a value of `end` less than or equal to the last
+ * value returned by `getOffset`.
*/
def getBatch(start: Option[Offset], end: Offset): DataFrame
+ /**
+ * Informs the source that Spark has completed processing all data for offsets less than or
+ * equal to `end` and will only request offsets greater than `end` in the future.
+ */
+ def commit(end: Offset): Unit = {}
+
/** Stop this source and free any resources it has allocated. */
def stop(): Unit
}
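The contract documented above, `getOffset` as the high-water mark, `getBatch` over the interval (`start`, `end`] that is open at the start, and `commit` as permission to discard, can be exercised with a toy source. Below is a minimal sketch that models the same bookkeeping with plain Scala collections instead of Spark's `Offset`/`DataFrame` types; all names here are illustrative, not part of the API.

    import scala.collection.mutable.ListBuffer

    // Toy model of the Source contract: offsets are Longs, "rows" are Strings, and a
    // "batch" is the rows in the half-open interval (start, end].
    class ToySource {
      private val buffer = new ListBuffer[String]   // rows not yet committed
      private var currentOffset: Long = -1L         // offset of the newest row, -1 = no data
      private var lastCommitted: Long = -1L         // everything <= this has been discarded

      def add(row: String): Unit = synchronized {
        buffer += row
        currentOffset += 1
      }

      /** Maximum available offset, or None if no data has ever arrived. */
      def getOffset: Option[Long] = synchronized {
        if (currentOffset == -1L) None else Some(currentOffset)
      }

      /** Rows in (start, end]; start = None means "from the first available row". */
      def getBatch(start: Option[Long], end: Long): Seq[String] = synchronized {
        val startOrdinal = start.getOrElse(-1L) + 1
        // The buffer only holds rows after lastCommitted, so shift into buffer indices.
        val sliceStart = (startOrdinal - lastCommitted - 1).toInt
        val sliceEnd = (end - lastCommitted).toInt
        buffer.slice(sliceStart, sliceEnd).toList
      }

      /** The caller will never again ask for offsets <= end, so those rows can be dropped. */
      def commit(end: Long): Unit = synchronized {
        val diff = (end - lastCommitted).toInt
        require(diff >= 0, s"Offsets committed out of order: $lastCommitted then $end")
        buffer.trimStart(diff)
        lastCommitted = end
      }
    }

    object ToySourceDemo extends App {
      val src = new ToySource
      Seq("a", "b", "c").foreach(src.add)
      println(src.getBatch(None, 1))       // List(a, b)
      src.commit(1)                        // rows 0 and 1 may now be discarded
      println(src.getBatch(Some(1L), 2))   // List(c)
    }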
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ba8cf808e3..37af1a550a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -73,6 +73,9 @@ class StreamExecution(
/**
* Tracks how much data we have processed and committed to the sink or state store from each
* input source.
+ * Only the scheduler thread should modify this field, and only in atomic steps.
+ * Other threads should make a shallow copy if they are going to access this field more than
+ * once, since the field's value may change at any time.
*/
@volatile
var committedOffsets = new StreamProgress
@@ -80,6 +83,9 @@ class StreamExecution(
/**
* Tracks the offsets that are available to be processed, but have not yet be committed to the
* sink.
+ * Only the scheduler thread should modify this field, and only in atomic steps.
+ * Other threads should make a shallow copy if they are going to access this field more than
+ * once, since the field's value may change at any time.
*/
@volatile
private var availableOffsets = new StreamProgress
@@ -337,17 +343,27 @@ class StreamExecution(
}
if (hasNewData) {
reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
- assert(
- offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+ assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId.")
+ // NOTE: The following code is correct because runBatches() processes exactly one
+ // batch at a time. If we add pipeline parallelism (multiple batches in flight at
+ // the same time), this cleanup logic will need to change.
+
+ // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
+ // sources to discard data from the previous batch.
+ val prevBatchOff = offsetLog.get(currentBatchId - 1)
+ if (prevBatchOff.isDefined) {
+ prevBatchOff.get.toStreamProgress(sources).foreach {
+ case (src, off) => src.commit(off)
+ }
+ }
+
// Now that we have logged the new batch, no further processing will happen for
- // the previous batch, and it is safe to discard the old metadata.
- // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
- // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
- // flight at the same time), this cleanup logic will need to change.
- offsetLog.purge(currentBatchId)
+ // the batch before the previous batch, and it is safe to discard the old metadata.
+ // Note that purge is exclusive, i.e. it purges everything before the target ID.
+ offsetLog.purge(currentBatchId - 1)
}
} else {
awaitBatchLock.lock()
@@ -455,7 +471,7 @@ class StreamExecution(
/**
* Blocks the current thread until processing for data from the given `source` has reached at
- * least the given `Offset`. This method is indented for use primarily when writing tests.
+ * least the given `Offset`. This method is intended for use primarily when writing tests.
*/
private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
def notDone = {
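The ordering in the hunk above matters: the offsets for the current batch are written to the write-ahead log first, then the sources are told to commit the previous batch, and only then is the metadata before the previous batch purged. A condensed sketch of that scheduler step follows; the types are simplified stand-ins, not Spark's StreamProgress or metadata log classes.

    import scala.collection.mutable

    // Simplified model of one scheduler iteration: WAL write for the current batch, then
    // commit the previous batch's offsets back to the sources, then purge stale metadata.
    // Keys are batch ids, values are per-source offsets.
    object CommitThenPurgeSketch {
      trait ToySource { def commit(offset: Long): Unit }

      def logNewBatch(
          offsetLog: mutable.TreeMap[Long, Map[String, Long]],
          sources: Map[String, ToySource],
          currentBatchId: Long,
          availableOffsets: Map[String, Long]): Unit = {

        // 1. Durably record what the current batch will read (the write-ahead log entry).
        require(!offsetLog.contains(currentBatchId),
          s"Concurrent update to the log for batch $currentBatchId")
        offsetLog(currentBatchId) = availableOffsets

        // 2. The previous batch is now fully checkpointed, so its sources may discard
        //    data up to the offsets that were recorded for it.
        offsetLog.get(currentBatchId - 1).foreach { prevOffsets =>
          prevOffsets.foreach { case (name, off) => sources(name).commit(off) }
        }

        // 3. Purge is exclusive of its argument: keep the entries for currentBatchId - 1
        //    and currentBatchId, drop everything older.
        offsetLog.keys.filter(_ < currentBatchId - 1).toList.foreach(offsetLog.remove)
      }

      def main(args: Array[String]): Unit = {
        val sources = Map("memory" -> new ToySource {
          def commit(offset: Long): Unit = println(s"source told to commit up to offset $offset")
        })
        val offsetLog = mutable.TreeMap[Long, Map[String, Long]]()
        logNewBatch(offsetLog, sources, 0L, Map("memory" -> 1L))
        logNewBatch(offsetLog, sources, 1L, Map("memory" -> 3L))
        logNewBatch(offsetLog, sources, 2L, Map("memory" -> 5L))
        println(offsetLog.keySet)   // TreeSet(1, 2): batch 0's entry has been purged
      }
    }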
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 788fcd0361..48d9791faf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.control.NonFatal
import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
protected val logicalPlan = StreamingExecutionRelation(this)
protected val output = logicalPlan.output
+ /**
+ * All batches from `lastOffsetCommitted + 1` to `currentOffset`, inclusive.
+ * Stored in a ListBuffer to facilitate removing committed batches.
+ */
@GuardedBy("this")
- protected val batches = new ArrayBuffer[Dataset[A]]
+ protected val batches = new ListBuffer[Dataset[A]]
@GuardedBy("this")
protected var currentOffset: LongOffset = new LongOffset(-1)
+ /**
+ * Last committed offset; all data up to and including it has been discarded. Equal to
+ * -1 if no commits have occurred. Note that the value -1 is used in the slice
+ * calculations below and isn't just an arbitrary constant.
+ */
+ @GuardedBy("this")
+ protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)
+
def schema: StructType = encoder.schema
def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -85,21 +96,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
override def getOffset: Option[Offset] = synchronized {
- if (batches.isEmpty) {
+ if (currentOffset.offset == -1) {
None
} else {
Some(currentOffset)
}
}
- /**
- * Returns the data that is between the offsets (`start`, `end`].
- */
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+ // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
val startOrdinal =
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
- val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
+
+ // Internal buffer only holds the batches after lastOffsetCommitted.
+ val newBlocks = synchronized {
+ val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+ val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+ batches.slice(sliceStart, sliceEnd)
+ }
logDebug(
s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -111,11 +126,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
}
+ override def commit(end: Offset): Unit = synchronized {
+ end match {
+ case newOffset: LongOffset =>
+ val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+ if (offsetDiff < 0) {
+ sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+ }
+
+ batches.trimStart(offsetDiff)
+ lastOffsetCommitted = newOffset
+ case _ =>
+ sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+ "an instance of this class")
+ }
+ }
+
override def stop() {}
def reset(): Unit = synchronized {
batches.clear()
currentOffset = new LongOffset(-1)
+ lastOffsetCommitted = new LongOffset(-1)
}
}
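Because the buffer only retains uncommitted batches, `getBatch` has to translate global ordinals into buffer indices by subtracting `lastOffsetCommitted + 1`. A small worked example of that index math, with plain strings standing in for the Dataset-holding buffer:

    import scala.collection.mutable.ListBuffer

    // Worked example of MemoryStream-style index translation after a commit.
    // Offsets 0..4 have been added; offsets 0..1 were committed, so only the
    // batches for offsets 2, 3, 4 remain in the buffer.
    object SliceMathSketch extends App {
      val batches = ListBuffer("batch2", "batch3", "batch4")
      val lastOffsetCommitted = 1L

      // Request the range (start = 2, end = 4], i.e. offsets 3 and 4.
      val startOrdinal = 2 + 1   // start offset + 1 = 3
      val endOrdinal = 4 + 1     // end offset + 1   = 5

      val sliceStart = startOrdinal - lastOffsetCommitted.toInt - 1   // 3 - 1 - 1 = 1
      val sliceEnd = endOrdinal - lastOffsetCommitted.toInt - 1       // 5 - 1 - 1 = 3

      println(batches.slice(sliceStart, sliceEnd))   // ListBuffer(batch3, batch4)
    }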
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index fb15239f9a..c662e7c6bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -24,14 +24,15 @@ import java.text.SimpleDateFormat
import java.util.Calendar
import javax.annotation.concurrent.GuardedBy
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql._
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
+
object TextSocketSource {
val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -53,8 +54,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
@GuardedBy("this")
private var readThread: Thread = null
+ /**
+ * All lines read since the last commit, covering offsets `lastOffsetCommitted + 1`
+ * to `currentOffset`, inclusive.
+ * Stored in a ListBuffer to facilitate removing committed entries.
+ */
+ @GuardedBy("this")
+ protected val batches = new ListBuffer[(String, Timestamp)]
+
+ @GuardedBy("this")
+ protected var currentOffset: LongOffset = new LongOffset(-1)
+
@GuardedBy("this")
- private var lines = new ArrayBuffer[(String, Timestamp)]
+ protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)
initialize()
@@ -74,10 +85,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
return
}
TextSocketSource.this.synchronized {
- lines += ((line,
+ val newData = (line,
Timestamp.valueOf(
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
- ))
+ )
+ currentOffset = currentOffset + 1
+ batches.append(newData)
}
}
} catch {
@@ -92,21 +105,54 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
else TextSocketSource.SCHEMA_REGULAR
- /** Returns the maximum available offset for this source. */
override def getOffset: Option[Offset] = synchronized {
- if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
+ if (currentOffset.offset == -1) {
+ None
+ } else {
+ Some(currentOffset)
+ }
}
/** Returns the data that is between the offsets (`start`, `end`]. */
override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
- val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0)
- val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
- val data = synchronized { lines.slice(startIdx, endIdx) }
+ val startOrdinal =
+ start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+ val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+
+ // Internal buffer only holds the batches after lastOffsetCommitted
+ val rawList = synchronized {
+ val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+ val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+ batches.slice(sliceStart, sliceEnd)
+ }
+
import sqlContext.implicits._
+ val rawBatch = sqlContext.createDataset(rawList)
+
+ // The raw batch has schema (String, Timestamp); include the timestamp column only
+ // when the caller asked for it.
if (includeTimestamp) {
- data.toDF("value", "timestamp")
+ rawBatch.toDF("value", "timestamp")
+ } else {
+ // Strip out timestamp
+ rawBatch.select("_1").toDF("value")
+ }
+ }
+
+ override def commit(end: Offset): Unit = synchronized {
+ if (end.isInstanceOf[LongOffset]) {
+ val newOffset = end.asInstanceOf[LongOffset]
+ val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+ if (offsetDiff < 0) {
+ sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+ }
+
+ batches.trimStart(offsetDiff)
+ lastOffsetCommitted = newOffset
} else {
- data.map(_._1).toDF("value")
+ sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
+ s"originate with an instance of this class")
}
}
@@ -141,7 +187,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
logWarning("The socket source should not be used for production applications! " +
- "It does not support recovery and stores state indefinitely.")
+ "It does not support recovery.")
if (!parameters.contains("host")) {
throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
}
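From the user side, the source above is reached through the `socket` format, with the `includeTimestamp` option controlling the extra column handled in `getBatch`. A short usage sketch follows; it assumes a local Spark build and something listening on localhost:9999 (for example `nc -lk 9999`), and, as the warning above notes, the socket source is meant for testing rather than production.

    import org.apache.spark.sql.SparkSession

    // Minimal wiring of the text socket source into a streaming query printed to the console.
    object SocketSourceUsageSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("socket-source-demo")
          .getOrCreate()

        val lines = spark.readStream
          .format("socket")                   // resolved via TextSocketSourceProvider
          .option("host", "localhost")
          .option("port", 9999)
          .option("includeTimestamp", true)   // adds the timestamp column described above
          .load()

        val query = lines.writeStream
          .format("console")
          .start()

        query.awaitTermination()
      }
    }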
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 92020be978..dad410486e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -252,8 +252,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map(6 / _)
- // Run 3 batches, and then assert that only 1 metadata file is left at the end
- // since the first 2 should have been purged.
+ // Run 3 batches, and then assert that only 2 metadata files are left at the end,
+ // since the first one should have been purged.
testStream(mapped)(
AddData(inputData, 1, 2),
CheckAnswer(6, 3),
@@ -262,11 +262,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AddData(inputData, 4, 6),
CheckAnswer(6, 3, 6, 3, 1, 1),
- AssertOnQuery("metadata log should contain only one file") { q =>
+ AssertOnQuery("metadata log should contain only two files") { q =>
val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475
- assert(toTest.size == 1 && toTest.head == "2")
+ assert(toTest.size == 2 && toTest.head == "1")
true
}
)
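The updated expectation follows directly from the new purge target: the test runs batch ids 0, 1, and 2, and while running batch 2 the scheduler calls purge(currentBatchId - 1) = purge(1), which removes only the metadata file for batch 0 and leaves files "1" and "2". A tiny model of that exclusive-purge arithmetic, using an in-memory map as a stand-in for the metadata directory:

    import scala.collection.mutable

    // Model of the exclusive purge behind the assertion: after batches 0, 1, 2 only the
    // entries for batches 1 and 2 remain.
    object ExclusivePurgeSketch extends App {
      val metadataLog = mutable.TreeMap[Long, String]()

      def addBatch(id: Long): Unit = {
        metadataLog(id) = s"offsets-for-batch-$id"
        // Purge is exclusive: remove every entry with an id strictly below the threshold.
        val threshold = id - 1
        metadataLog.keys.filter(_ < threshold).toList.foreach(metadataLog.remove)
      }

      (0L to 2L).foreach(addBatch)
      println(metadataLog.keys.mkString(", "))   // 1, 2
    }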