Diffstat (limited to 'sql/core')
30 files changed, 207 insertions, 47 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index e5a1997d6b..8249adab4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
-
 /**
  * An ordered collection of offsets, used to track the progress of processing data from one or more
  * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
@@ -70,8 +69,12 @@ object OffsetSeq {
  *                          bound the lateness of data that will be processed. Time unit: milliseconds
  * @param batchTimestampMs: The current batch processing timestamp.
  *                          Time unit: milliseconds
+ * @param conf: Additional confs to be persisted across batches, e.g. number of shuffle partitions.
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+    batchWatermarkMs: Long = 0,
+    batchTimestampMs: Long = 0,
+    conf: Map[String, String] = Map.empty) {
   def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
 }
@@ -79,4 +82,3 @@ object OffsetSeqMetadata {
   private implicit val format = Serialization.formats(NoTypeHints)
   def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 529263805c..40faddccc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -117,7 +118,9 @@ class StreamExecution(
   }

   /** Metadata associated with the offset seq of a batch in the query. */
-  protected var offsetSeqMetadata = OffsetSeqMetadata()
+  protected var offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
+    conf = Map(SQLConf.SHUFFLE_PARTITIONS.key ->
+      sparkSession.conf.get(SQLConf.SHUFFLE_PARTITIONS).toString))

   override val id: UUID = UUID.fromString(streamMetadata.id)
@@ -256,6 +259,15 @@ class StreamExecution(
       updateStatusMessage("Initializing sources")
       // force initialization of the logical plan so that the sources can be created
       logicalPlan
+
+      // Isolated spark session to run the batches with.
+      val sparkSessionToRunBatches = sparkSession.cloneSession()
+      // Adaptive execution can change the number of shuffle partitions, so disallow it.
+      sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+      offsetSeqMetadata = OffsetSeqMetadata(batchWatermarkMs = 0, batchTimestampMs = 0,
+        conf = Map(SQLConf.SHUFFLE_PARTITIONS.key ->
+          sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS.key)))
+
       if (state.compareAndSet(INITIALIZING, ACTIVE)) {
         // Unblock `awaitInitialization`
         initializationLatch.countDown()
@@ -268,7 +280,7 @@ class StreamExecution(
           reportTimeTaken("triggerExecution") {
             if (currentBatchId < 0) {
               // We'll do this initialization only once
-              populateStartOffsets()
+              populateStartOffsets(sparkSessionToRunBatches)
               logDebug(s"Stream running from $committedOffsets to $availableOffsets")
             } else {
               constructNextBatch()
@@ -276,7 +288,7 @@ class StreamExecution(
             if (dataAvailable) {
               currentStatus = currentStatus.copy(isDataAvailable = true)
               updateStatusMessage("Processing new data")
-              runBatch()
+              runBatch(sparkSessionToRunBatches)
             }
           }
@@ -381,13 +393,32 @@ class StreamExecution(
    *  - committedOffsets
    *  - availableOffsets
    */
-  private def populateStartOffsets(): Unit = {
+  private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
     offsetLog.getLatest() match {
       case Some((batchId, nextOffsets)) =>
         logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
-        offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
+
+        // update offset metadata
+        nextOffsets.metadata.foreach { metadata =>
+          val shufflePartitionsSparkSession: Int =
+            sparkSessionToRunBatches.conf.get(SQLConf.SHUFFLE_PARTITIONS)
+          val shufflePartitionsToUse = metadata.conf.getOrElse(SQLConf.SHUFFLE_PARTITIONS.key, {
+            // For backward compatibility, if # partitions was not recorded in the offset log,
+            // then ensure it is not missing. The new value is picked up from the conf.
+            logWarning("Number of shuffle partitions from previous run not found in checkpoint. " +
+              s"Using the value from the conf, $shufflePartitionsSparkSession partitions.")
+            shufflePartitionsSparkSession
+          })
+          offsetSeqMetadata = OffsetSeqMetadata(
+            metadata.batchWatermarkMs, metadata.batchTimestampMs,
+            metadata.conf + (SQLConf.SHUFFLE_PARTITIONS.key -> shufflePartitionsToUse.toString))
+          // Update conf with correct number of shuffle partitions
+          sparkSessionToRunBatches.conf.set(
+            SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
+        }
+
         logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
           s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
@@ -444,8 +475,7 @@ class StreamExecution(
       }
     }

     if (hasNewData) {
-      // Current batch timestamp in milliseconds
-      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
       // Update the eventTime watermark if we find one in the plan.
@@ -453,16 +483,19 @@ class StreamExecution(
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
           case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
             e.eventTimeStats.value.max - e.delayMs
         }.headOption.foreach { newWatermarkMs =>
-          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+          if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+            batchWatermarkMs = newWatermarkMs
           } else {
             logDebug(
               s"Event time didn't move: $newWatermarkMs < " +
-                s"${offsetSeqMetadata.batchWatermarkMs}")
+                s"$batchWatermarkMs")
           }
         }
       }
+      offsetSeqMetadata = offsetSeqMetadata.copy(
+        batchWatermarkMs = batchWatermarkMs,
+        batchTimestampMs = triggerClock.getTimeMillis())  // Current batch timestamp in milliseconds

       updateStatusMessage("Writing offsets to log")
       reportTimeTaken("walCommit") {
@@ -505,8 +538,9 @@ class StreamExecution(

   /**
    * Processes any data available between `availableOffsets` and `committedOffsets`.
+   * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
    */
-  private def runBatch(): Unit = {
+  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
     // Request unprocessed data from all sources.
     newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
@@ -551,7 +585,7 @@ class StreamExecution(
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
-        sparkSession,
+        sparkSessionToRunBatch,
         triggerLogicalPlan,
         outputMode,
         checkpointFile("state"),
@@ -561,7 +595,7 @@ class StreamExecution(
     }

     val nextBatch =
-      new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
+      new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))

     reportTimeTaken("addBatch") {
       sink.addBatch(currentBatchId, nextBatch)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 38edb40dfb..7810d9f6e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import org.apache.hadoop.fs.Path

 import org.apache.spark.annotation.{Experimental, InterfaceStability}
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.execution.streaming._
@@ -40,7 +41,7 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
  */
 @Experimental
 @InterfaceStability.Evolving
-class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
+class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {

   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
@@ -234,9 +235,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     }

     if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      throw new AnalysisException(
-        s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
-          "is not supported in streaming DataFrames/Datasets")
+      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
+        "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }

     new StreamingQueryWrapper(new StreamExecution(
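Taken together, the OffsetSeq and StreamExecution changes above persist the shuffle-partition setting in the offset log's metadata line and restore it on recovery. A minimal sketch of that JSON round trip, assuming only json4s (the Meta class below is an illustrative stand-in, not Spark's OffsetSeqMetadata; the timestamp matches the checkpoint files further down):

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Illustrative stand-in for OffsetSeqMetadata.
case class Meta(
    batchWatermarkMs: Long = 0,
    batchTimestampMs: Long = 0,
    conf: Map[String, String] = Map.empty)

object MetaRoundTrip {
  private implicit val format = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    // What the new metadata line looks like in the offset log.
    val written = Serialization.write(
      Meta(0, 1489180207737L, Map("spark.sql.shuffle.partitions" -> "10")))
    println(written)
    // {"batchWatermarkMs":0,"batchTimestampMs":1489180207737,
    //  "conf":{"spark.sql.shuffle.partitions":"10"}}

    // Old (Spark 2.1) checkpoints lack the "conf" field; json4s falls back to
    // the case-class default Map.empty, which triggers the backward-compat path.
    val old = Serialization.read[Meta](
      """{"batchWatermarkMs":0,"batchTimestampMs":1489180207737}""")
    assert(old.conf.isEmpty)
  }
}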
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
new file mode 100644
index 0000000000..3492220e36
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
@@ -0,0 +1 @@
+{"id":"dddc5e7f-1e71-454c-8362-de184444fb5a"}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
new file mode 100644
index 0000000000..cbde042e79
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1489180207737}
+0
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
new file mode 100644
index 0000000000..10b5774746
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1489180209261}
+2
\ No newline at end of file
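The two offsets files above show the offset log's plain-text layout: a version line, one line of JSON metadata, then one serialized offset per source. A minimal reader sketch for that layout (parseOffsetFile is a hypothetical helper; Spark's real reader is OffsetSeqLog on top of HDFSMetadataLog):

import scala.io.Source

object OffsetFileReader {
  // Hypothetical parser for the v1 offset-log text format shown above.
  def parseOffsetFile(path: String): (String, String, Seq[String]) = {
    val lines = Source.fromFile(path).getLines().toList
    lines match {
      case version :: metadataJson :: offsets =>
        require(version == "v1", s"unsupported offset log version: $version")
        (version, metadataJson, offsets) // one serialized offset per source
      case _ =>
        sys.error(s"malformed offset file: $path")
    }
  }
}

// For offsets/0 above this returns:
// ("v1", """{"batchWatermarkMs":0,"batchTimestampMs":1489180207737}""", Seq("0"))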
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta
new file mode 100644
index 0000000000..7dc49cb3e4
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta
new file mode 100644
index 0000000000..8b566e81f4
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta
new file mode 100644
index 0000000000..ca2a7ed033
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta
new file mode 100644
index 0000000000..361f2db605
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta
new file mode 100644
index 0000000000..4c8804c61a
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta
new file mode 100644
index 0000000000..7d3e07fe03
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta
new file mode 100644
index 0000000000..fe521b8c07
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta
new file mode 100644
index 0000000000..e69925caba
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta
new file mode 100644
index 0000000000..36397a3dda
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta
new file mode 100644
index 0000000000..6352978051
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta
new file mode 100644
index 0000000000..0c9b6ac5c8
Binary files differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index f7f0dade87..dc556322be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.util.stringToFile
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext

 class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -29,12 +30,37 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
   case class StringOffset(override val json: String) extends Offset

   test("OffsetSeqMetadata - deserialization") {
-    assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
-    assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
-    assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
-    assert(
-      OffsetSeqMetadata(1, 2) ===
-        OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+    val key = SQLConf.SHUFFLE_PARTITIONS.key
+
+    def getConfWith(shufflePartitions: Int): Map[String, String] = {
+      Map(key -> shufflePartitions.toString)
+    }
+
+    // None set
+    assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}"""))
+
+    // One set
+    assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+    assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+    assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
+      OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
+
+    // Two set
+    assert(OffsetSeqMetadata(1, 2, Map.empty) ===
+      OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+    assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
+    assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+    // All set
+    assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+    // Drop unknown fields
+    assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+      OffsetSeqMetadata(
+        s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}},"unknown":1"""))
   }

   test("OffsetSeqLog - serialization - deserialization") {
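The deserialization tests above pin down the recovery rule: a shuffle-partition count recorded in the checkpoint always wins, and only pre-2.2 checkpoints fall back to the session conf. A sketch of that decision as a pure function (names are hypothetical; the logic mirrors populateStartOffsets):

object ShufflePartitionRecovery {
  // Illustrative stand-in for the decision in StreamExecution.populateStartOffsets:
  // a value recorded in the checkpoint always wins; otherwise fall back to the
  // session conf (pre-2.2 checkpoints) with a warning.
  def shufflePartitionsToUse(checkpointConf: Map[String, String], sessionDefault: Int): Int =
    checkpointConf.get("spark.sql.shuffle.partitions").map(_.toInt).getOrElse {
      println(s"partitions not found in checkpoint, using conf value $sessionDefault")
      sessionDefault
    }

  def main(args: Array[String]): Unit = {
    assert(shufflePartitionsToUse(Map("spark.sql.shuffle.partitions" -> "10"), 15) == 10)
    assert(shufflePartitionsToUse(Map.empty, 15) == 15) // Spark 2.1 checkpoint path
  }
}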
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6dfcd8baba..e867fc40f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,17 +17,20 @@ package org.apache.spark.sql.streaming

-import java.io.{InterruptedIOException, IOException}
+import java.io.{File, InterruptedIOException, IOException}
 import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}

 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable

+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
     query.stop()
     assert(query.exception.isEmpty)
   }
+
+  test("SPARK-19873: streaming aggregation with change in number of partitions") {
+    val inputData = MemoryStream[(Int, Int)]
+    val agg = inputData.toDS().groupBy("_1").count()
+
+    testStream(agg, OutputMode.Complete())(
+      AddData(inputData, (1, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+      CheckAnswer((1, 1), (2, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (2, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+      CheckAnswer((1, 1), (2, 2), (3, 1)),
+      StopStream,
+      AddData(inputData, (3, 0), (1, 0)),
+      StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+      CheckAnswer((1, 2), (2, 2), (3, 2)))
+  }
+
+  test("recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
+
+    def prepareMemoryStream(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
+    }
+
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    prepareMemoryStream()
+    withTempDir { dir =>
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but failing on
+      // subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more
+      // batches, since the last batch may be rerun.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          streamingQuery =
+            query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
+          streamingQuery.processAllAvailable()
+          inputData.addData(9)
+          streamingQuery.processAllAvailable()
+
+          QueryTest.checkAnswer(spark.table("counts").toDF(),
+            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    }
+
+    // 2 - Check recovery with wrong num shuffle partitions
+    prepareMemoryStream()
+    withTempDir { dir =>
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Since the number of partitions is greater than 10, this should throw an exception.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          intercept[StreamingQueryException] {
+            streamingQuery =
+              query.queryName("badQuery").option("checkpointLocation", dir.getCanonicalPath).start()
+            streamingQuery.processAllAvailable()
+          }
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    }
+  }
 }

 abstract class FakeSource extends StreamSourceProvider {
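For users, the StreamSuite tests above mean the checkpointed setting wins on restart: changing spark.sql.shuffle.partitions between runs of the same checkpointed query has no effect. A rough sketch of such a restart, assuming a local session and a placeholder checkpoint path (MemoryStream is internal API, used here only because the suite above uses it):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

object RestartSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("restart-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext

    // Takes effect only for a brand-new query; on restart from an existing
    // checkpoint the partition count recorded in the offset log wins.
    spark.conf.set("spark.sql.shuffle.partitions", "5")

    val input = MemoryStream[Int]
    input.addData(1, 2, 3)

    val query = input.toDF().groupBy($"value").count().writeStream
      .queryName("counts")
      .outputMode("complete")
      .format("memory")
      .option("checkpointLocation", "/tmp/ckpt") // placeholder path
      .start()
    query.processAllAvailable()
    query.stop()
  }
}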
meq("org.apache.spark.sql.streaming.test"), + meq(Map.empty)) verify(LastOptions.mockStreamSourceProvider).createSource( - spark.sqlContext, - s"$checkpointLocationURI/sources/1", - None, - "org.apache.spark.sql.streaming.test", - Map.empty) + any(), + meq(s"$checkpointLocationURI/sources/1"), + meq(None), + meq("org.apache.spark.sql.streaming.test"), + meq(Map.empty)) } private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath |