author    Shixiong Zhu <shixiong@databricks.com>    2016-12-12 22:31:22 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-12-12 22:31:22 -0800
commit    417e45c58484a6b984ad2ce9ba8f47aa0a9983fd (patch)
tree      b28f5af089b06dcaabec8ab32444172fc5860686 /sql/core
parent    bc59951babbe4d7d5265a5dbccd50ea84ad74592 (diff)
[SPARK-18796][SS] StreamingQueryManager should not block when starting a query
## What changes were proposed in this pull request?

Major change in this PR:
- Add `pendingQueryNames` and `pendingQueryIds` to track queries that are going to start but are not yet in `activeQueries`, so that we don't need to hold a lock while starting a query.

Minor changes:
- Fix a potential NPE when the user sets `checkpointLocation` using SQLConf but doesn't specify a query name.
- Add missing docs in `StreamingQueryListener`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16220 from zsxwing/SPARK-18796.
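At its core, the concurrency change below boils down to: do the duplicate-name/duplicate-id bookkeeping while holding the lock, but call `start()` — which synchronously invokes user listener code — outside of it, and roll the registration back if the start fails. A minimal standalone sketch of that pattern, with hypothetical `Query` and `Registry` types standing in for `StreamExecution` and the `StreamingQueryManager` internals:

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical stand-ins for StreamExecution / StreamingQueryManager internals.
trait Query { def id: UUID; def start(): Unit }

class Registry {
  private val lock = new Object
  private val active = new mutable.HashMap[UUID, Query]

  def startQuery(query: Query): Query = {
    // Reject duplicates and register atomically, under the lock.
    lock.synchronized {
      if (active.contains(query.id)) {
        throw new IllegalStateException(s"Query ${query.id} is already active")
      }
      active.put(query.id, query)
    }
    try {
      // start() may run arbitrary user listener code: never call it under the lock.
      query.start()
    } catch {
      case e: Throwable =>
        lock.synchronized { active -= query.id } // roll back the registration
        throw e
    }
    query
  }
}
```

The try/catch rollback is the same shape as what the patch does with `activeQueries` in `StreamingQueryManager.startQuery` below.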
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala148
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala56
4 files changed, 158 insertions, 58 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 48eee42a29..9fe6819837 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -223,7 +223,8 @@ class StreamExecution(
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}
- postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception.
+ // `postEvent` does not throw non-fatal exceptions.
+ postEvent(new QueryStartedEvent(id, runId, name))
// Unblock starting thread
startLatch.countDown()
@@ -286,7 +287,7 @@ class StreamExecution(
e,
committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
- logError(s"Query $name terminated with error", e)
+ logError(s"Query $prettyIdString terminated with error", e)
updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow fatal errors so that users can handle them with a `Thread.UncaughtExceptionHandler`
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 6fc859d88d..817733286b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -83,6 +83,9 @@ object StreamingQueryListener {
/**
* :: Experimental ::
* Event representing the start of a query
+ * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name User-specified name of the query, null if not specified.
* @since 2.1.0
*/
@Experimental
@@ -94,6 +97,7 @@ object StreamingQueryListener {
/**
* :: Experimental ::
* Event representing any progress updates in a query.
+ * @param progress The query progress updates.
* @since 2.1.0
*/
@Experimental
@@ -103,7 +107,8 @@ object StreamingQueryListener {
* :: Experimental ::
* Event representing the termination of a query.
*
- * @param id The query id.
+ * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param exception The exception message of the query if the query was terminated
* with an exception. Otherwise, it will be `None`.
* @since 2.1.0
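The `id`/`runId` distinction these new docs spell out is easiest to see from a listener. A minimal sketch against the public 2.1 listener API (the class name and printed format are illustrative only):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// `id` is stable across restarts (it lives in the checkpoint);
// `runId` is regenerated on every start or restart.
class RestartAwareListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"started: id=${event.id} runId=${event.runId} name=${event.name}")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress.numInputRows} input rows")

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"terminated: id=${event.id} runId=${event.runId} error=${event.exception}")
}

// Registered on a SparkSession via: spark.streams.addListener(new RestartAwareListener)
```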
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 52d079192d..6ebd70685e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.streaming
import java.util.UUID
-import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
@@ -44,10 +44,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
private[sql] val stateStoreCoordinator =
StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
+
+ @GuardedBy("activeQueriesLock")
private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
private val activeQueriesLock = new Object
private val awaitTerminationLock = new Object
+ @GuardedBy("awaitTerminationLock")
private var lastTerminatedQuery: StreamingQuery = null
/**
@@ -181,8 +184,65 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
listenerBus.post(event)
}
+ private def createQuery(
+ userSpecifiedName: Option[String],
+ userSpecifiedCheckpointLocation: Option[String],
+ df: DataFrame,
+ sink: Sink,
+ outputMode: OutputMode,
+ useTempCheckpointLocation: Boolean,
+ recoverFromCheckpointLocation: Boolean,
+ trigger: Trigger,
+ triggerClock: Clock): StreamExecution = {
+ val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
+ new Path(userSpecified).toUri.toString
+ }.orElse {
+ df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
+ new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
+ }
+ }.getOrElse {
+ if (useTempCheckpointLocation) {
+ Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+ } else {
+ throw new AnalysisException(
+ "checkpointLocation must be specified either " +
+ """through option("checkpointLocation", ...) or """ +
+ s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
+ }
+ }
+
+ // If offsets have already been created, we are trying to resume a query.
+ if (!recoverFromCheckpointLocation) {
+ val checkpointPath = new Path(checkpointLocation, "offsets")
+ val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
+ if (fs.exists(checkpointPath)) {
+ throw new AnalysisException(
+ s"This query does not support recovering from checkpoint location. " +
+ s"Delete $checkpointPath to start over.")
+ }
+ }
+
+ val analyzedPlan = df.queryExecution.analyzed
+ df.queryExecution.assertAnalyzed()
+
+ if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
+ UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+ }
+
+ new StreamExecution(
+ sparkSession,
+ userSpecifiedName.orNull,
+ checkpointLocation,
+ analyzedPlan,
+ sink,
+ trigger,
+ triggerClock,
+ outputMode)
+ }
+
/**
* Start a [[StreamingQuery]].
+ *
* @param userSpecifiedName Query name optionally specified by the user.
* @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
* @param df Streaming DataFrame.
@@ -206,72 +266,50 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
recoverFromCheckpointLocation: Boolean = true,
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock()): StreamingQuery = {
- activeQueriesLock.synchronized {
- val name = userSpecifiedName match {
- case Some(n) =>
- if (activeQueries.values.exists(_.name == userSpecifiedName.get)) {
- throw new IllegalArgumentException(
- s"Cannot start query with name $n as a query with that name is already active")
- }
- n
- case None => null
- }
- val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
- new Path(userSpecified).toUri.toString
- }.orElse {
- df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
- new Path(location, name).toUri.toString
- }
- }.getOrElse {
- if (useTempCheckpointLocation) {
- Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
- } else {
- throw new AnalysisException(
- "checkpointLocation must be specified either " +
- """through option("checkpointLocation", ...) or """ +
- s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
- }
- }
+ val query = createQuery(
+ userSpecifiedName,
+ userSpecifiedCheckpointLocation,
+ df,
+ sink,
+ outputMode,
+ useTempCheckpointLocation,
+ recoverFromCheckpointLocation,
+ trigger,
+ triggerClock)
- // If offsets have already been created, we trying to resume a query.
- if (!recoverFromCheckpointLocation) {
- val checkpointPath = new Path(checkpointLocation, "offsets")
- val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
- if (fs.exists(checkpointPath)) {
- throw new AnalysisException(
- s"This query does not support recovering from checkpoint location. " +
- s"Delete $checkpointPath to start over.")
+ activeQueriesLock.synchronized {
+ // Make sure no other query with the same name is active
+ userSpecifiedName.foreach { name =>
+ if (activeQueries.values.exists(_.name == name)) {
+ throw new IllegalArgumentException(
+ s"Cannot start query with name $name as a query with that name is already active")
}
}
- val analyzedPlan = df.queryExecution.analyzed
- df.queryExecution.assertAnalyzed()
-
- if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
- UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
- }
-
- val query = new StreamExecution(
- sparkSession,
- name,
- checkpointLocation,
- analyzedPlan,
- sink,
- trigger,
- triggerClock,
- outputMode)
-
+ // Make sure no other query with the same id is active
if (activeQueries.values.exists(_.id == query.id)) {
throw new IllegalStateException(
s"Cannot start query with id ${query.id} as another query with same id is " +
- s"already active. Perhaps you are attempting to restart a query from checkpoint" +
+ s"already active. Perhaps you are attempting to restart a query from checkpoint " +
s"that is already active.")
}
- query.start()
activeQueries.put(query.id, query)
- query
}
+ try {
+ // Starting a query calls `StreamingQueryListener.onQueryStarted` synchronously. Since the
+ // listener is provided by the user and can run arbitrary code, we must not hold any lock
+ // here. Otherwise it's easy to cause a deadlock, or to block for too long if the user code
+ // takes a long time to finish.
+ query.start()
+ } catch {
+ case e: Throwable =>
+ activeQueriesLock.synchronized {
+ activeQueries -= query.id
+ }
+ throw e
+ }
+ query
}
/** Notify (by the StreamingQuery) that the query has been terminated */
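From the user's side, the resolution order `createQuery` implements is: the writer's `checkpointLocation` option wins; otherwise the SQLConf default is combined with the query name (or a random UUID, the NPE fix); a temp dir is used only when `useTempCheckpointLocation` is set. A small usage sketch, assuming a `SparkSession` named `spark` and a streaming DataFrame `df` are in scope, with made-up paths:

```scala
// Session-wide default: queries without an explicit location get a
// subdirectory of this path, named after the query (or a random UUID).
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")

// The per-query option always takes precedence over the session default.
val query = df.writeStream
  .format("console")
  .queryName("orders")
  .option("checkpointLocation", "/tmp/orders-ckpt")
  .start()
```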
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 0eb95a0243..f4a62903eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest}
import org.apache.spark.sql.types._
@@ -575,4 +576,59 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
sq.stop()
}
}
+
+ test("user specified checkpointLocation precedes SQLConf") {
+ import testImplicits._
+ withTempDir { checkpointPath =>
+ withTempPath { userCheckpointPath =>
+ assert(!userCheckpointPath.exists(), s"$userCheckpointPath should not exist")
+ withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+ val queryName = "test_query"
+ val ds = MemoryStream[Int].toDS
+ ds.writeStream
+ .format("memory")
+ .queryName(queryName)
+ .option("checkpointLocation", userCheckpointPath.getAbsolutePath)
+ .start()
+ .stop()
+ assert(checkpointPath.listFiles().isEmpty,
+ "SQLConf path is used even if user specified checkpointLoc: " +
+ s"${checkpointPath.listFiles()} is not empty")
+ assert(userCheckpointPath.exists(),
+ s"The user specified checkpointLoc (userCheckpointPath) is not created")
+ }
+ }
+ }
+ }
+
+ test("use SQLConf checkpoint dir when checkpointLocation is not specified") {
+ import testImplicits._
+ withTempDir { checkpointPath =>
+ withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+ val queryName = "test_query"
+ val ds = MemoryStream[Int].toDS
+ ds.writeStream.format("memory").queryName(queryName).start().stop()
+ // Should use query name to create a folder in `checkpointPath`
+ val queryCheckpointDir = new File(checkpointPath, queryName)
+ assert(queryCheckpointDir.exists(), s"$queryCheckpointDir doesn't exist")
+ assert(
+ checkpointPath.listFiles().size === 1,
+ s"${checkpointPath.listFiles().toList} has 0 or more than 1 files ")
+ }
+ }
+ }
+
+ test("use SQLConf checkpoint dir when checkpointLocation is not specified without query name") {
+ import testImplicits._
+ withTempDir { checkpointPath =>
+ withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) {
+ val ds = MemoryStream[Int].toDS
+ ds.writeStream.format("console").start().stop()
+ // Should create a random folder in `checkpointPath`
+ assert(
+ checkpointPath.listFiles().size === 1,
+ s"${checkpointPath.listFiles().toList} has 0 or more than 1 files ")
+ }
+ }
+ }
}
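The precedence these three tests pin down, including the NPE fix for a missing query name, reduces to a small resolution function. A standalone sketch with hypothetical parameter names (the real `createQuery` builds Hadoop `Path`s and delegates temp-dir creation to `Utils.createTempDir`):

```scala
import java.util.UUID

// Mirrors createQuery's resolution order, with plain strings instead of Hadoop Paths.
def resolveCheckpointLocation(
    userSpecified: Option[String],
    confLocation: Option[String],
    queryName: Option[String],
    useTempLocation: Boolean,
    mkTempDir: () => String): String = {
  userSpecified
    .orElse(confLocation.map { base =>
      // The NPE fix: fall back to a random UUID rather than using a null name.
      s"$base/${queryName.getOrElse(UUID.randomUUID().toString)}"
    })
    .getOrElse {
      if (useTempLocation) mkTempDir()
      else throw new IllegalArgumentException("checkpointLocation must be specified")
    }
}
```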