From bb57bfe97d9fb077885065b8e804b85d4c493faf Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 5 Dec 2016 18:17:38 -0800
Subject: [SPARK-18657][SPARK-18668] Make StreamingQuery.id persist across
 restarts and do not auto-generate StreamingQuery.name

## What changes were proposed in this pull request?

Here are the major changes in this PR.

- Added the ability to recover `StreamingQuery.id` from the checkpoint location, by writing the id to `checkpointLoc/metadata`.
- Added `StreamingQuery.runId`, which is unique for every query start and does not persist across restarts. This identifies each restart of a query separately (the same as the earlier behavior of `id`).
- Removed auto-generation of `StreamingQuery.name`. The purpose of name was to provide an identifier that holds across restarts, but since id is precisely that, there is no need for an auto-generated name. This makes name purely cosmetic, and it is null by default.
- Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`.

Implementation details

- Renamed the existing `StreamExecutionMetadata` to `OffsetSeqMetadata` and moved it to the file `OffsetSeq.scala`, because that is what this metadata is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`).
- Added the `id` as the new `StreamMetadata`.
- When a StreamingQuery is created, it gets or writes the `StreamMetadata` from `checkpointLoc/metadata`.
- All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name`.

TODO
- [x] Test handling of name=null in json generation of StreamingQueryProgress
- [x] Test handling of name=null in json generation of StreamingQueryListener events
- [x] Test python API of runId

## How was this patch tested?

Updated unit tests and new unit tests

Author: Tathagata Das

Closes #16113 from tdas/SPARK-18657.
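
As a quick illustration of the new semantics (a minimal sketch, not part of the patch; the checkpoint/input paths and the query name below are hypothetical), restarting a query from the same checkpoint location now yields the same `id` but a fresh `runId`, and `name` stays null unless explicitly set. This mirrors the new test added to `StreamingQuerySuite`:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[2]").appName("id-runid-demo").getOrCreate()

// Hypothetical paths; the input directory is assumed to exist and receive text files.
val inputDir = "/tmp/demo-input"
val checkpointDir = "/tmp/demo-checkpoint"

def startCounter() = spark.readStream
  .text(inputDir)                  // any recoverable source works
  .groupBy().count()
  .writeStream
  .format("memory")
  .queryName("counts")             // optional; without it, name is null
  .outputMode("complete")
  .option("checkpointLocation", checkpointDir)
  .start()

val q1 = startCounter()
val (id1, runId1) = (q1.id, q1.runId)
q1.stop()

val q2 = startCounter()            // restart from the same checkpoint
assert(q2.id == id1)               // id is recovered from <checkpointDir>/metadata
assert(q2.runId != runId1)         // runId is regenerated on every (re)start
q2.stop()
spark.stop()
```
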
--- .../spark/sql/execution/streaming/OffsetSeq.scala | 27 +++++- .../sql/execution/streaming/OffsetSeqLog.scala | 2 +- .../sql/execution/streaming/ProgressReporter.scala | 6 +- .../sql/execution/streaming/StreamExecution.scala | 105 ++++++++++----------- .../sql/execution/streaming/StreamMetadata.scala | 88 +++++++++++++++++ .../sql/execution/streaming/StreamProgress.scala | 2 +- .../spark/sql/streaming/StreamingQuery.scala | 19 +++- .../sql/streaming/StreamingQueryListener.scala | 10 +- .../sql/streaming/StreamingQueryManager.scala | 25 +++-- .../org/apache/spark/sql/streaming/progress.scala | 7 +- .../query-metadata-logs-version-2.1.0.txt | 3 + .../execution/streaming/OffsetSeqLogSuite.scala | 13 ++- .../execution/streaming/StreamMetadataSuite.scala | 55 +++++++++++ .../streaming/StreamExecutionMetadataSuite.scala | 35 ------- .../streaming/StreamingQueryListenerSuite.scala | 46 +++++---- .../StreamingQueryStatusAndProgressSuite.scala | 78 ++++++++++++--- .../spark/sql/streaming/StreamingQuerySuite.scala | 100 ++++++++++++++------ 17 files changed, 446 insertions(+), 175 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala create mode 100644 sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala index 7469caeee3..e5a1997d6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala @@ -17,13 +17,16 @@ package org.apache.spark.sql.execution.streaming +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + /** * An ordered collection of offsets, used to track the progress of processing data from one or more * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance * vector clock that must progress linearly forward. */ -case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) { +case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) { /** * Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of @@ -54,6 +57,26 @@ object OffsetSeq { * `nulls` in the sequence are converted to `None`s. */ def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = { - OffsetSeq(offsets.map(Option(_)), metadata) + OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply)) } } + + +/** + * Contains metadata associated with a [[OffsetSeq]]. This information is + * persisted to the offset log in the checkpoint location via the [[OffsetSeq]] metadata field. + * + * @param batchWatermarkMs: The current eventTime watermark, used to + * bound the lateness of data that will processed. Time unit: milliseconds + * @param batchTimestampMs: The current batch processing timestamp. 
+ * Time unit: milliseconds + */ +case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) { + def json: String = Serialization.write(this)(OffsetSeqMetadata.format) +} + +object OffsetSeqMetadata { + private implicit val format = Serialization.formats(NoTypeHints) + def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json) +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala index cc25b4474b..3210d8ad64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala @@ -74,7 +74,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String) // write metadata out.write('\n') - out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8)) + out.write(offsetSeq.metadata.map(_.json).getOrElse("").getBytes(UTF_8)) // write offsets, one per line offsetSeq.offsets.map(_.map(_.json)).foreach { offset => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index ba77e7c7bf..7d0d086746 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -43,6 +43,7 @@ trait ProgressReporter extends Logging { // Internal state of the stream, required for computing metrics. protected def id: UUID + protected def runId: UUID protected def name: String protected def triggerClock: Clock protected def logicalPlan: LogicalPlan @@ -52,7 +53,7 @@ trait ProgressReporter extends Logging { protected def committedOffsets: StreamProgress protected def sources: Seq[Source] protected def sink: Sink - protected def streamExecutionMetadata: StreamExecutionMetadata + protected def offsetSeqMetadata: OffsetSeqMetadata protected def currentBatchId: Long protected def sparkSession: SparkSession @@ -134,11 +135,12 @@ trait ProgressReporter extends Logging { val newProgress = new StreamingQueryProgress( id = id, + runId = runId, name = name, timestamp = currentTriggerStartTimestamp, batchId = currentBatchId, durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava, - currentWatermark = streamExecutionMetadata.batchWatermarkMs, + currentWatermark = offsetSeqMetadata.batchWatermarkMs, stateOperators = executionStats.stateOperators.toArray, sources = sourceProgress.toArray, sink = sinkProgress) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 6b1c01ab2a..083cce8eb5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -25,8 +25,6 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import org.apache.hadoop.fs.Path -import org.json4s.NoTypeHints -import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging import org.apache.spark.sql._ @@ -58,9 +56,6 @@ class StreamExecution( import org.apache.spark.sql.streaming.StreamingQueryListener._ - // TODO: restore this from the checkpoint directory. 
- override val id: UUID = UUID.randomUUID() - private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay private val noDataProgressEventInterval = @@ -98,8 +93,30 @@ class StreamExecution( /** The current batchId or -1 if execution has not yet been initialized. */ protected var currentBatchId: Long = -1 - /** Stream execution metadata */ - protected var streamExecutionMetadata = StreamExecutionMetadata() + /** Metadata associated with the whole query */ + protected val streamMetadata: StreamMetadata = { + val metadataPath = new Path(checkpointFile("metadata")) + val hadoopConf = sparkSession.sessionState.newHadoopConf() + StreamMetadata.read(metadataPath, hadoopConf).getOrElse { + val newMetadata = new StreamMetadata(UUID.randomUUID.toString) + StreamMetadata.write(newMetadata, metadataPath, hadoopConf) + newMetadata + } + } + + /** Metadata associated with the offset seq of a batch in the query. */ + protected var offsetSeqMetadata = OffsetSeqMetadata() + + override val id: UUID = UUID.fromString(streamMetadata.id) + + override val runId: UUID = UUID.randomUUID + + /** + * Pretty identified string of printing in logs. Format is + * If name is set "queryName [id = xyz, runId = abc]" else "[id = xyz, runId = abc]" + */ + private val prettyIdString = + Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]" /** All stream sources present in the query plan. */ protected val sources = @@ -128,8 +145,9 @@ class StreamExecution( /* Get the call site in the caller thread; will pass this into the micro batch thread */ private val callSite = Utils.getCallSite() - /** Used to report metrics to coda-hale. */ - lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name") + /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */ + lazy val streamMetrics = new MetricsReporter( + this, s"spark.streaming.${Option(name).getOrElse(id)}") /** * The thread that runs the micro-batches of this stream. Note that this thread must be @@ -137,7 +155,7 @@ class StreamExecution( * [[HDFSMetadataLog]]. See SPARK-14131 for more details. */ val microBatchThread = - new StreamExecutionThread(s"stream execution thread for $name") { + new StreamExecutionThread(s"stream execution thread for $prettyIdString") { override def run(): Unit = { // To fix call site like "run at :0", we bridge the call site from the caller // thread to this micro batch thread @@ -191,7 +209,7 @@ class StreamExecution( sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } - postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception. + postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception. 
// Unblock starting thread startLatch.countDown() @@ -261,10 +279,10 @@ class StreamExecution( case e: Throwable => streamDeathCause = new StreamingQueryException( this, - s"Query $name terminated with exception: ${e.getMessage}", + s"Query $prettyIdString terminated with exception: ${e.getMessage}", e, - committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString, - availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString) + committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString, + availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString) logError(s"Query $name terminated with error", e) updateStatusMessage(s"Terminated with exception: ${e.getMessage}") // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to @@ -282,7 +300,7 @@ class StreamExecution( // Notify others sparkSession.streams.notifyQueryTermination(StreamExecution.this) postEvent( - new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString))) + new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString))) terminationLatch.countDown() } } @@ -301,9 +319,9 @@ class StreamExecution( logInfo(s"Resuming streaming query, starting with batch $batchId") currentBatchId = batchId availableOffsets = nextOffsets.toStreamProgress(sources) - streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}")) + offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata()) logDebug(s"Found possibly unprocessed offsets $availableOffsets " + - s"at batch timestamp ${streamExecutionMetadata.batchTimestampMs}") + s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}") offsetLog.get(batchId - 1).foreach { case lastOffsets => @@ -359,15 +377,15 @@ class StreamExecution( } if (hasNewData) { // Current batch timestamp in milliseconds - streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis() + offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis() updateStatusMessage("Writing offsets to log") reportTimeTaken("walCommit") { assert(offsetLog.add( currentBatchId, - availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)), + availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)), s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") logInfo(s"Committed offsets for batch $currentBatchId. " + - s"Metadata ${streamExecutionMetadata.toString}") + s"Metadata ${offsetSeqMetadata.toString}") // NOTE: The following code is correct because runBatches() processes exactly one // batch at a time. 
If we add pipeline parallelism (multiple batches in flight at @@ -437,21 +455,21 @@ class StreamExecution( val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) case ct: CurrentTimestamp => - CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs, + CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType) case cd: CurrentDate => - CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs, + CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, cd.dataType) } - val executedPlan = reportTimeTaken("queryPlanning") { + reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSession, triggerLogicalPlan, outputMode, checkpointFile("state"), currentBatchId, - streamExecutionMetadata.batchWatermarkMs) + offsetSeqMetadata.batchWatermarkMs) lastExecution.executedPlan // Force the lazy generation of execution plan } @@ -468,12 +486,12 @@ class StreamExecution( logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}") (e.maxEventTime.value / 1000) - e.delay.milliseconds() }.headOption.foreach { newWatermark => - if (newWatermark > streamExecutionMetadata.batchWatermarkMs) { + if (newWatermark > offsetSeqMetadata.batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermark ms") - streamExecutionMetadata.batchWatermarkMs = newWatermark + offsetSeqMetadata.batchWatermarkMs = newWatermark } else { logTrace(s"Event time didn't move: $newWatermark < " + - s"$streamExecutionMetadata.currentEventTimeWatermark") + s"$offsetSeqMetadata.currentEventTimeWatermark") } } @@ -503,7 +521,7 @@ class StreamExecution( microBatchThread.join() } uniqueSources.foreach(_.stop()) - logInfo(s"Query $name was stopped") + logInfo(s"Query $prettyIdString was stopped") } /** @@ -594,7 +612,7 @@ class StreamExecution( override def explain(): Unit = explain(extended = false) override def toString: String = { - s"Streaming Query - $name [state = $state]" + s"Streaming Query $prettyIdString [state = $state]" } def toDebugString: String = { @@ -603,7 +621,7 @@ class StreamExecution( } else "" s""" |=== Streaming Query === - |Name: $name + |Identifier: $prettyIdString |Current Offsets: $committedOffsets | |Current State: $state @@ -622,33 +640,6 @@ class StreamExecution( case object TERMINATED extends State } -/** - * Contains metadata associated with a stream execution. This information is - * persisted to the offset log via the OffsetSeq metadata field. Current - * information contained in this object includes: - * - * @param batchWatermarkMs: The current eventTime watermark, used to - * bound the lateness of data that will processed. Time unit: milliseconds - * @param batchTimestampMs: The current batch processing timestamp. - * Time unit: milliseconds - */ -case class StreamExecutionMetadata( - var batchWatermarkMs: Long = 0, - var batchTimestampMs: Long = 0) { - private implicit val formats = StreamExecutionMetadata.formats - - /** - * JSON string representation of this object. - */ - def json: String = Serialization.write(this) -} - -object StreamExecutionMetadata { - private implicit val formats = Serialization.formats(NoTypeHints) - - def apply(json: String): StreamExecutionMetadata = - Serialization.read[StreamExecutionMetadata](json) -} /** * A special thread to run the stream query. 
Some codes require to run in the StreamExecutionThread diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala new file mode 100644 index 0000000000..7807c9fae8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.{InputStreamReader, OutputStreamWriter} +import java.nio.charset.StandardCharsets + +import scala.util.control.NonFatal + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path} +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.streaming.StreamingQuery + +/** + * Contains metadata associated with a [[StreamingQuery]]. This information is written + * in the checkpoint location the first time a query is started and recovered every time the query + * is restarted. 
+ * + * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts + */ +case class StreamMetadata(id: String) { + def json: String = Serialization.write(this)(StreamMetadata.format) +} + +object StreamMetadata extends Logging { + implicit val format = Serialization.formats(NoTypeHints) + + /** Read the metadata from file if it exists */ + def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = { + val fs = FileSystem.get(hadoopConf) + if (fs.exists(metadataFile)) { + var input: FSDataInputStream = null + try { + input = fs.open(metadataFile) + val reader = new InputStreamReader(input, StandardCharsets.UTF_8) + val metadata = Serialization.read[StreamMetadata](reader) + Some(metadata) + } catch { + case NonFatal(e) => + logError(s"Error reading stream metadata from $metadataFile", e) + throw e + } finally { + IOUtils.closeQuietly(input) + } + } else None + } + + /** Write metadata to file */ + def write( + metadata: StreamMetadata, + metadataFile: Path, + hadoopConf: Configuration): Unit = { + var output: FSDataOutputStream = null + try { + val fs = FileSystem.get(hadoopConf) + output = fs.create(metadataFile) + val writer = new OutputStreamWriter(output) + Serialization.write(metadata, writer) + writer.close() + } catch { + case NonFatal(e) => + logError(s"Error writing stream metadata $metadata to $metadataFile", e) + throw e + } finally { + IOUtils.closeQuietly(output) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 21b8750ca9..a3f3662e6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -26,7 +26,7 @@ class StreamProgress( val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset]) extends scala.collection.immutable.Map[Source, Offset] { - def toOffsetSeq(source: Seq[Source], metadata: String): OffsetSeq = { + def toOffsetSeq(source: Seq[Source], metadata: OffsetSeqMetadata): OffsetSeq = { OffsetSeq(source.map(get), Some(metadata)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala index 8fc4e43b6d..1794e75462 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala @@ -32,20 +32,31 @@ import org.apache.spark.sql.SparkSession trait StreamingQuery { /** - * Returns the name of the query. This name is unique across all active queries. This can be - * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as - * `dataframe.writeStream.queryName("query").start()`. + * Returns the user-specified name of the query, or null if not specified. + * This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter` + * as `dataframe.writeStream.queryName("query").start()`. + * This name, if set, must be unique across all active queries. * * @since 2.0.0 */ def name: String /** - * Returns the unique id of this query. + * Returns the unique id of this query that persists across restarts from checkpoint data. + * That is, this id is generated when a query is started for the first time, and + * will be the same every time it is restarted from checkpoint data. 
Also see [[runId]]. + * * @since 2.1.0 */ def id: UUID + /** + * Returns the unique id of this run of the query. That is, every start/restart of a query will + * generated a unique runId. Therefore, every time a query is restarted from + * checkpoint, it will have the same [[id]] but different [[runId]]s. + */ + def runId: UUID + /** * Returns the `SparkSession` associated with `this`. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index d9ee75c064..6fc859d88d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -86,7 +86,10 @@ object StreamingQueryListener { * @since 2.1.0 */ @Experimental - class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event + class QueryStartedEvent private[sql]( + val id: UUID, + val runId: UUID, + val name: String) extends Event /** * :: Experimental :: @@ -106,5 +109,8 @@ object StreamingQueryListener { * @since 2.1.0 */ @Experimental - class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event + class QueryTerminatedEvent private[sql]( + val id: UUID, + val runId: UUID, + val exception: Option[String]) extends Event } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index c448468bea..c6ab41655f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -207,10 +207,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()): StreamingQuery = { activeQueriesLock.synchronized { - val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}") - if (activeQueries.values.exists(_.name == name)) { - throw new IllegalArgumentException( - s"Cannot start query with name $name as a query with that name is already active") + val name = userSpecifiedName match { + case Some(n) => + if (activeQueries.values.exists(_.name == userSpecifiedName.get)) { + throw new IllegalArgumentException( + s"Cannot start query with name $n as a query with that name is already active") + } + n + case None => null } val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => new Path(userSpecified).toUri.toString @@ -268,6 +272,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { trigger, triggerClock, outputMode) + + if (activeQueries.values.exists(_.id == query.id)) { + throw new IllegalStateException( + s"Cannot start query with id ${query.id} as another query with same id is " + + s"already active. 
Perhaps you are attempting to restart a query from checkpoint" + + s"that is already active.") + } + query.start() activeQueries.put(query.id, query) query @@ -287,8 +299,3 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { } } } - -private object StreamingQueryManager { - private val _nextId = new AtomicLong(0) - private def nextId: Long = _nextId.getAndIncrement() -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index fb5bad0123..f768080f5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -57,8 +57,9 @@ class StateOperatorProgress private[sql]( * a trigger. Each event relates to processing done for a single trigger of the streaming * query. Events are emitted even when no new data is available to be processed. * - * @param id A unique id of the query. - * @param name Name of the query. This name is unique across all active queries. + * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. + * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. + * @param name User-specified name of the query, null if not specified. * @param timestamp Timestamp (ms) of the beginning of the trigger. * @param batchId A unique id for the current batch of data being processed. Note that in the * case of retries after a failure a given batchId my be executed more than once. @@ -73,6 +74,7 @@ class StateOperatorProgress private[sql]( @Experimental class StreamingQueryProgress private[sql]( val id: UUID, + val runId: UUID, val name: String, val timestamp: Long, val batchId: Long, @@ -105,6 +107,7 @@ class StreamingQueryProgress private[sql]( } ("id" -> JString(id.toString)) ~ + ("runId" -> JString(runId.toString)) ~ ("name" -> JString(name)) ~ ("timestamp" -> JInt(timestamp)) ~ ("numInputRows" -> JInt(numInputRows)) ~ diff --git a/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt new file mode 100644 index 0000000000..79613e2362 --- /dev/null +++ b/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt @@ -0,0 +1,3 @@ +{ + "id": "d366a8bf-db79-42ca-b5a4-d9ca0a11d63e" +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala index 3afd11fa46..d3a83ea0b9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala @@ -27,10 +27,19 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { /** test string offset type */ case class StringOffset(override val json: String) extends Offset - testWithUninterruptibleThread("serialization - deserialization") { + test("OffsetSeqMetadata - deserialization") { + assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}""")) + assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}""")) + assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}""")) + assert( + OffsetSeqMetadata(1, 2) === + OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}""")) + } + + 
testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") { withTempDir { temp => val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir - val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath) + val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath) val batch0 = OffsetSeq.fill(LongOffset(0), LongOffset(1), LongOffset(2)) val batch1 = OffsetSeq.fill(StringOffset("one"), StringOffset("two"), StringOffset("three")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala new file mode 100644 index 0000000000..87f8004ab9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.File +import java.util.UUID + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.streaming.StreamTest + +class StreamMetadataSuite extends StreamTest { + + test("writing and reading") { + withTempDir { dir => + val id = UUID.randomUUID.toString + val metadata = StreamMetadata(id) + val file = new Path(new File(dir, "test").toString) + StreamMetadata.write(metadata, file, hadoopConf) + val readMetadata = StreamMetadata.read(file, hadoopConf) + assert(readMetadata.nonEmpty) + assert(readMetadata.get.id === id) + } + } + + test("read Spark 2.1.0 format") { + // query-metadata-logs-version-2.1.0.txt has the execution metadata generated by Spark 2.1.0 + assert( + readForResource("query-metadata-logs-version-2.1.0.txt") === + StreamMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e")) + } + + private def readForResource(fileName: String): StreamMetadata = { + val input = getClass.getResource(s"/structured-streaming/$fileName") + StreamMetadata.read(new Path(input.toString), hadoopConf).get + } + + private val hadoopConf = new Configuration() +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala deleted file mode 100644 index c7139c588d..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata - -class StreamExecutionMetadataSuite extends StreamTest { - - test("stream execution metadata") { - assert(StreamExecutionMetadata(0, 0) === - StreamExecutionMetadata("""{}""")) - assert(StreamExecutionMetadata(1, 0) === - StreamExecutionMetadata("""{"batchWatermarkMs":1}""")) - assert(StreamExecutionMetadata(0, 2) === - StreamExecutionMetadata("""{"batchTimestampMs":2}""")) - assert(StreamExecutionMetadata(1, 2) === - StreamExecutionMetadata( - """{"batchWatermarkMs":1,"batchTimestampMs":2}""")) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 3086abf03c..a38c05eed5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -69,6 +69,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AssertOnQuery { query => assert(listener.startEvent !== null) assert(listener.startEvent.id === query.id) + assert(listener.startEvent.runId === query.runId) assert(listener.startEvent.name === query.name) assert(listener.progressEvents.isEmpty) assert(listener.terminationEvent === null) @@ -92,6 +93,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { eventually(Timeout(streamingTimeout)) { assert(listener.terminationEvent !== null) assert(listener.terminationEvent.id === query.id) + assert(listener.terminationEvent.runId === query.runId) assert(listener.terminationEvent.exception === None) } listener.checkAsyncErrors() @@ -167,30 +169,40 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("QueryStartedEvent serialization") { - val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name") - val json = JsonProtocol.sparkEventToJson(queryStarted) - val newQueryStarted = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryStartedEvent] + def testSerialization(event: QueryStartedEvent): Unit = { + val json = JsonProtocol.sparkEventToJson(event) + val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryStartedEvent] + assert(newEvent.id === event.id) + assert(newEvent.runId === event.runId) + assert(newEvent.name === event.name) + } + + testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name")) + testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null)) } test("QueryProgressEvent serialization") { - val event = new StreamingQueryListener.QueryProgressEvent( - StreamingQueryStatusAndProgressSuite.testProgress) - val json = JsonProtocol.sparkEventToJson(event) - val newEvent = JsonProtocol.sparkEventFromJson(json) - 
.asInstanceOf[StreamingQueryListener.QueryProgressEvent] - assert(event.progress.json === newEvent.progress.json) + def testSerialization(event: QueryProgressEvent): Unit = { + val json = JsonProtocol.sparkEventToJson(event) + val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent] + assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality + } + testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1)) + testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2)) } test("QueryTerminatedEvent serialization") { + def testSerialization(event: QueryTerminatedEvent): Unit = { + val json = JsonProtocol.sparkEventToJson(event) + val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryTerminatedEvent] + assert(newEvent.id === event.id) + assert(newEvent.runId === event.runId) + assert(newEvent.exception === event.exception) + } + val exception = new RuntimeException("exception") - val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent( - UUID.randomUUID, Some(exception.getMessage)) - val json = JsonProtocol.sparkEventToJson(queryQueryTerminated) - val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent] - assert(queryQueryTerminated.id === newQueryTerminated.id) - assert(queryQueryTerminated.exception === newQueryTerminated.exception) + testSerialization( + new QueryTerminatedEvent(UUID.randomUUID, UUID.randomUUID, Some(exception.getMessage))) } test("only one progress event per interval when no data") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 4da712fa0f..96f19db1a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -31,12 +31,13 @@ import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { test("StreamingQueryProgress - prettyJson") { - val json = testProgress.prettyJson - assert(json === + val json1 = testProgress1.prettyJson + assert(json1 === s""" |{ - | "id" : "${testProgress.id.toString}", - | "name" : "name", + | "id" : "${testProgress1.id.toString}", + | "runId" : "${testProgress1.runId.toString}", + | "name" : "myName", | "timestamp" : 1, | "numInputRows" : 678, | "inputRowsPerSecond" : 10.0, @@ -60,16 +61,48 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | } |} """.stripMargin.trim) - assert(compact(parse(json)) === testProgress.json) - + assert(compact(parse(json1)) === testProgress1.json) + + val json2 = testProgress2.prettyJson + assert( + json2 === + s""" + |{ + | "id" : "${testProgress2.id.toString}", + | "runId" : "${testProgress2.runId.toString}", + | "name" : null, + | "timestamp" : 1, + | "numInputRows" : 678, + | "durationMs" : { + | "total" : 0 + | }, + | "currentWatermark" : 3, + | "stateOperators" : [ { + | "numRowsTotal" : 0, + | "numRowsUpdated" : 1 + | } ], + | "sources" : [ { + | "description" : "source", + | "startOffset" : 123, + | "endOffset" : 456, + | "numInputRows" : 678 + | } ], + | "sink" : { + | "description" : "sink" + | } + |} + """.stripMargin.trim) + assert(compact(parse(json2)) === 
testProgress2.json) } test("StreamingQueryProgress - json") { - assert(compact(parse(testProgress.json)) === testProgress.json) + assert(compact(parse(testProgress1.json)) === testProgress1.json) + assert(compact(parse(testProgress2.json)) === testProgress2.json) } test("StreamingQueryProgress - toString") { - assert(testProgress.toString === testProgress.prettyJson) + assert(testProgress1.toString === testProgress1.prettyJson) + assert(testProgress2.toString === testProgress2.prettyJson) } test("StreamingQueryStatus - prettyJson") { @@ -94,9 +127,10 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { } object StreamingQueryStatusAndProgressSuite { - val testProgress = new StreamingQueryProgress( - id = UUID.randomUUID(), - name = "name", + val testProgress1 = new StreamingQueryProgress( + id = UUID.randomUUID, + runId = UUID.randomUUID, + name = "myName", timestamp = 1L, batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, @@ -115,6 +149,28 @@ object StreamingQueryStatusAndProgressSuite { sink = new SinkProgress("sink") ) + val testProgress2 = new StreamingQueryProgress( + id = UUID.randomUUID, + runId = UUID.randomUUID, + name = null, // should not be present in the json + timestamp = 1L, + batchId = 2L, + durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, + currentWatermark = 3L, + stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), + sources = Array( + new SourceProgress( + description = "source", + startOffset = "123", + endOffset = "456", + numInputRows = 678, + inputRowsPerSecond = Double.NaN, // should not be present in the json + processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json + ) + ), + sink = new SinkProgress("sink") + ) + val testStatus = new StreamingQueryStatus("active", true, false) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index f7fc19494d..893cb762c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.streaming +import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.concurrent.Eventually._ import org.scalatest.BeforeAndAfter @@ -28,7 +29,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.SparkException import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ -import org.apache.spark.util.{ManualClock, Utils} +import org.apache.spark.util.ManualClock class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { @@ -43,38 +44,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { sqlContext.streams.active.foreach(_.stop()) } - test("names unique across active queries, ids unique across all started queries") { - val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map { 6 / _} + test("name unique in active queries") { + withTempDir { dir => + def startQuery(name: Option[String]): StreamingQuery = { + val writer = MemoryStream[Int].toDS.writeStream + name.foreach(writer.queryName) + writer + .foreach(new TestForeachWriter) + .start() + } - def startQuery(queryName: String): StreamingQuery = { - val metadataRoot = Utils.createTempDir(namePrefix = 
"streaming.checkpoint").getCanonicalPath - val writer = mapped.writeStream - writer - .queryName(queryName) - .format("memory") - .option("checkpointLocation", metadataRoot) - .start() - } + // No name by default, multiple active queries can have no name + val q1 = startQuery(name = None) + assert(q1.name === null) + val q2 = startQuery(name = None) + assert(q2.name === null) + + // Can be set by user + val q3 = startQuery(name = Some("q3")) + assert(q3.name === "q3") - val q1 = startQuery("q1") - assert(q1.name === "q1") + // Multiple active queries cannot have same name + val e = intercept[IllegalArgumentException] { + startQuery(name = Some("q3")) + } - // Verify that another query with same name cannot be started - val e1 = intercept[IllegalArgumentException] { - startQuery("q1") + q1.stop() + q2.stop() + q3.stop() } - Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) } + } - // Verify q1 was unaffected by the above exception and stop it - assert(q1.isActive) - q1.stop() + test( + "id unique in active queries + persists across restarts, runId unique across start/restarts") { + val inputData = MemoryStream[Int] + withTempDir { dir => + var cpDir: String = null + + def startQuery(restart: Boolean): StreamingQuery = { + if (cpDir == null || !restart) cpDir = s"$dir/${RandomStringUtils.randomAlphabetic(10)}" + MemoryStream[Int].toDS().groupBy().count() + .writeStream + .format("memory") + .outputMode("complete") + .queryName(s"name${RandomStringUtils.randomAlphabetic(10)}") + .option("checkpointLocation", cpDir) + .start() + } - // Verify another query can be started with name q1, but will have different id - val q2 = startQuery("q1") - assert(q2.name === "q1") - assert(q2.id !== q1.id) - q2.stop() + // id and runId unique for new queries + val q1 = startQuery(restart = false) + val q2 = startQuery(restart = false) + assert(q1.id !== q2.id) + assert(q1.runId !== q2.runId) + q1.stop() + q2.stop() + + // id persists across restarts, runId unique across restarts + val q3 = startQuery(restart = false) + q3.stop() + + val q4 = startQuery(restart = true) + q4.stop() + assert(q3.id === q3.id) + assert(q3.runId !== q4.runId) + + // Only one query with same id can be active + val q5 = startQuery(restart = false) + val e = intercept[IllegalStateException] { + startQuery(restart = true) + } + } } testQuietly("isActive, exception, and awaitTermination") { @@ -105,9 +145,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), AssertOnQuery(q => { q.exception.get.startOffset === - q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString && + q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString && q.exception.get.endOffset === - q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString + q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString }, "incorrect start offset or end offset on exception") ) } @@ -274,7 +314,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { /** Whether metrics of a query is registered for reporting */ def isMetricsRegistered(query: StreamingQuery): Boolean = { - val sourceName = s"spark.streaming.${query.name}" + val sourceName = s"spark.streaming.${query.id}" val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName) require(sources.size <= 1) sources.nonEmpty -- cgit v1.2.3