author     Tathagata Das <tathagata.das1565@gmail.com>  2016-06-13 13:44:46 -0700
committer  Shixiong Zhu <shixiong@databricks.com>       2016-06-13 13:44:46 -0700
commit     c654ae2140bc184adb407fd02072b653c5359ee5
tree       9494bc77a4d77c5d2e8388503aebc2477a8d0185
parent     5ad4e32d46599ae1b8626f08aa97345d078c28d7
[SPARK-15889][SQL][STREAMING] Add a unique id to ContinuousQuery
## What changes were proposed in this pull request?

ContinuousQueries have names that are unique across all active queries. However, when queries are rapidly restarted with the same name, it causes race conditions with the listener: a listener event from a stopped query can arrive after the query has been restarted, leading to complexities in monitoring infrastructure. This patch adds a unique id to every query, generated automatically and unique across all queries started in the current process. Along with this change, I have also consolidated all the messy code paths that start queries with different sinks.

## How was this patch tested?

Added new unit tests and updated existing ones.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13613 from tdas/SPARK-15889.
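For context, a minimal sketch of the user-facing behaviour this patch introduces, assuming a SparkSession `spark` and a streaming Dataset `streamingDf` (hypothetical names): the `id` is generated automatically, and lookup in `ContinuousQueryManager` now keys on it and returns null for unknown ids.

```scala
// Usage sketch only (not part of the patch); `spark` and `streamingDf` are assumed to exist.
val query = streamingDf.write
  .format("memory")
  .queryName("counts")   // name must be unique among *active* queries
  .startStream()         // memory sink: a temporary checkpoint location is created

// The id is unique across all queries started in this process, even when a
// stopped query is restarted under the same name.
val id: Long = query.id

// Lookup is now by id and returns null rather than throwing for unknown ids.
assert(spark.streams.get(id) eq query)
assert(spark.streams.get(-1L) == null)

query.stop()
```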
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                        | 69
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala    |  8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala              | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala          |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala       | 74
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala  | 29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala         | 43
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                   | 12
9 files changed, 167 insertions(+), 101 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index afae0786b7..171b1378e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -336,34 +336,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertStreaming("startStream() can only be called on continuous queries")
if (source == "memory") {
- val queryName =
- extraOptions.getOrElse(
- "queryName", throw new AnalysisException("queryName must be specified for memory sink"))
- val checkpointLocation = getCheckpointLocation(queryName, failIfNotSet = false).getOrElse {
- Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath
- }
-
- // If offsets have already been created, we trying to resume a query.
- val checkpointPath = new Path(checkpointLocation, "offsets")
- val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
- if (fs.exists(checkpointPath)) {
- throw new AnalysisException(
- s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")
- } else {
- checkpointPath.toUri.toString
+ if (extraOptions.get("queryName").isEmpty) {
+ throw new AnalysisException("queryName must be specified for memory sink")
}
val sink = new MemorySink(df.schema, outputMode)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
- resultDf.createOrReplaceTempView(queryName)
- val continuousQuery = df.sparkSession.sessionState.continuousQueryManager.startQuery(
- queryName,
- checkpointLocation,
+ val query = df.sparkSession.sessionState.continuousQueryManager.startQuery(
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
df,
sink,
outputMode,
- trigger)
- continuousQuery
+ useTempCheckpointLocation = true,
+ recoverFromCheckpointLocation = false,
+ trigger = trigger)
+ resultDf.createOrReplaceTempView(query.name)
+ query
} else {
val dataSource =
DataSource(
@@ -371,14 +360,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
className = source,
options = extraOptions.toMap,
partitionColumns = normalizedParCols.getOrElse(Nil))
- val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
df.sparkSession.sessionState.continuousQueryManager.startQuery(
- queryName,
- getCheckpointLocation(queryName, failIfNotSet = true).get,
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
df,
dataSource.createSink(outputMode),
outputMode,
- trigger)
+ trigger = trigger)
}
}
@@ -437,38 +425,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
assertStreaming(
"foreach() can only be called on streaming Datasets/DataFrames.")
- val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
df.sparkSession.sessionState.continuousQueryManager.startQuery(
- queryName,
- getCheckpointLocation(queryName, failIfNotSet = false).getOrElse {
- Utils.createTempDir(namePrefix = "foreach.stream").getCanonicalPath
- },
+ extraOptions.get("queryName"),
+ extraOptions.get("checkpointLocation"),
df,
sink,
outputMode,
- trigger)
- }
-
- /**
- * Returns the checkpointLocation for a query. If `failIfNotSet` is `true` but the checkpoint
- * location is not set, [[AnalysisException]] will be thrown. If `failIfNotSet` is `false`, `None`
- * will be returned if the checkpoint location is not set.
- */
- private def getCheckpointLocation(queryName: String, failIfNotSet: Boolean): Option[String] = {
- val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified =>
- new Path(userSpecified).toUri.toString
- }.orElse {
- df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
- new Path(location, queryName).toUri.toString
- }
- }
- if (failIfNotSet && checkpointLocation.isEmpty) {
- throw new AnalysisException("checkpointLocation must be specified either " +
- """through option("checkpointLocation", ...) or """ +
- s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
- }
- checkpointLocation
+ useTempCheckpointLocation = true,
+ trigger = trigger)
}
/**
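To summarise the new memory-sink path above: `queryName` is now mandatory for the memory sink, the in-memory result is exposed as a temp view named after the query, and resuming from an existing checkpoint is explicitly disallowed (`recoverFromCheckpointLocation = false`). A short sketch of the resulting behaviour, again assuming the hypothetical names `spark` and `streamingDf`:

```scala
// Sketch only; `spark` and `streamingDf` are assumed to exist.
val q = streamingDf.write
  .format("memory")
  .queryName("debug_table")   // AnalysisException is thrown if queryName is omitted
  .startStream()              // a temporary checkpoint directory is created automatically

// The sink's contents are queryable through a temp view named after the query.
spark.sql("SELECT * FROM debug_table").show()

q.stop()
```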
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 954fc33ecc..5095fe7fca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.streaming
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer
@@ -44,6 +44,7 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
*/
class StreamExecution(
override val sparkSession: SparkSession,
+ override val id: Long,
override val name: String,
checkpointRoot: String,
private[sql] val logicalPlan: LogicalPlan,
@@ -492,6 +493,7 @@ class StreamExecution(
private def toInfo: ContinuousQueryInfo = {
new ContinuousQueryInfo(
this.name,
+ this.id,
this.sourceStatuses,
this.sinkStatus)
}
@@ -503,7 +505,7 @@ class StreamExecution(
}
private[sql] object StreamExecution {
- private val nextId = new AtomicInteger()
+ private val _nextId = new AtomicLong(0)
- def nextName: String = s"query-${nextId.getAndIncrement}"
+ def nextId: Long = _nextId.getAndIncrement()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
index 3bbb0b8a88..1e0a47deca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
@@ -30,12 +30,21 @@ import org.apache.spark.sql.SparkSession
trait ContinuousQuery {
/**
- * Returns the name of the query.
+ * Returns the name of the query. This name is unique across all active queries. This can be
+ * set in the [[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as
+ * `dataframe.write().queryName("query").startStream()`.
* @since 2.0.0
*/
def name: String
/**
+ * Returns the unique id of this query. This id is automatically generated and is unique across
+ * all queries that have been started in the current process.
+ * @since 2.0.0
+ */
+ def id: Long
+
+ /**
* Returns the [[SparkSession]] associated with `this`.
* @since 2.0.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
index 57b718b08f..19f22704ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
@@ -23,12 +23,15 @@ import org.apache.spark.annotation.Experimental
* :: Experimental ::
* A class used to report information about the progress of a [[ContinuousQuery]].
*
- * @param name The [[ContinuousQuery]] name.
+ * @param name The [[ContinuousQuery]] name. This name is unique across all active queries.
+ * @param id The [[ContinuousQuery]] id. This id is unique across
+ * all queries that have been started in the current process.
* @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
* @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
*/
@Experimental
class ContinuousQueryInfo private[sql](
val name: String,
+ val id: Long,
val sourceStatuses: Seq[SourceStatus],
val sinkStatus: SinkStatus)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
index 1bfdd2da4e..0f4a9c9975 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql.streaming
import scala.collection.mutable
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* :: Experimental ::
@@ -39,7 +41,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus)
- private val activeQueries = new mutable.HashMap[String, ContinuousQuery]
+ private val activeQueries = new mutable.HashMap[Long, ContinuousQuery]
private val activeQueriesLock = new Object
private val awaitTerminationLock = new Object
@@ -55,13 +57,12 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
}
/**
- * Returns an active query from this SQLContext or throws exception if bad name
+ * Returns the query if there is an active query with the given id, or null.
*
* @since 2.0.0
*/
- def get(name: String): ContinuousQuery = activeQueriesLock.synchronized {
- activeQueries.getOrElse(name,
- throw new IllegalArgumentException(s"There is no active query with name $name"))
+ def get(id: Long): ContinuousQuery = activeQueriesLock.synchronized {
+ activeQueries.get(id).orNull
}
/**
@@ -168,20 +169,66 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
listenerBus.post(event)
}
- /** Start a query */
+ /**
+ * Start a [[ContinuousQuery]].
+ * @param userSpecifiedName Query name optionally specified by the user.
+ * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
+ * @param df Streaming DataFrame.
+ * @param sink Sink to write the streaming outputs.
+ * @param outputMode Output mode for the sink.
+ * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user
+ * has not specified one. If false, then an error will be thrown.
+ * @param recoverFromCheckpointLocation Whether to recover the query from the checkpoint location.
+ * If false and the checkpoint location exists, then an error
+ * will be thrown.
+ * @param trigger [[Trigger]] for the query.
+ * @param triggerClock [[Clock]] to use for the triggering.
+ */
private[sql] def startQuery(
- name: String,
- checkpointLocation: String,
+ userSpecifiedName: Option[String],
+ userSpecifiedCheckpointLocation: Option[String],
df: DataFrame,
sink: Sink,
outputMode: OutputMode,
+ useTempCheckpointLocation: Boolean = false,
+ recoverFromCheckpointLocation: Boolean = true,
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock()): ContinuousQuery = {
activeQueriesLock.synchronized {
- if (activeQueries.contains(name)) {
+ val id = StreamExecution.nextId
+ val name = userSpecifiedName.getOrElse(s"query-$id")
+ if (activeQueries.values.exists(_.name == name)) {
throw new IllegalArgumentException(
s"Cannot start query with name $name as a query with that name is already active")
}
+ val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
+ new Path(userSpecified).toUri.toString
+ }.orElse {
+ df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+ new Path(location, name).toUri.toString
+ }
+ }.getOrElse {
+ if (useTempCheckpointLocation) {
+ Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+ } else {
+ throw new AnalysisException(
+ "checkpointLocation must be specified either " +
+ """through option("checkpointLocation", ...) or """ +
+ s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
+ }
+ }
+
+ // If offsets have already been created, we are trying to resume a query.
+ if (!recoverFromCheckpointLocation) {
+ val checkpointPath = new Path(checkpointLocation, "offsets")
+ val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
+ if (fs.exists(checkpointPath)) {
+ throw new AnalysisException(
+ s"This query does not support recovering from checkpoint location. " +
+ s"Delete $checkpointPath to start over.")
+ }
+ }
+
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()
@@ -203,6 +250,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
}
val query = new StreamExecution(
sparkSession,
+ id,
name,
checkpointLocation,
logicalPlan,
@@ -211,7 +259,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
triggerClock,
outputMode)
query.start()
- activeQueries.put(name, query)
+ activeQueries.put(id, query)
query
}
}
@@ -219,7 +267,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
/** Notify (by the ContinuousQuery) that the query has been terminated */
private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = {
activeQueriesLock.synchronized {
- activeQueries -= terminatedQuery.name
+ activeQueries -= terminatedQuery.id
}
awaitTerminationLock.synchronized {
if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
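The checkpoint-location resolution that previously lived in `DataFrameWriter.getCheckpointLocation` is now centralised in `startQuery` above: an explicit `checkpointLocation` option wins, otherwise `SQLConf.CHECKPOINT_LOCATION` plus the query name is used, otherwise a temporary directory (only when `useTempCheckpointLocation` is set) or an `AnalysisException`. A hedged sketch of the two user-facing ways to set it, assuming `spark`, `streamingDf`, and that the key behind `SQLConf.CHECKPOINT_LOCATION` is `spark.sql.streaming.checkpointLocation`:

```scala
// Option 1: session-wide root; each query checkpoints under <root>/<query name>.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")

// Option 2: an explicit per-query option always takes precedence over the session default.
val q = streamingDf.write
  .format("parquet")
  .option("path", "/tmp/out")                               // hypothetical output path
  .option("checkpointLocation", "/tmp/explicit-checkpoint") // hypothetical checkpoint path
  .startStream()

q.stop()
```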
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
index 9b59ab60a6..8e1de09a96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
@@ -50,9 +50,11 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
withListenerAdded(listener) {
testStream(input.toDS)(
StartStream(),
- Assert("Incorrect query status in onQueryStarted") {
+ AssertOnQuery("Incorrect query status in onQueryStarted") { query =>
val status = listener.startStatus
assert(status != null)
+ assert(status.name === query.name)
+ assert(status.id === query.id)
assert(status.sourceStatuses.size === 1)
assert(status.sourceStatuses(0).description.contains("Memory"))
@@ -67,13 +69,15 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
},
AddDataMemory(input, Seq(1, 2, 3)),
CheckAnswer(1, 2, 3),
- Assert("Incorrect query status in onQueryProgress") {
+ AssertOnQuery("Incorrect query status in onQueryProgress") { query =>
eventually(Timeout(streamingTimeout)) {
// There should be only one progress event, as the batch has been processed
assert(listener.progressStatuses.size === 1)
val status = listener.progressStatuses.peek()
assert(status != null)
+ assert(status.name === query.name)
+ assert(status.id === query.id)
assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
@@ -82,12 +86,16 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
},
StopStream,
- Assert("Incorrect query status in onQueryTerminated") {
+ AssertOnQuery("Incorrect query status in onQueryTerminated") { query =>
eventually(Timeout(streamingTimeout)) {
val status = listener.terminationStatus
assert(status != null)
+ assert(status.name === query.name)
+ assert(status.id === query.id)
assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
+ assert(listener.terminationStackTrace.isEmpty)
+ assert(listener.terminationException === None)
}
listener.checkAsyncErrors()
}
@@ -161,6 +169,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryStarted serialization") {
val queryStartedInfo = new ContinuousQueryInfo(
"name",
+ 1,
Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo)
@@ -173,6 +182,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryProgress serialization") {
val queryProcessInfo = new ContinuousQueryInfo(
"name",
+ 1,
Seq(
new SourceStatus("source1", Some(LongOffset(0).toString)),
new SourceStatus("source2", Some(LongOffset(1).toString))),
@@ -187,6 +197,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryTerminated serialization") {
val queryTerminatedInfo = new ContinuousQueryInfo(
"name",
+ 1,
Seq(
new SourceStatus("source1", Some(LongOffset(0).toString)),
new SourceStatus("source2", Some(LongOffset(1).toString))),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index c1e4970b3a..f81608bdb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -59,23 +59,15 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
assert(spark.streams.active.toSet === queries.toSet)
val (q1, q2, q3) = (queries(0), queries(1), queries(2))
- assert(spark.streams.get(q1.name).eq(q1))
- assert(spark.streams.get(q2.name).eq(q2))
- assert(spark.streams.get(q3.name).eq(q3))
- intercept[IllegalArgumentException] {
- spark.streams.get("non-existent-name")
- }
-
+ assert(spark.streams.get(q1.id).eq(q1))
+ assert(spark.streams.get(q2.id).eq(q2))
+ assert(spark.streams.get(q3.id).eq(q3))
+ assert(spark.streams.get(-1) === null) // non-existent id
q1.stop()
assert(spark.streams.active.toSet === Set(q2, q3))
- val ex1 = withClue("no error while getting non-active query") {
- intercept[IllegalArgumentException] {
- spark.streams.get(q1.name)
- }
- }
- assert(ex1.getMessage.contains(q1.name), "error does not contain name of query to be fetched")
- assert(spark.streams.get(q2.name).eq(q2))
+ assert(spark.streams.get(q1.id) === null)
+ assert(spark.streams.get(q2.id).eq(q2))
m2.addData(0) // q2 should terminate with error
@@ -83,12 +75,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
require(!q2.isActive)
require(q2.exception.isDefined)
}
- withClue("no error while getting non-active query") {
- intercept[IllegalArgumentException] {
- spark.streams.get(q2.name).eq(q2)
- }
- }
-
+ assert(spark.streams.get(q2.id) === null)
assert(spark.streams.active.toSet === Set(q3))
}
}
@@ -227,7 +214,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
- /** Run a body of code by defining a query each on multiple datasets */
+ /** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] => Unit): Unit = {
failAfter(streamingTimeout) {
val queries = withClue("Error starting queries") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 55424058f5..43a88576cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -17,15 +17,56 @@
package org.apache.spark.sql.streaming
+import org.scalatest.BeforeAndAfter
+
import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution}
+import org.apache.spark.util.Utils
-class ContinuousQuerySuite extends StreamTest {
+class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
import AwaitTerminationTester._
import testImplicits._
+ after {
+ sqlContext.streams.active.foreach(_.stop())
+ }
+
+ test("names unique across active queries, ids unique across all started queries") {
+ val inputData = MemoryStream[Int]
+ val mapped = inputData.toDS().map { 6 / _}
+
+ def startQuery(queryName: String): ContinuousQuery = {
+ val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
+ val writer = mapped.write
+ writer
+ .queryName(queryName)
+ .format("memory")
+ .option("checkpointLocation", metadataRoot)
+ .startStream()
+ }
+
+ val q1 = startQuery("q1")
+ assert(q1.name === "q1")
+
+ // Verify that another query with same name cannot be started
+ val e1 = intercept[IllegalArgumentException] {
+ startQuery("q1")
+ }
+ Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) }
+
+ // Verify q1 was unaffected by the above exception and stop it
+ assert(q1.isActive)
+ q1.stop()
+
+ // Verify another query can be started with name q1, but will have different id
+ val q2 = startQuery("q1")
+ assert(q2.name === "q1")
+ assert(q2.id !== q1.id)
+ q2.stop()
+ }
+
testQuietly("lifecycle states and awaitTermination") {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map { 6 / _}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 7f1e5fe613..cbfa6ff07d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -188,8 +188,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
new AssertOnQuery(condition, message)
}
- def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = {
- new AssertOnQuery(condition, message)
+ def apply(message: String)(condition: StreamExecution => Unit): AssertOnQuery = {
+ new AssertOnQuery(s => { condition; true }, message)
}
}
@@ -305,13 +305,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
spark
.streams
.startQuery(
- StreamExecution.nextName,
- metadataRoot,
+ None,
+ Some(metadataRoot),
stream,
sink,
outputMode,
- trigger,
- triggerClock)
+ trigger = trigger,
+ triggerClock = triggerClock)
.asInstanceOf[StreamExecution]
currentStream.microBatchThread.setUncaughtExceptionHandler(
new UncaughtExceptionHandler {