Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 60
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala | 8
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata | 1
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 | 3
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 | 3
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta | bin 0 -> 73 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta | bin 0 -> 46 bytes
-rw-r--r--  sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta | bin 0 -> 79 bytes
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala | 38
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 101
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala | 22
30 files changed, 207 insertions(+), 47 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index e5a1997d6b..8249adab4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
-
/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
* [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
@@ -70,8 +69,12 @@ object OffsetSeq {
* bound the lateness of data that will processed. Time unit: milliseconds
* @param batchTimestampMs: The current batch processing timestamp.
* Time unit: milliseconds
+ * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions.
*/
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+ batchWatermarkMs: Long = 0,
+ batchTimestampMs: Long = 0,
+ conf: Map[String, String] = Map.empty) {
def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
}
@@ -79,4 +82,3 @@ object OffsetSeqMetadata {
private implicit val format = Serialization.formats(NoTypeHints)
def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
}
-
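Note: the change above adds a conf map to OffsetSeqMetadata so that per-batch configuration (currently only the number of shuffle partitions) is persisted in the offset log. A minimal, self-contained sketch of the json4s round trip this relies on; the case class is copied here for illustration and the timestamp/partition values are made up:

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

case class OffsetSeqMetadata(
    batchWatermarkMs: Long = 0,
    batchTimestampMs: Long = 0,
    conf: Map[String, String] = Map.empty)

object OffsetSeqMetadataDemo {
  implicit val format = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val metadata = OffsetSeqMetadata(
      batchWatermarkMs = 0,
      batchTimestampMs = 1489180207737L,
      conf = Map("spark.sql.shuffle.partitions" -> "10"))

    // Written into each offsets/<batchId> file alongside the per-source offsets.
    println(Serialization.write(metadata))
    // {"batchWatermarkMs":0,"batchTimestampMs":1489180207737,"conf":{"spark.sql.shuffle.partitions":"10"}}

    // Logs written before this change have no "conf" field; the default keeps them readable.
    val legacy = Serialization.read[OffsetSeqMetadata]("""{"batchWatermarkMs":0,"batchTimestampMs":1}""")
    assert(legacy.conf.isEmpty)
  }
}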
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 529263805c..40faddccc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -117,7 +118,9 @@ class StreamExecution(
}
/** Metadata associated with the offset seq of a batch in the query. */
- protected var offsetSeqMetadata = OffsetSeqMetadata()
+ protected var offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
+ conf = Map(SQLConf.SHUFFLE_PARTITIONS.key ->
+ sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS).toString))
override val id: UUID = UUID.fromString(streamMetadata.id)
@@ -256,6 +259,15 @@ class StreamExecution(
updateStatusMessage("Initializing sources")
// force initialization of the logical plan so that the sources can be created
logicalPlan
+
+ // Isolated spark session to run the batches with.
+ val sparkSessionToRunBatches = sparkSession.cloneSession()
+ // Adaptive execution can change num shuffle partitions, disallow
+ sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+ offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
+ conf = Map(SQLConf.SHUFFLE_PARTITIONS.key ->
+ sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)))
+
if (state.compareAndSet(INITIALIZING, ACTIVE)) {
// Unblock `awaitInitialization`
initializationLatch.countDown()
@@ -268,7 +280,7 @@ class StreamExecution(
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
- populateStartOffsets()
+ populateStartOffsets(sparkSessionToRunBatches)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
constructNextBatch()
@@ -276,7 +288,7 @@ class StreamExecution(
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
- runBatch()
+ runBatch(sparkSessionToRunBatches)
}
}
@@ -381,13 +393,32 @@ class StreamExecution(
* - committedOffsets
* - availableOffsets
*/
- private def populateStartOffsets(): Unit = {
+ private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
offsetLog.getLatest() match {
case Some((batchId, nextOffsets)) =>
logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
- offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+ // update offset metadata
+ nextOffsets.metadata.foreach { metadata =>
+ val shufflePartitionsSparkSession: Int =
+ sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+ val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
+ // For backward compatibility, if # partitions was not recorded in the offset log,
+ // then ensure it is not missing. The new value is picked up from the conf.
+ logWarning("Number of shuffle partitions from previous run not found in checkpoint. "
+ + s"Using the value from the conf, $shufflePartitionsSparkSession partitions.")
+ shufflePartitionsSparkSession
+ })
+ offsetSeqMetadata = OffsetSeqMetadata(
+ metadata.batchWatermarkMs, metadata.batchTimestampMs,
+ metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString))
+ // Update conf with correct number of shuffle partitions
+ sparkSessionToRunBatches.conf.set(
+ SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
+ }
+
logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
@@ -444,8 +475,7 @@ class StreamExecution(
}
}
if (hasNewData) {
- // Current batch timestamp in milliseconds
- offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+ var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
// Update the eventTime watermark if we find one in the plan.
if (lastExecution != null) {
lastExecution.executedPlan.collect {
@@ -453,16 +483,19 @@ class StreamExecution(
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
e.eventTimeStats.value.max - e.delayMs
}.headOption.foreach { newWatermarkMs =>
- if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+ if (newWatermarkMs > batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
- offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+ batchWatermarkMs = newWatermarkMs
} else {
logDebug(
s"Event time didn't move: $newWatermarkMs < " +
- s"${offsetSeqMetadata.batchWatermarkMs}")
+ s"$batchWatermarkMs")
}
}
}
+ offsetSeqMetadata = offsetSeqMetadata.copy(
+ batchWatermarkMs = batchWatermarkMs,
+ batchTimestampMs = triggerClock.getTimeMillis()) // Current batch timestamp in milliseconds
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
@@ -505,8 +538,9 @@ class StreamExecution(
/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
+ * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
*/
- private def runBatch(): Unit = {
+ private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
// Request unprocessed data from all sources.
newData = reportTimeTaken("getBatch") {
availableOffsets.flatMap {
@@ -551,7 +585,7 @@ class StreamExecution(
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
- sparkSession,
+ sparkSessionToRunBatch,
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
@@ -561,7 +595,7 @@ class StreamExecution(
}
val nextBatch =
- new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
+ new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
reportTimeTaken("addBatch") {
sink.addBatch(currentBatchId, nextBatch)
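Note: the StreamExecution changes above run every batch on a cloned SparkSession with adaptive execution disabled, and on restart they prefer the shuffle-partition count recorded in the offset log over the current session's setting. A hedged sketch of that recovery decision, with a hypothetical helper name rather than the exact code in populateStartOffsets:

def resolveShufflePartitions(
    checkpointConf: Map[String, String],
    sessionShufflePartitions: Int): Int = {
  checkpointConf.get("spark.sql.shuffle.partitions") match {
    case Some(recorded) =>
      // The value recorded when the checkpoint was written wins, so the shuffle
      // partitioning stays aligned with the existing state store files.
      recorded.toInt
    case None =>
      // Checkpoint written by Spark 2.1 or earlier: fall back to the session
      // value and log a warning, as the patched code above does.
      sessionShufflePartitions
  }
}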
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 38edb40dfb..7810d9f6e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.execution.streaming._
@@ -40,7 +41,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
*/
@Experimental
@InterfaceStability.Evolving
-class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
+class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
@@ -234,9 +235,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
}
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
- throw new AnalysisException(
- s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
- "is not supported in streaming DataFrames/Datasets")
+ logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
+ "is not supported in streaming DataFrames/Datasets and will be disabled.")
}
new StreamingQueryWrapper(new StreamExecution(
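Note: with the manager change above, enabling adaptive query execution no longer fails query start-up; a warning is logged and StreamExecution turns the flag off on its cloned session. Illustrative user-facing behaviour only, where streamingDf stands in for any streaming DataFrame:

spark.conf.set("spark.sql.adaptive.enabled", "true")
val query = streamingDf.writeStream
  .format("memory")
  .queryName("counts")
  .start()  // previously: AnalysisException; now: a logged warning, with AQE disabled for the batches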
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
new file mode 100644
index 0000000000..3492220e36
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
@@ -0,0 +1 @@
+{"id":"dddc5e7f-1e71-454c-8362-de184444fb5a"} \ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
new file mode 100644
index 0000000000..cbde042e79
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1489180207737}
+0
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
new file mode 100644
index 0000000000..10b5774746
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1489180209261}
+2
\ No newline at end of file
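Note: both checkpoint offset files above use the Spark 2.1 layout: a version line, a metadata JSON line without a conf entry, then one line per source offset. A checkpoint written after this change would carry the shuffle-partition setting in the metadata line, roughly as follows (values illustrative):

v1
{"batchWatermarkMs":0,"batchTimestampMs":1489180209261,"conf":{"spark.sql.shuffle.partitions":"10"}}
2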
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta
new file mode 100644
index 0000000000..7dc49cb3e4
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta
new file mode 100644
index 0000000000..8b566e81f4
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta
new file mode 100644
index 0000000000..ca2a7ed033
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta
new file mode 100644
index 0000000000..361f2db605
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta
new file mode 100644
index 0000000000..4c8804c61a
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta
new file mode 100644
index 0000000000..7d3e07fe03
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta
new file mode 100644
index 0000000000..fe521b8c07
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta
new file mode 100644
index 0000000000..e69925caba
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta
new file mode 100644
index 0000000000..36397a3dda
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta
new file mode 100644
index 0000000000..0c9b6ac5c8
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta
Binary files differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index f7f0dade87..dc556322be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.stringToFile
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -29,12 +30,37 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
case class StringOffset(override val json: String) extends Offset
test("OffsetSeqMetadata - deserialization") {
- assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
- assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
- assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
- assert(
- OffsetSeqMetadata(1, 2) ===
- OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+ val key = SQLConf.SHUFFLE_PARTITIONS.key
+
+ def getConfWith(shufflePartitions: Int): Map[String, String] = {
+ Map(key -> shufflePartitions.toString)
+ }
+
+ // None set
+ assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}"""))
+
+ // One set
+ assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+ assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+ assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
+ OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
+
+ // Two set
+ assert(OffsetSeqMetadata(1, 2, Map.empty) ===
+ OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+ assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
+ OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
+ assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
+ OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+ // All set
+ assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+ OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+ // Drop unknown fields
+ assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+ OffsetSeqMetadata(
+ s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}},"unknown":1"""))
}
test("OffsetSeqLog - serialization - deserialization") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6dfcd8baba..e867fc40f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,17 +17,20 @@
package org.apache.spark.sql.streaming
-import java.io.{InterruptedIOException, IOException}
+import java.io.{File, InterruptedIOException, IOException}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import scala.reflect.ClassTag
import scala.util.control.ControlThrowable
+import org.apache.commons.io.FileUtils
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
query.stop()
assert(query.exception.isEmpty)
}
+
+ test("SPARK-19873: streaming aggregation with change in number of partitions") {
+ val inputData = MemoryStream[(Int, Int)]
+ val agg = inputData.toDS().groupBy("_1").count()
+
+ testStream(agg, OutputMode.Complete())(
+ AddData(inputData, (1, 0), (2, 0)),
+ StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+ CheckAnswer((1, 1), (2, 1)),
+ StopStream,
+ AddData(inputData, (3, 0), (2, 0)),
+ StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+ CheckAnswer((1, 1), (2, 2), (3, 1)),
+ StopStream,
+ AddData(inputData, (3, 0), (1, 0)),
+ StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+ CheckAnswer((1, 2), (2, 2), (3, 2)))
+ }
+
+ test("recover from a Spark v2.1 checkpoint") {
+ var inputData: MemoryStream[Int] = null
+ var query: DataStreamWriter[Row] = null
+
+ def prepareMemoryStream(): Unit = {
+ inputData = MemoryStream[Int]
+ inputData.addData(1, 2, 3, 4)
+ inputData.addData(3, 4, 5, 6)
+ inputData.addData(5, 6, 7, 8)
+
+ query = inputData
+ .toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .writeStream
+ .outputMode("complete")
+ .format("memory")
+ }
+
+ // Get an existing checkpoint generated by Spark v2.1.
+ // v2.1 does not record # shuffle partitions in the offset metadata.
+ val resourceUri =
+ this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+ val checkpointDir = new File(resourceUri)
+
+ // 1 - Test if recovery from the checkpoint is successful.
+ prepareMemoryStream()
+ withTempDir { dir =>
+ // Copy the checkpoint to a temp dir to prevent changes to the original.
+ // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+ FileUtils.copyDirectory(checkpointDir, dir)
+
+ // Checkpoint data was generated by a query with 10 shuffle partitions.
+ // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+ // since the last batch may be rerun.
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+ var streamingQuery: StreamingQuery = null
+ try {
+ streamingQuery =
+ query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
+ streamingQuery.processAllAvailable()
+ inputData.addData(9)
+ streamingQuery.processAllAvailable()
+
+ QueryTest.checkAnswer(spark.table("counts").toDF(),
+ Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+ Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+ } finally {
+ if (streamingQuery ne null) {
+ streamingQuery.stop()
+ }
+ }
+ }
+ }
+
+ // 2 - Check recovery with wrong num shuffle partitions
+ prepareMemoryStream()
+ withTempDir { dir =>
+ FileUtils.copyDirectory(checkpointDir, dir)
+
+ // Since the number of partitions is greater than 10, should throw exception.
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
+ var streamingQuery: StreamingQuery = null
+ try {
+ intercept[StreamingQueryException] {
+ streamingQuery =
+ query.queryName("badQuery").option("checkpointLocation", dir.getCanonicalPath).start()
+ streamingQuery.processAllAvailable()
+ }
+ } finally {
+ if (streamingQuery ne null) {
+ streamingQuery.stop()
+ }
+ }
+ }
+ }
+ }
}
abstract class FakeSource extends StreamSourceProvider {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index f05e9d1fda..b49efa6890 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -239,16 +239,6 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}
- test("SPARK-19268: Adaptive query execution should be disallowed") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
- val e = intercept[AnalysisException] {
- MemoryStream[Int].toDS.writeStream.queryName("test-query").format("memory").start()
- }
- assert(e.getMessage.contains(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key) &&
- e.getMessage.contains("not supported"))
- }
- }
-
/** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
failAfter(streamingTimeout) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index f61dcdcbcf..341ab0eb92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.hadoop.fs.Path
+import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
@@ -370,21 +371,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
.option("checkpointLocation", checkpointLocationURI.toString)
.trigger(ProcessingTime(10.seconds))
.start()
+ q.processAllAvailable()
q.stop()
verify(LastOptions.mockStreamSourceProvider).createSource(
- spark.sqlContext,
- s"$checkpointLocationURI/sources/0",
- None,
- "org.apache.spark.sql.streaming.test",
- Map.empty)
+ any(),
+ meq(s"$checkpointLocationURI/sources/0"),
+ meq(None),
+ meq("org.apache.spark.sql.streaming.test"),
+ meq(Map.empty))
verify(LastOptions.mockStreamSourceProvider).createSource(
- spark.sqlContext,
- s"$checkpointLocationURI/sources/1",
- None,
- "org.apache.spark.sql.streaming.test",
- Map.empty)
+ any(),
+ meq(s"$checkpointLocationURI/sources/1"),
+ meq(None),
+ meq("org.apache.spark.sql.streaming.test"),
+ meq(Map.empty))
}
private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
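Note: the Mockito change above is needed because batches now run on a cloned session, so the SQLContext passed to createSource is no longer the test's spark.sqlContext and is matched with any(). Mockito forbids mixing raw values with argument matchers in a single verification, so the remaining literal arguments are wrapped in eq (imported as meq to avoid clashing with Scala's eq). A minimal illustration, assuming a made-up mocked trait:

import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito._

trait Greeter { def greet(ctx: AnyRef, name: String): String }

val greeter = mock(classOf[Greeter])
greeter.greet(new Object, "spark")

// Once any() is used, every argument must be a matcher:
verify(greeter).greet(any(), meq("spark"))
// verify(greeter).greet(any(), "spark")  // would throw InvalidUseOfMatchersException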