-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 69
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala | 74
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala | 29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala | 43
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 12
9 files changed, 167 insertions(+), 101 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index afae0786b7..171b1378e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -336,34 +336,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertStreaming("startStream() can only be called on continuous queries")
if (source == "memory") {
- val queryName =
- extraOptions.getOrElse(
- "queryName", throw new AnalysisException("queryName must be specified for memory sink"))
- val checkpointLocation = getCheckpointLocation(queryName, failIfNotSet = false).getOrElse {
- Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
- }
-
- // If offsets have already been created, we trying to resume a query.
- val checkpointPath = new Path(checkpointLocation, "offsets")
- val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
- if (fs.exists(checkpointPath)) {
- throw new AnalysisException(
- s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
- } else {
- checkpointPath.toUri.toString
+ if (extraOptions.get("queryName").isEmpty) {
+ throw new AnalysisException("queryName must be specified for memory sink")
}
val sink = new MemorySink(df.schema, outputMode)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
- resultDf.createOrReplaceTempView(queryName)
- val continuousQuery = df.sparkSession.sessionState.continuousQueryManager.startQuery(
- queryName,
- checkpointLocation,
+ val query = df.sparkSession.sessionState.continuousQueryManager.startQuery(
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
df,
sink,
outputMode,
- trigger)
- continuousQuery
+ useTempCheckpointLocation = true,
+ recoverFromCheckpointLocation = false,
+ trigger = trigger)
+ resultDf.createOrReplaceTempView(query.name)
+ query
} else {
val dataSource =
DataSource(
@@ -371,14 +360,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
className = source,
options = extraOptions.toMap,
partitionColumns = normalizedParCols.getOrElse(Nil))
- val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
df.sparkSession.sessionState.continuousQueryManager.startQuery(
- queryName,
- getCheckpointLocation(queryName, failIfNotSet = true).get,
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
df,
dataSource.createSink(outputMode),
outputMode,
- trigger)
+ trigger = trigger)
}
}
@@ -437,38 +425,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertStreaming(
"foreach() can only be called on streaming Datasets/DataFrames.")
- val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
df.sparkSession.sessionState.continuousQueryManager.startQuery(
- queryName,
- getCheckpointLocation(queryName, failIfNotSet = false).getOrElse {
- Utils.createTempDir(namePrefix = "foreach.stream").getCanonicalPath
- },
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
df,
sink,
outputMode,
- trigger)
- }
-
- /**
- * Returns the checkpointLocation for a query. If `failIfNotSet` is `true` but the checkpoint
- * location is not set, [[AnalysisException]] will be thrown. If `failIfNotSet` is `false`, `None`
- * will be returned if the checkpoint location is not set.
- */
- private def getCheckpointLocation(queryName: String, failIfNotSet: Boolean): Option[String] = {
- val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified =>
- new Path(userSpecified).toUri.toString
- }.orElse {
- df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
- new Path(location, queryName).toUri.toString
- }
- }
- if (failIfNotSet && checkpointLocation.isEmpty) {
- throw new AnalysisException("checkpointLocation must be specified either " +
- """through option("checkpointLocation", ...) or """ +
- s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
- }
- checkpointLocation
+ useTempCheckpointLocation = true,
+ trigger = trigger)
}
/**
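A minimal usage sketch of the new memory-sink path above, assuming a SparkSession `spark` and a streaming Dataset `streamingDf` (both names illustrative, not part of the patch):

    // queryName is now mandatory for the memory sink; the checkpoint location is optional
    // because startQuery falls back to a temporary directory (useTempCheckpointLocation = true).
    val query = streamingDf.write
      .format("memory")
      .queryName("events")          // results become readable through the temp view "events"
      .startStream()

    spark.table("events").show()    // inspect what the MemorySink has collected so far

    // Omitting .queryName(...) now fails fast with
    // AnalysisException("queryName must be specified for memory sink"), and an existing
    // "offsets" directory under the checkpoint raises an error because recovery is disabled
    // (recoverFromCheckpointLocation = false).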
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 954fc33ecc..5095fe7fca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.streaming
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer
@@ -44,6 +44,7 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
*/
class StreamExecution(
override val sparkSession: SparkSession,
+ override val id: Long,
override val name: String,
checkpointRoot: String,
private[sql] val logicalPlan: LogicalPlan,
@@ -492,6 +493,7 @@ class StreamExecution(
private def toInfo: ContinuousQueryInfo = {
new ContinuousQueryInfo(
this.name,
+ this.id,
this.sourceStatuses,
this.sinkStatus)
}
@@ -503,7 +505,7 @@ class StreamExecution(
}
private[sql] object StreamExecution {
- private val nextId = new AtomicInteger()
+ private val _nextId = new AtomicLong(0)
- def nextName: String = s"query-${nextId.getAndIncrement}"
+ def nextId: Long = _nextId.getAndIncrement()
}
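The id generator above is a plain process-wide counter; a standalone sketch of the same pattern (the object name is illustrative, not part of this patch):

    import java.util.concurrent.atomic.AtomicLong

    object QueryIds {
      private val _nextId = new AtomicLong(0)
      // Ids are unique per driver JVM and monotonically increasing: 0, 1, 2, ...
      def nextId: Long = _nextId.getAndIncrement()
    }

Because the counter lives in the StreamExecution companion object, ids restart from 0 in a new driver process.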
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
index 3bbb0b8a88..1e0a47deca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
@@ -30,12 +30,21 @@ import org.apache.spark.sql.SparkSession
trait ContinuousQuery {
/**
- * Returns the name of the query.
+ * Returns the name of the query. This name is unique across all active queries. This can be
+ * set in the [[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as
+ * `dataframe.write().queryName("query").startStream()`.
* @since 2.0.0
*/
def name: String
/**
+ * Returns the unique id of this query. This id is automatically generated and is unique across
+ * all queries that have been started in the current process.
+ * @since 2.0.0
+ */
+ def id: Long
+
+ /**
* Returns the [[SparkSession]] associated with `this`.
* @since 2.0.0
*/
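A short sketch of how the two identifiers behave for a running query (assumes a started ContinuousQuery named `query`; illustrative only):

    // name: user-settable through queryName(...), unique only among *active* queries,
    //       and defaults to "query-<id>" when the user does not set one
    // id:   generated Long, unique across every query started in this driver process
    println(s"name=${query.name}, id=${query.id}")

    // Stopping the query frees its name for reuse; a later query started with the same
    // name receives a fresh id (see the test added in ContinuousQuerySuite below).
    query.stop()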
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
index 57b718b08f..19f22704ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
@@ -23,12 +23,15 @@ import org.apache.spark.annotation.Experimental
* :: Experimental ::
* A class used to report information about the progress of a [[ContinuousQuery]].
*
- * @param name The [[ContinuousQuery]] name.
+ * @param name The [[ContinuousQuery]] name. This name is unique across all active queries.
+ * @param id The [[ContinuousQuery]] id. This id is unique across
+ * all queries that have been started in the current process.
* @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
* @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
*/
@Experimental
class ContinuousQueryInfo private[sql](
val name: String,
+ val id: Long,
val sourceStatuses: Seq[SourceStatus],
val sinkStatus: SinkStatus)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
index 1bfdd2da4e..0f4a9c9975 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.streaming
import scala.collection.mutable
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* :: Experimental ::
@@ -39,7 +41,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus)
- private val activeQueries = new mutable.HashMap[String, ContinuousQuery]
+ private val activeQueries = new mutable.HashMap[Long, ContinuousQuery]
private val activeQueriesLock = new Object
private val awaitTerminationLock = new Object
@@ -55,13 +57,12 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
}
/**
- * Returns an active query from this SQLContext or throws exception if bad name
+ * Returns the query if there is an active query with the given id, or null.
*
* @since 2.0.0
*/
- def get(name: String): ContinuousQuery = activeQueriesLock.synchronized {
- activeQueries.getOrElse(name,
- throw new IllegalArgumentException(s"There is no active query with name $name"))
+ def get(id: Long): ContinuousQuery = activeQueriesLock.synchronized {
+ activeQueries.get(id).orNull
}
/**
@@ -168,20 +169,66 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
listenerBus.post(event)
}
- /** Start a query */
+ /**
+ * Start a [[ContinuousQuery]].
+ * @param userSpecifiedName Query name optionally specified by the user.
+ * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
+ * @param df Streaming DataFrame.
+ * @param sink Sink to write the streaming outputs.
+ * @param outputMode Output mode for the sink.
+ * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user
+ * has not specified one. If false, then an error will be thrown.
+ * @param recoverFromCheckpointLocation Whether to recover the query from the checkpoint location.
+ * If false and the checkpoint location exists, then an error
+ * will be thrown.
+ * @param trigger [[Trigger]] for the query.
+ * @param triggerClock [[Clock]] to use for the triggering.
+ */
private[sql] def startQuery(
- name: String,
- checkpointLocation: String,
+ userSpecifiedName: Option[String],
+ userSpecifiedCheckpointLocation: Option[String],
df: DataFrame,
sink: Sink,
outputMode: OutputMode,
+ useTempCheckpointLocation: Boolean = false,
+ recoverFromCheckpointLocation: Boolean = true,
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock()): ContinuousQuery = {
activeQueriesLock.synchronized {
- if (activeQueries.contains(name)) {
+ val id = StreamExecution.nextId
+ val name = userSpecifiedName.getOrElse(s"query-$id")
+ if (activeQueries.values.exists(_.name == name)) {
throw new IllegalArgumentException(
s"Cannot start query with name $name as a query with that name is already active")
}
+ val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
+ new Path(userSpecified).toUri.toString
+ }.orElse {
+ df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+ new Path(location, name).toUri.toString
+ }
+ }.getOrElse {
+ if (useTempCheckpointLocation) {
+ Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+ } else {
+ throw new AnalysisException(
+ "checkpointLocation must be specified either " +
+ """through option("checkpointLocation", ...) or """ +
+ s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
+ }
+ }
+
+ // If offsets have already been created, we are trying to resume a query.
+ if (!recoverFromCheckpointLocation) {
+ val checkpointPath = new Path(checkpointLocation, "offsets")
+ val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
+ if (fs.exists(checkpointPath)) {
+ throw new AnalysisException(
+ s"This query does not support recovering from checkpoint location. " +
+ s"Delete $checkpointPath to start over.")
+ }
+ }
+
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()
@@ -203,6 +250,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
}
val query = new StreamExecution(
sparkSession,
+ id,
name,
checkpointLocation,
logicalPlan,
@@ -211,7 +259,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
triggerClock,
outputMode)
query.start()
- activeQueries.put(name, query)
+ activeQueries.put(id, query)
query
}
}
@@ -219,7 +267,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
/** Notify (by the ContinuousQuery) that the query has been terminated */
private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = {
activeQueriesLock.synchronized {
- activeQueries -= terminatedQuery.name
+ activeQueries -= terminatedQuery.id
}
awaitTerminationLock.synchronized {
if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
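A rough sketch of the manager-level behavior after this change, assuming a SparkSession `spark` and an active query `q` (names illustrative, not part of the patch):

    // get(id) now returns null for unknown or already-terminated ids instead of throwing.
    assert(spark.streams.get(q.id) eq q)
    assert(spark.streams.get(-1L) == null)

    // Checkpoint resolution order inside startQuery:
    //   1. the writer's option("checkpointLocation", ...)
    //   2. SQLConf.CHECKPOINT_LOCATION ("spark.sql.streaming.checkpointLocation") + "/<query name>"
    //   3. a temporary directory, but only when useTempCheckpointLocation = true
    //      (memory and foreach sinks); otherwise an AnalysisException is raised.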
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
index 9b59ab60a6..8e1de09a96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
@@ -50,9 +50,11 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
withListenerAdded(listener) {
testStream(input.toDS)(
StartStream(),
- Assert("Incorrect query status in onQueryStarted") {
+ AssertOnQuery("Incorrect query status in onQueryStarted") { query =>
val status = listener.startStatus
assert(status != null)
+ assert(status.name === query.name)
+ assert(status.id === query.id)
assert(status.sourceStatuses.size === 1)
assert(status.sourceStatuses(0).description.contains("Memory"))
@@ -67,13 +69,15 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
},
AddDataMemory(input, Seq(1, 2, 3)),
CheckAnswer(1, 2, 3),
- Assert("Incorrect query status in onQueryProgress") {
+ AssertOnQuery("Incorrect query status in onQueryProgress") { query =>
eventually(Timeout(streamingTimeout)) {
// There should be only one progress event as the batch has been processed
assert(listener.progressStatuses.size === 1)
val status = listener.progressStatuses.peek()
assert(status != null)
+ assert(status.name === query.name)
+ assert(status.id === query.id)
assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
@@ -82,12 +86,16 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
},
StopStream,
- Assert("Incorrect query status in onQueryTerminated") {
+ AssertOnQuery("Incorrect query status in onQueryTerminated") { query =>
eventually(Timeout(streamingTimeout)) {
val status = listener.terminationStatus
assert(status != null)
+ assert(status.name === query.name)
+ assert(status.id === query.id)
assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
+ assert(listener.terminationStackTrace.isEmpty)
+ assert(listener.terminationException === None)
}
listener.checkAsyncErrors()
}
@@ -161,6 +169,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryStarted serialization") {
val queryStartedInfo = new ContinuousQueryInfo(
"name",
+ 1,
Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo)
@@ -173,6 +182,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryProgress serialization") {
val queryProcessInfo = new ContinuousQueryInfo(
"name",
+ 1,
Seq(
new SourceStatus("source1", Some(LongOffset(0).toString)),
new SourceStatus("source2", Some(LongOffset(1).toString))),
@@ -187,6 +197,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryTerminated serialization") {
val queryTerminatedInfo = new ContinuousQueryInfo(
"name",
+ 1,
Seq(
new SourceStatus("source1", Some(LongOffset(0).toString)),
new SourceStatus("source2", Some(LongOffset(1).toString))),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index c1e4970b3a..f81608bdb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -59,23 +59,15 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
assert(spark.streams.active.toSet === queries.toSet)
val (q1, q2, q3) = (queries(0), queries(1), queries(2))
- assert(spark.streams.get(q1.name).eq(q1))
- assert(spark.streams.get(q2.name).eq(q2))
- assert(spark.streams.get(q3.name).eq(q3))
- intercept[IllegalArgumentException] {
- spark.streams.get("non-existent-name")
- }
-
+ assert(spark.streams.get(q1.id).eq(q1))
+ assert(spark.streams.get(q2.id).eq(q2))
+ assert(spark.streams.get(q3.id).eq(q3))
+ assert(spark.streams.get(-1) === null) // non-existent id
q1.stop()
assert(spark.streams.active.toSet === Set(q2, q3))
- val ex1 = withClue("no error while getting non-active query") {
- intercept[IllegalArgumentException] {
- spark.streams.get(q1.name)
- }
- }
- assert(ex1.getMessage.contains(q1.name), "error does not contain name of query to be fetched")
- assert(spark.streams.get(q2.name).eq(q2))
+ assert(spark.streams.get(q1.id) === null)
+ assert(spark.streams.get(q2.id).eq(q2))
m2.addData(0) // q2 should terminate with error
@@ -83,12 +75,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
require(!q2.isActive)
require(q2.exception.isDefined)
}
- withClue("no error while getting non-active query") {
- intercept[IllegalArgumentException] {
- spark.streams.get(q2.name).eq(q2)
- }
- }
-
+ assert(spark.streams.get(q2.id) === null)
assert(spark.streams.active.toSet === Set(q3))
}
}
@@ -227,7 +214,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
- /** Run a body of code by defining a query each on multiple datasets */
+ /** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] => Unit): Unit = {
failAfter(streamingTimeout) {
val queries = withClue("Error starting queries") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 55424058f5..43a88576cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -17,15 +17,56 @@
package org.apache.spark.sql.streaming
+import org.scalatest.BeforeAndAfter
+
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution}
+import org.apache.spark.util.Utils
-class ContinuousQuerySuite extends StreamTest {
+class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
import AwaitTerminationTester._
import testImplicits._
+ after {
+ sqlContext.streams.active.foreach(_.stop())
+ }
+
+ test("names unique across active queries, ids unique across all started queries") {
+ val inputData = MemoryStream[Int]
+ val mapped = inputData.toDS().map { 6 / _}
+
+ def startQuery(queryName: String): ContinuousQuery = {
+ val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
+ val writer = mapped.write
+ writer
+ .queryName(queryName)
+ .format("memory")
+ .option("checkpointLocation", metadataRoot)
+ .startStream()
+ }
+
+ val q1 = startQuery("q1")
+ assert(q1.name === "q1")
+
+ // Verify that another query with same name cannot be started
+ val e1 = intercept[IllegalArgumentException] {
+ startQuery("q1")
+ }
+ Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) }
+
+ // Verify q1 was unaffected by the above exception and stop it
+ assert(q1.isActive)
+ q1.stop()
+
+ // Verify another query can be started with name q1, but will have different id
+ val q2 = startQuery("q1")
+ assert(q2.name === "q1")
+ assert(q2.id !== q1.id)
+ q2.stop()
+ }
+
testQuietly("lifecycle states and awaitTermination") {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map { 6 / _}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7f1e5fe613..cbfa6ff07d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -188,8 +188,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
new AssertOnQuery(condition, message)
}
- def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = {
- new AssertOnQuery(condition, message)
+ def apply(message: String)(condition: StreamExecution => Unit): AssertOnQuery = {
+ new AssertOnQuery(s => { condition(s); true }, message)
}
}
@@ -305,13 +305,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
spark
.streams
.startQuery(
- StreamExecution.nextName,
- metadataRoot,
+ None,
+ Some(metadataRoot),
stream,
sink,
outputMode,
- trigger,
- triggerClock)
+ trigger = trigger,
+ triggerClock = triggerClock)
.asInstanceOf[StreamExecution]
currentStream.microBatchThread.setUncaughtExceptionHandler(
new UncaughtExceptionHandler {