author     Tathagata Das <tathagata.das1565@gmail.com>  2016-12-05 18:17:38 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2016-12-05 18:17:38 -0800
commit     bb57bfe97d9fb077885065b8e804b85d4c493faf (patch)
tree       384926cb923ea82b7195197da27834e4d47548f1
parent     1b2785c3d0a40da2fca923af78066060dbfbcf0a (diff)
[SPARK-18657][SPARK-18668] Make StreamingQuery.id persist across restarts and do not auto-generate StreamingQuery.name
## What changes were proposed in this pull request?

Here are the major changes in this PR.

- Added the ability to recover `StreamingQuery.id` from the checkpoint location, by writing the id to `checkpointLoc/metadata`.
- Added `StreamingQuery.runId`, which is unique for every start of a query and does not persist across restarts. This identifies each run of a query separately (the same role `id` played earlier).
- Removed auto-generation of `StreamingQuery.name`. The purpose of the name was to provide an identifier that holds across restarts, but since id now does exactly that, there is no need for an auto-generated name. The name is therefore purely cosmetic and is null by default.
- Added `runId` to `StreamingQueryListener` events and `StreamingQueryProgress`.

Implementation details

- Renamed the existing `StreamExecutionMetadata` to `OffsetSeqMetadata` and moved it to the file `OffsetSeq.scala`, because that is the metadata it is tied to. Also did some refactoring to make the code cleaner (got rid of a lot of `.json` and `.getOrElse("{}")`).
- Added the `id` as a field of the new `StreamMetadata`.
- When a StreamingQuery is created, it reads the `StreamMetadata` from `checkpointLoc/metadata`, or writes a new one if the file does not exist yet.
- All internal logging in `StreamExecution` uses `(name, id, runId)` instead of just `name`.

TODO

- [x] Test handling of name=null in json generation of StreamingQueryProgress
- [x] Test handling of name=null in json generation of StreamingQueryListener events
- [x] Test python API of runId

## How was this patch tested?

Updated unit tests and new unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16113 from tdas/SPARK-18657.
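For illustration only, here is a minimal sketch (not part of this patch) of the user-visible semantics described above. The `MemoryStream` input, the query name `counts`, and the checkpoint path are assumptions chosen to mirror the updated tests:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder.appName("id-runid-sketch").getOrCreate()
import spark.implicits._
implicit val sqlContext = spark.sqlContext

val input = MemoryStream[Int]

// Helper that starts the same aggregation against a fixed checkpoint location.
def startCounts() = input.toDS().groupBy().count().writeStream
  .format("memory")
  .queryName("counts")                            // optional; name stays null if never set
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/counts-checkpoint")
  .start()

val q1 = startCounts()
val (firstId, firstRunId) = (q1.id, q1.runId)     // id is written to <checkpointLocation>/metadata
q1.stop()

// Restarting from the same checkpoint recovers the same id but generates a fresh runId.
val q2 = startCounts()
assert(q2.id == firstId)
assert(q2.runId != firstRunId)
q2.stop()
```

A query started without `queryName(...)` reports `name` as null, and starting a second query with the same id (that is, the same checkpoint location) while the first is still active now fails with an `IllegalStateException`.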
-rw-r--r--  project/MimaExcludes.scala | 3
-rw-r--r--  python/pyspark/sql/streaming.py | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 105
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala | 88
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala | 25
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala | 7
-rw-r--r--  sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt | 3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala | 13
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala | 55
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala | 35
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala | 46
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala | 78
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 100
19 files changed, 466 insertions, 177 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f3e5a21d77..82d50f9891 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -108,6 +108,9 @@ object MimaExcludes {
// [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),
+ // [SPARK-18657] Add StreamingQuery.runId
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.runId"),
+
// [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 4a7d17ba51..ee7a26d00d 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -51,14 +51,29 @@ class StreamingQuery(object):
@property
@since(2.0)
def id(self):
- """The id of the streaming query.
+ """Returns the unique id of this query that persists across restarts from checkpoint data.
+ That is, this id is generated when a query is started for the first time, and
+ will be the same every time it is restarted from checkpoint data.
+ There can only be one query with the same id active in a Spark cluster.
+ Also see, `runId`.
"""
return self._jsq.id().toString()
@property
+ @since(2.1)
+ def runId(self):
+ """Returns the unique id of this query that does not persist across restarts. That is, every
+ query that is started (or restarted from checkpoint) will have a different runId.
+ """
+ return self._jsq.runId().toString()
+
+ @property
@since(2.0)
def name(self):
- """The name of the streaming query. This name is unique across all active queries.
+ """Returns the user-specified name of the query, or null if not specified.
+ This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
+ as `dataframe.writeStream.queryName("query").start()`.
+ This name, if set, must be unique across all active queries.
"""
return self._jsq.name()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index 7469caeee3..e5a1997d6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -17,13 +17,16 @@
package org.apache.spark.sql.execution.streaming
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
* [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
* vector clock that must progress linearly forward.
*/
-case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) {
+case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {
/**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
@@ -54,6 +57,26 @@ object OffsetSeq {
* `nulls` in the sequence are converted to `None`s.
*/
def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
- OffsetSeq(offsets.map(Option(_)), metadata)
+ OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
}
}
+
+
+/**
+ * Contains metadata associated with an [[OffsetSeq]]. This information is
+ * persisted to the offset log in the checkpoint location via the [[OffsetSeq]] metadata field.
+ *
+ * @param batchWatermarkMs: The current eventTime watermark, used to
+ * bound the lateness of data that will be processed. Time unit: milliseconds
+ * @param batchTimestampMs: The current batch processing timestamp.
+ * Time unit: milliseconds
+ */
+case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+ def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
+}
+
+object OffsetSeqMetadata {
+ private implicit val format = Serialization.formats(NoTypeHints)
+ def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
index cc25b4474b..3210d8ad64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
@@ -74,7 +74,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)
// write metadata
out.write('\n')
- out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8))
+ out.write(offsetSeq.metadata.map(_.json).getOrElse("").getBytes(UTF_8))
// write offsets, one per line
offsetSeq.offsets.map(_.map(_.json)).foreach { offset =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index ba77e7c7bf..7d0d086746 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -43,6 +43,7 @@ trait ProgressReporter extends Logging {
// Internal state of the stream, required for computing metrics.
protected def id: UUID
+ protected def runId: UUID
protected def name: String
protected def triggerClock: Clock
protected def logicalPlan: LogicalPlan
@@ -52,7 +53,7 @@ trait ProgressReporter extends Logging {
protected def committedOffsets: StreamProgress
protected def sources: Seq[Source]
protected def sink: Sink
- protected def streamExecutionMetadata: StreamExecutionMetadata
+ protected def offsetSeqMetadata: OffsetSeqMetadata
protected def currentBatchId: Long
protected def sparkSession: SparkSession
@@ -134,11 +135,12 @@ trait ProgressReporter extends Logging {
val newProgress = new StreamingQueryProgress(
id = id,
+ runId = runId,
name = name,
timestamp = currentTriggerStartTimestamp,
batchId = currentBatchId,
durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
- currentWatermark = streamExecutionMetadata.batchWatermarkMs,
+ currentWatermark = offsetSeqMetadata.batchWatermarkMs,
stateOperators = executionStats.stateOperators.toArray,
sources = sourceProgress.toArray,
sink = sinkProgress)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6b1c01ab2a..083cce8eb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -25,8 +25,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
@@ -58,9 +56,6 @@ class StreamExecution(
import org.apache.spark.sql.streaming.StreamingQueryListener._
- // TODO: restore this from the checkpoint directory.
- override val id: UUID = UUID.randomUUID()
-
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
private val noDataProgressEventInterval =
@@ -98,8 +93,30 @@ class StreamExecution(
/** The current batchId or -1 if execution has not yet been initialized. */
protected var currentBatchId: Long = -1
- /** Stream execution metadata */
- protected var streamExecutionMetadata = StreamExecutionMetadata()
+ /** Metadata associated with the whole query */
+ protected val streamMetadata: StreamMetadata = {
+ val metadataPath = new Path(checkpointFile("metadata"))
+ val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
+ val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
+ StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
+ newMetadata
+ }
+ }
+
+ /** Metadata associated with the offset seq of a batch in the query. */
+ protected var offsetSeqMetadata = OffsetSeqMetadata()
+
+ override val id: UUID = UUID.fromString(streamMetadata.id)
+
+ override val runId: UUID = UUID.randomUUID
+
+ /**
+   * A pretty string identifying this query in logs. If the name is set, the format is
+   * "queryName [id = xyz, runId = abc]"; otherwise it is "[id = xyz, runId = abc]".
+ */
+ private val prettyIdString =
+ Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
/** All stream sources present in the query plan. */
protected val sources =
@@ -128,8 +145,9 @@ class StreamExecution(
/* Get the call site in the caller thread; will pass this into the micro batch thread */
private val callSite = Utils.getCallSite()
- /** Used to report metrics to coda-hale. */
- lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name")
+ /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */
+ lazy val streamMetrics = new MetricsReporter(
+ this, s"spark.streaming.${Option(name).getOrElse(id)}")
/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
@@ -137,7 +155,7 @@ class StreamExecution(
* [[HDFSMetadataLog]]. See SPARK-14131 for more details.
*/
val microBatchThread =
- new StreamExecutionThread(s"stream execution thread for $name") {
+ new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
override def run(): Unit = {
// To fix call site like "run at <unknown>:0", we bridge the call site from the caller
// thread to this micro batch thread
@@ -191,7 +209,7 @@ class StreamExecution(
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
- postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception.
+ postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception.
// Unblock starting thread
startLatch.countDown()
@@ -261,10 +279,10 @@ class StreamExecution(
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
this,
- s"Query $name terminated with exception: ${e.getMessage}",
+ s"Query $prettyIdString terminated with exception: ${e.getMessage}",
e,
- committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
- availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
+ committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
+ availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
logError(s"Query $name terminated with error", e)
updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
@@ -282,7 +300,7 @@ class StreamExecution(
// Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
- new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString)))
+ new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
@@ -301,9 +319,9 @@ class StreamExecution(
logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
- streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}"))
+ offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
- s"at batch timestamp ${streamExecutionMetadata.batchTimestampMs}")
+ s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
offsetLog.get(batchId - 1).foreach {
case lastOffsets =>
@@ -359,15 +377,15 @@ class StreamExecution(
}
if (hasNewData) {
// Current batch timestamp in milliseconds
- streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+ offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
currentBatchId,
- availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)),
+ availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
- s"Metadata ${streamExecutionMetadata.toString}")
+ s"Metadata ${offsetSeqMetadata.toString}")
// NOTE: The following code is correct because runBatches() processes exactly one
// batch at a time. If we add pipeline parallelism (multiple batches in flight at
@@ -437,21 +455,21 @@ class StreamExecution(
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) => replacementMap(a)
case ct: CurrentTimestamp =>
- CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs,
+ CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
ct.dataType)
case cd: CurrentDate =>
- CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs,
+ CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType)
}
- val executedPlan = reportTimeTaken("queryPlanning") {
+ reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSession,
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
currentBatchId,
- streamExecutionMetadata.batchWatermarkMs)
+ offsetSeqMetadata.batchWatermarkMs)
lastExecution.executedPlan // Force the lazy generation of execution plan
}
@@ -468,12 +486,12 @@ class StreamExecution(
logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
(e.maxEventTime.value / 1000) - e.delay.milliseconds()
}.headOption.foreach { newWatermark =>
- if (newWatermark > streamExecutionMetadata.batchWatermarkMs) {
+ if (newWatermark > offsetSeqMetadata.batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermark ms")
- streamExecutionMetadata.batchWatermarkMs = newWatermark
+ offsetSeqMetadata.batchWatermarkMs = newWatermark
} else {
logTrace(s"Event time didn't move: $newWatermark < " +
- s"$streamExecutionMetadata.currentEventTimeWatermark")
+ s"$offsetSeqMetadata.currentEventTimeWatermark")
}
}
@@ -503,7 +521,7 @@ class StreamExecution(
microBatchThread.join()
}
uniqueSources.foreach(_.stop())
- logInfo(s"Query $name was stopped")
+ logInfo(s"Query $prettyIdString was stopped")
}
/**
@@ -594,7 +612,7 @@ class StreamExecution(
override def explain(): Unit = explain(extended = false)
override def toString: String = {
- s"Streaming Query - $name [state = $state]"
+ s"Streaming Query $prettyIdString [state = $state]"
}
def toDebugString: String = {
@@ -603,7 +621,7 @@ class StreamExecution(
} else ""
s"""
|=== Streaming Query ===
- |Name: $name
+ |Identifier: $prettyIdString
|Current Offsets: $committedOffsets
|
|Current State: $state
@@ -622,33 +640,6 @@ class StreamExecution(
case object TERMINATED extends State
}
-/**
- * Contains metadata associated with a stream execution. This information is
- * persisted to the offset log via the OffsetSeq metadata field. Current
- * information contained in this object includes:
- *
- * @param batchWatermarkMs: The current eventTime watermark, used to
- * bound the lateness of data that will processed. Time unit: milliseconds
- * @param batchTimestampMs: The current batch processing timestamp.
- * Time unit: milliseconds
- */
-case class StreamExecutionMetadata(
- var batchWatermarkMs: Long = 0,
- var batchTimestampMs: Long = 0) {
- private implicit val formats = StreamExecutionMetadata.formats
-
- /**
- * JSON string representation of this object.
- */
- def json: String = Serialization.write(this)
-}
-
-object StreamExecutionMetadata {
- private implicit val formats = Serialization.formats(NoTypeHints)
-
- def apply(json: String): StreamExecutionMetadata =
- Serialization.read[StreamExecutionMetadata](json)
-}
/**
* A special thread to run the stream query. Some codes require to run in the StreamExecutionThread
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
new file mode 100644
index 0000000000..7807c9fae8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStreamReader, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+
+import scala.util.control.NonFatal
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path}
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.streaming.StreamingQuery
+
+/**
+ * Contains metadata associated with a [[StreamingQuery]]. This information is written
+ * in the checkpoint location the first time a query is started and recovered every time the query
+ * is restarted.
+ *
+ * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts
+ */
+case class StreamMetadata(id: String) {
+ def json: String = Serialization.write(this)(StreamMetadata.format)
+}
+
+object StreamMetadata extends Logging {
+ implicit val format = Serialization.formats(NoTypeHints)
+
+ /** Read the metadata from file if it exists */
+ def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
+ val fs = FileSystem.get(hadoopConf)
+ if (fs.exists(metadataFile)) {
+ var input: FSDataInputStream = null
+ try {
+ input = fs.open(metadataFile)
+ val reader = new InputStreamReader(input, StandardCharsets.UTF_8)
+ val metadata = Serialization.read[StreamMetadata](reader)
+ Some(metadata)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Error reading stream metadata from $metadataFile", e)
+ throw e
+ } finally {
+ IOUtils.closeQuietly(input)
+ }
+ } else None
+ }
+
+ /** Write metadata to file */
+ def write(
+ metadata: StreamMetadata,
+ metadataFile: Path,
+ hadoopConf: Configuration): Unit = {
+ var output: FSDataOutputStream = null
+ try {
+ val fs = FileSystem.get(hadoopConf)
+ output = fs.create(metadataFile)
+ val writer = new OutputStreamWriter(output)
+ Serialization.write(metadata, writer)
+ writer.close()
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Error writing stream metadata $metadata to $metadataFile", e)
+ throw e
+ } finally {
+ IOUtils.closeQuietly(output)
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
index 21b8750ca9..a3f3662e6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -26,7 +26,7 @@ class StreamProgress(
val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset])
extends scala.collection.immutable.Map[Source, Offset] {
- def toOffsetSeq(source: Seq[Source], metadata: String): OffsetSeq = {
+ def toOffsetSeq(source: Seq[Source], metadata: OffsetSeqMetadata): OffsetSeq = {
OffsetSeq(source.map(get), Some(metadata))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 8fc4e43b6d..1794e75462 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -32,21 +32,32 @@ import org.apache.spark.sql.SparkSession
trait StreamingQuery {
/**
- * Returns the name of the query. This name is unique across all active queries. This can be
- * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as
- * `dataframe.writeStream.queryName("query").start()`.
+ * Returns the user-specified name of the query, or null if not specified.
+ * This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
+ * as `dataframe.writeStream.queryName("query").start()`.
+ * This name, if set, must be unique across all active queries.
*
* @since 2.0.0
*/
def name: String
/**
- * Returns the unique id of this query.
+ * Returns the unique id of this query that persists across restarts from checkpoint data.
+ * That is, this id is generated when a query is started for the first time, and
+ * will be the same every time it is restarted from checkpoint data. Also see [[runId]].
+ *
* @since 2.1.0
*/
def id: UUID
/**
+ * Returns the unique id of this run of the query. That is, every start/restart of a query will
+   * generate a unique runId. Therefore, every time a query is restarted from
+ * checkpoint, it will have the same [[id]] but different [[runId]]s.
+ */
+ def runId: UUID
+
+ /**
* Returns the `SparkSession` associated with `this`.
*
* @since 2.0.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index d9ee75c064..6fc859d88d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -86,7 +86,10 @@ object StreamingQueryListener {
* @since 2.1.0
*/
@Experimental
- class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event
+ class QueryStartedEvent private[sql](
+ val id: UUID,
+ val runId: UUID,
+ val name: String) extends Event
/**
* :: Experimental ::
@@ -106,5 +109,8 @@ object StreamingQueryListener {
* @since 2.1.0
*/
@Experimental
- class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event
+ class QueryTerminatedEvent private[sql](
+ val id: UUID,
+ val runId: UUID,
+ val exception: Option[String]) extends Event
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index c448468bea..c6ab41655f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -207,10 +207,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock()): StreamingQuery = {
activeQueriesLock.synchronized {
- val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}")
- if (activeQueries.values.exists(_.name == name)) {
- throw new IllegalArgumentException(
- s"Cannot start query with name $name as a query with that name is already active")
+ val name = userSpecifiedName match {
+ case Some(n) =>
+ if (activeQueries.values.exists(_.name == userSpecifiedName.get)) {
+ throw new IllegalArgumentException(
+ s"Cannot start query with name $n as a query with that name is already active")
+ }
+ n
+ case None => null
}
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
new Path(userSpecified).toUri.toString
@@ -268,6 +272,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
trigger,
triggerClock,
outputMode)
+
+ if (activeQueries.values.exists(_.id == query.id)) {
+ throw new IllegalStateException(
+          s"Cannot start query with id ${query.id} as another query with the same id is " +
+          s"already active. Perhaps you are attempting to restart a query from checkpoint " +
+          s"that is already active.")
+ }
+
query.start()
activeQueries.put(query.id, query)
query
@@ -287,8 +299,3 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
}
}
}
-
-private object StreamingQueryManager {
- private val _nextId = new AtomicLong(0)
- private def nextId: Long = _nextId.getAndIncrement()
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index fb5bad0123..f768080f5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -57,8 +57,9 @@ class StateOperatorProgress private[sql](
* a trigger. Each event relates to processing done for a single trigger of the streaming
* query. Events are emitted even when no new data is available to be processed.
*
- * @param id A unique id of the query.
- * @param name Name of the query. This name is unique across all active queries.
+ * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name User-specified name of the query, null if not specified.
* @param timestamp Timestamp (ms) of the beginning of the trigger.
* @param batchId A unique id for the current batch of data being processed. Note that in the
* case of retries after a failure a given batchId my be executed more than once.
@@ -73,6 +74,7 @@ class StateOperatorProgress private[sql](
@Experimental
class StreamingQueryProgress private[sql](
val id: UUID,
+ val runId: UUID,
val name: String,
val timestamp: Long,
val batchId: Long,
@@ -105,6 +107,7 @@ class StreamingQueryProgress private[sql](
}
("id" -> JString(id.toString)) ~
+ ("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
("timestamp" -> JInt(timestamp)) ~
("numInputRows" -> JInt(numInputRows)) ~
diff --git a/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt b/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt
new file mode 100644
index 0000000000..79613e2362
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/query-metadata-logs-version-2.1.0.txt
@@ -0,0 +1,3 @@
+{
+ "id": "d366a8bf-db79-42ca-b5a4-d9ca0a11d63e"
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index 3afd11fa46..d3a83ea0b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -27,10 +27,19 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
/** test string offset type */
case class StringOffset(override val json: String) extends Offset
- testWithUninterruptibleThread("serialization - deserialization") {
+ test("OffsetSeqMetadata - deserialization") {
+ assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
+ assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+ assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+ assert(
+ OffsetSeqMetadata(1, 2) ===
+ OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+ }
+
+ testWithUninterruptibleThread("OffsetSeqLog - serialization - deserialization") {
withTempDir { temp =>
val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir
- val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
+ val metadataLog = new OffsetSeqLog(spark, dir.getAbsolutePath)
val batch0 = OffsetSeq.fill(LongOffset(0), LongOffset(1), LongOffset(2))
val batch1 = OffsetSeq.fill(StringOffset("one"), StringOffset("two"), StringOffset("three"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala
new file mode 100644
index 0000000000..87f8004ab9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetadataSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.File
+import java.util.UUID
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.streaming.StreamTest
+
+class StreamMetadataSuite extends StreamTest {
+
+ test("writing and reading") {
+ withTempDir { dir =>
+ val id = UUID.randomUUID.toString
+ val metadata = StreamMetadata(id)
+ val file = new Path(new File(dir, "test").toString)
+ StreamMetadata.write(metadata, file, hadoopConf)
+ val readMetadata = StreamMetadata.read(file, hadoopConf)
+ assert(readMetadata.nonEmpty)
+ assert(readMetadata.get.id === id)
+ }
+ }
+
+ test("read Spark 2.1.0 format") {
+ // query-metadata-logs-version-2.1.0.txt has the execution metadata generated by Spark 2.1.0
+ assert(
+ readForResource("query-metadata-logs-version-2.1.0.txt") ===
+ StreamMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e"))
+ }
+
+ private def readForResource(fileName: String): StreamMetadata = {
+ val input = getClass.getResource(s"/structured-streaming/$fileName")
+ StreamMetadata.read(new Path(input.toString), hadoopConf).get
+ }
+
+ private val hadoopConf = new Configuration()
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
deleted file mode 100644
index c7139c588d..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.sql.execution.streaming.StreamExecutionMetadata
-
-class StreamExecutionMetadataSuite extends StreamTest {
-
- test("stream execution metadata") {
- assert(StreamExecutionMetadata(0, 0) ===
- StreamExecutionMetadata("""{}"""))
- assert(StreamExecutionMetadata(1, 0) ===
- StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
- assert(StreamExecutionMetadata(0, 2) ===
- StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
- assert(StreamExecutionMetadata(1, 2) ===
- StreamExecutionMetadata(
- """{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 3086abf03c..a38c05eed5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -69,6 +69,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AssertOnQuery { query =>
assert(listener.startEvent !== null)
assert(listener.startEvent.id === query.id)
+ assert(listener.startEvent.runId === query.runId)
assert(listener.startEvent.name === query.name)
assert(listener.progressEvents.isEmpty)
assert(listener.terminationEvent === null)
@@ -92,6 +93,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
eventually(Timeout(streamingTimeout)) {
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.id === query.id)
+ assert(listener.terminationEvent.runId === query.runId)
assert(listener.terminationEvent.exception === None)
}
listener.checkAsyncErrors()
@@ -167,30 +169,40 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
test("QueryStartedEvent serialization") {
- val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
- val json = JsonProtocol.sparkEventToJson(queryStarted)
- val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
- .asInstanceOf[StreamingQueryListener.QueryStartedEvent]
+ def testSerialization(event: QueryStartedEvent): Unit = {
+ val json = JsonProtocol.sparkEventToJson(event)
+ val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryStartedEvent]
+ assert(newEvent.id === event.id)
+ assert(newEvent.runId === event.runId)
+ assert(newEvent.name === event.name)
+ }
+
+ testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name"))
+ testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null))
}
test("QueryProgressEvent serialization") {
- val event = new StreamingQueryListener.QueryProgressEvent(
- StreamingQueryStatusAndProgressSuite.testProgress)
- val json = JsonProtocol.sparkEventToJson(event)
- val newEvent = JsonProtocol.sparkEventFromJson(json)
- .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
- assert(event.progress.json === newEvent.progress.json)
+ def testSerialization(event: QueryProgressEvent): Unit = {
+ val json = JsonProtocol.sparkEventToJson(event)
+ val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
+ assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality
+ }
+ testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1))
+ testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2))
}
test("QueryTerminatedEvent serialization") {
+ def testSerialization(event: QueryTerminatedEvent): Unit = {
+ val json = JsonProtocol.sparkEventToJson(event)
+ val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryTerminatedEvent]
+ assert(newEvent.id === event.id)
+ assert(newEvent.runId === event.runId)
+ assert(newEvent.exception === event.exception)
+ }
+
val exception = new RuntimeException("exception")
- val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
- UUID.randomUUID, Some(exception.getMessage))
- val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
- val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
- .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
- assert(queryQueryTerminated.id === newQueryTerminated.id)
- assert(queryQueryTerminated.exception === newQueryTerminated.exception)
+ testSerialization(
+ new QueryTerminatedEvent(UUID.randomUUID, UUID.randomUUID, Some(exception.getMessage)))
}
test("only one progress event per interval when no data") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 4da712fa0f..96f19db1a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -31,12 +31,13 @@ import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
test("StreamingQueryProgress - prettyJson") {
- val json = testProgress.prettyJson
- assert(json ===
+ val json1 = testProgress1.prettyJson
+ assert(json1 ===
s"""
|{
- | "id" : "${testProgress.id.toString}",
- | "name" : "name",
+ | "id" : "${testProgress1.id.toString}",
+ | "runId" : "${testProgress1.runId.toString}",
+ | "name" : "myName",
| "timestamp" : 1,
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0,
@@ -60,16 +61,48 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| }
|}
""".stripMargin.trim)
- assert(compact(parse(json)) === testProgress.json)
-
+ assert(compact(parse(json1)) === testProgress1.json)
+
+ val json2 = testProgress2.prettyJson
+ assert(
+ json2 ===
+ s"""
+ |{
+ | "id" : "${testProgress2.id.toString}",
+ | "runId" : "${testProgress2.runId.toString}",
+ | "name" : null,
+ | "timestamp" : 1,
+ | "numInputRows" : 678,
+ | "durationMs" : {
+ | "total" : 0
+ | },
+ | "currentWatermark" : 3,
+ | "stateOperators" : [ {
+ | "numRowsTotal" : 0,
+ | "numRowsUpdated" : 1
+ | } ],
+ | "sources" : [ {
+ | "description" : "source",
+ | "startOffset" : 123,
+ | "endOffset" : 456,
+ | "numInputRows" : 678
+ | } ],
+ | "sink" : {
+ | "description" : "sink"
+ | }
+ |}
+ """.stripMargin.trim)
+ assert(compact(parse(json2)) === testProgress2.json)
}
test("StreamingQueryProgress - json") {
- assert(compact(parse(testProgress.json)) === testProgress.json)
+ assert(compact(parse(testProgress1.json)) === testProgress1.json)
+ assert(compact(parse(testProgress2.json)) === testProgress2.json)
}
test("StreamingQueryProgress - toString") {
- assert(testProgress.toString === testProgress.prettyJson)
+ assert(testProgress1.toString === testProgress1.prettyJson)
+ assert(testProgress2.toString === testProgress2.prettyJson)
}
test("StreamingQueryStatus - prettyJson") {
@@ -94,9 +127,10 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
}
object StreamingQueryStatusAndProgressSuite {
- val testProgress = new StreamingQueryProgress(
- id = UUID.randomUUID(),
- name = "name",
+ val testProgress1 = new StreamingQueryProgress(
+ id = UUID.randomUUID,
+ runId = UUID.randomUUID,
+ name = "myName",
timestamp = 1L,
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
@@ -115,6 +149,28 @@ object StreamingQueryStatusAndProgressSuite {
sink = new SinkProgress("sink")
)
+ val testProgress2 = new StreamingQueryProgress(
+ id = UUID.randomUUID,
+ runId = UUID.randomUUID,
+ name = null, // should not be present in the json
+ timestamp = 1L,
+ batchId = 2L,
+ durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+ currentWatermark = 3L,
+ stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+ sources = Array(
+ new SourceProgress(
+ description = "source",
+ startOffset = "123",
+ endOffset = "456",
+ numInputRows = 678,
+ inputRowsPerSecond = Double.NaN, // should not be present in the json
+ processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json
+ )
+ ),
+ sink = new SinkProgress("sink")
+ )
+
val testStatus = new StreamingQueryStatus("active", true, false)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index f7fc19494d..893cb762c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.streaming
+import org.apache.commons.lang3.RandomStringUtils
import org.scalactic.TolerantNumerics
import org.scalatest.concurrent.Eventually._
import org.scalatest.BeforeAndAfter
@@ -28,7 +29,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
-import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.util.ManualClock
class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
@@ -43,38 +44,77 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
sqlContext.streams.active.foreach(_.stop())
}
- test("names unique across active queries, ids unique across all started queries") {
- val inputData = MemoryStream[Int]
- val mapped = inputData.toDS().map { 6 / _}
+ test("name unique in active queries") {
+ withTempDir { dir =>
+ def startQuery(name: Option[String]): StreamingQuery = {
+ val writer = MemoryStream[Int].toDS.writeStream
+ name.foreach(writer.queryName)
+ writer
+ .foreach(new TestForeachWriter)
+ .start()
+ }
- def startQuery(queryName: String): StreamingQuery = {
- val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
- val writer = mapped.writeStream
- writer
- .queryName(queryName)
- .format("memory")
- .option("checkpointLocation", metadataRoot)
- .start()
- }
+ // No name by default, multiple active queries can have no name
+ val q1 = startQuery(name = None)
+ assert(q1.name === null)
+ val q2 = startQuery(name = None)
+ assert(q2.name === null)
+
+ // Can be set by user
+ val q3 = startQuery(name = Some("q3"))
+ assert(q3.name === "q3")
- val q1 = startQuery("q1")
- assert(q1.name === "q1")
+ // Multiple active queries cannot have same name
+ val e = intercept[IllegalArgumentException] {
+ startQuery(name = Some("q3"))
+ }
- // Verify that another query with same name cannot be started
- val e1 = intercept[IllegalArgumentException] {
- startQuery("q1")
+ q1.stop()
+ q2.stop()
+ q3.stop()
}
- Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) }
+ }
- // Verify q1 was unaffected by the above exception and stop it
- assert(q1.isActive)
- q1.stop()
+ test(
+ "id unique in active queries + persists across restarts, runId unique across start/restarts") {
+ val inputData = MemoryStream[Int]
+ withTempDir { dir =>
+ var cpDir: String = null
+
+ def startQuery(restart: Boolean): StreamingQuery = {
+ if (cpDir == null || !restart) cpDir = s"$dir/${RandomStringUtils.randomAlphabetic(10)}"
+ MemoryStream[Int].toDS().groupBy().count()
+ .writeStream
+ .format("memory")
+ .outputMode("complete")
+ .queryName(s"name${RandomStringUtils.randomAlphabetic(10)}")
+ .option("checkpointLocation", cpDir)
+ .start()
+ }
- // Verify another query can be started with name q1, but will have different id
- val q2 = startQuery("q1")
- assert(q2.name === "q1")
- assert(q2.id !== q1.id)
- q2.stop()
+ // id and runId unique for new queries
+ val q1 = startQuery(restart = false)
+ val q2 = startQuery(restart = false)
+ assert(q1.id !== q2.id)
+ assert(q1.runId !== q2.runId)
+ q1.stop()
+ q2.stop()
+
+ // id persists across restarts, runId unique across restarts
+ val q3 = startQuery(restart = false)
+ q3.stop()
+
+ val q4 = startQuery(restart = true)
+ q4.stop()
+      assert(q3.id === q4.id)
+ assert(q3.runId !== q4.runId)
+
+ // Only one query with same id can be active
+ val q5 = startQuery(restart = false)
+ val e = intercept[IllegalStateException] {
+ startQuery(restart = true)
+ }
+ }
}
testQuietly("isActive, exception, and awaitTermination") {
@@ -105,9 +145,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
AssertOnQuery(q => {
q.exception.get.startOffset ===
- q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+ q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString &&
q.exception.get.endOffset ===
- q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+ q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString
}, "incorrect start offset or end offset on exception")
)
}
@@ -274,7 +314,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
/** Whether metrics of a query is registered for reporting */
def isMetricsRegistered(query: StreamingQuery): Boolean = {
- val sourceName = s"spark.streaming.${query.name}"
+ val sourceName = s"spark.streaming.${query.id}"
val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName)
require(sources.size <= 1)
sources.nonEmpty