diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-12 22:31:22 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-12 22:31:22 -0800 |
commit | 417e45c58484a6b984ad2ce9ba8f47aa0a9983fd (patch) | |
tree | b28f5af089b06dcaabec8ab32444172fc5860686 /sql/core | |
parent | bc59951babbe4d7d5265a5dbccd50ea84ad74592 (diff) | |
download | spark-417e45c58484a6b984ad2ce9ba8f47aa0a9983fd.tar.gz spark-417e45c58484a6b984ad2ce9ba8f47aa0a9983fd.tar.bz2 spark-417e45c58484a6b984ad2ce9ba8f47aa0a9983fd.zip |
[SPARK-18796][SS] StreamingQueryManager should not block when starting a query
## What changes were proposed in this pull request?
Major change in this PR:
- Add `pendingQueryNames` and `pendingQueryIds` to track that are going to start but not yet put into `activeQueries` so that we don't need to hold a lock when starting a query.
Minor changes:
- Fix a potential NPE when the user sets `checkpointLocation` using SQLConf but doesn't specify a query name.
- Add missing docs in `StreamingQueryListener`
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16220 from zsxwing/SPARK-18796.
Diffstat (limited to 'sql/core')
4 files changed, 158 insertions, 58 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 48eee42a29..9fe6819837 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -223,7 +223,8 @@ class StreamExecution( sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics) } - postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception. + // `postEvent` does not throw non fatal exception. + postEvent(new QueryStartedEvent(id, runId, name)) // Unblock starting thread startLatch.countDown() @@ -286,7 +287,7 @@ class StreamExecution( e, committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString) - logError(s"Query $name terminated with error", e) + logError(s"Query $prettyIdString terminated with error", e) updateStatusMessage(s"Terminated with exception: ${e.getMessage}") // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to // handle them diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 6fc859d88d..817733286b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -83,6 +83,9 @@ object StreamingQueryListener { /** * :: Experimental :: * Event representing the start of a query + * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. + * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. + * @param name User-specified name of the query, null if not specified. * @since 2.1.0 */ @Experimental @@ -94,6 +97,7 @@ object StreamingQueryListener { /** * :: Experimental :: * Event representing any progress updates in a query. + * @param progress The query progress updates. * @since 2.1.0 */ @Experimental @@ -103,7 +107,8 @@ object StreamingQueryListener { * :: Experimental :: * Event representing that termination of a query. * - * @param id The query id. + * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. + * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param exception The exception message of the query if the query was terminated * with an exception. Otherwise, it will be `None`. * @since 2.1.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 52d079192d..6ebd70685e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.streaming import java.util.UUID -import java.util.concurrent.atomic.AtomicLong +import javax.annotation.concurrent.GuardedBy import scala.collection.mutable @@ -44,10 +44,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { private[sql] val stateStoreCoordinator = StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus) + + @GuardedBy("activeQueriesLock") private val activeQueries = new mutable.HashMap[UUID, StreamingQuery] private val activeQueriesLock = new Object private val awaitTerminationLock = new Object + @GuardedBy("awaitTerminationLock") private var lastTerminatedQuery: StreamingQuery = null /** @@ -181,8 +184,65 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { listenerBus.post(event) } + private def createQuery( + userSpecifiedName: Option[String], + userSpecifiedCheckpointLocation: Option[String], + df: DataFrame, + sink: Sink, + outputMode: OutputMode, + useTempCheckpointLocation: Boolean, + recoverFromCheckpointLocation: Boolean, + trigger: Trigger, + triggerClock: Clock): StreamExecution = { + val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => + new Path(userSpecified).toUri.toString + }.orElse { + df.sparkSession.sessionState.conf.checkpointLocation.map { location => + new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString + } + }.getOrElse { + if (useTempCheckpointLocation) { + Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath + } else { + throw new AnalysisException( + "checkpointLocation must be specified either " + + """through option("checkpointLocation", ...) or """ + + s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") + } + } + + // If offsets have already been created, we trying to resume a query. + if (!recoverFromCheckpointLocation) { + val checkpointPath = new Path(checkpointLocation, "offsets") + val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) + if (fs.exists(checkpointPath)) { + throw new AnalysisException( + s"This query does not support recovering from checkpoint location. " + + s"Delete $checkpointPath to start over.") + } + } + + val analyzedPlan = df.queryExecution.analyzed + df.queryExecution.assertAnalyzed() + + if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) { + UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) + } + + new StreamExecution( + sparkSession, + userSpecifiedName.orNull, + checkpointLocation, + analyzedPlan, + sink, + trigger, + triggerClock, + outputMode) + } + /** * Start a [[StreamingQuery]]. + * * @param userSpecifiedName Query name optionally specified by the user. * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user. * @param df Streaming DataFrame. @@ -206,72 +266,50 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) { recoverFromCheckpointLocation: Boolean = true, trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()): StreamingQuery = { - activeQueriesLock.synchronized { - val name = userSpecifiedName match { - case Some(n) => - if (activeQueries.values.exists(_.name == userSpecifiedName.get)) { - throw new IllegalArgumentException( - s"Cannot start query with name $n as a query with that name is already active") - } - n - case None => null - } - val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => - new Path(userSpecified).toUri.toString - }.orElse { - df.sparkSession.sessionState.conf.checkpointLocation.map { location => - new Path(location, name).toUri.toString - } - }.getOrElse { - if (useTempCheckpointLocation) { - Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath - } else { - throw new AnalysisException( - "checkpointLocation must be specified either " + - """through option("checkpointLocation", ...) or """ + - s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") - } - } + val query = createQuery( + userSpecifiedName, + userSpecifiedCheckpointLocation, + df, + sink, + outputMode, + useTempCheckpointLocation, + recoverFromCheckpointLocation, + trigger, + triggerClock) - // If offsets have already been created, we trying to resume a query. - if (!recoverFromCheckpointLocation) { - val checkpointPath = new Path(checkpointLocation, "offsets") - val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) - if (fs.exists(checkpointPath)) { - throw new AnalysisException( - s"This query does not support recovering from checkpoint location. " + - s"Delete $checkpointPath to start over.") + activeQueriesLock.synchronized { + // Make sure no other query with same name is active + userSpecifiedName.foreach { name => + if (activeQueries.values.exists(_.name == name)) { + throw new IllegalArgumentException( + s"Cannot start query with name $name as a query with that name is already active") } } - val analyzedPlan = df.queryExecution.analyzed - df.queryExecution.assertAnalyzed() - - if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) { - UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) - } - - val query = new StreamExecution( - sparkSession, - name, - checkpointLocation, - analyzedPlan, - sink, - trigger, - triggerClock, - outputMode) - + // Make sure no other query with same id is active if (activeQueries.values.exists(_.id == query.id)) { throw new IllegalStateException( s"Cannot start query with id ${query.id} as another query with same id is " + - s"already active. Perhaps you are attempting to restart a query from checkpoint" + + s"already active. Perhaps you are attempting to restart a query from checkpoint " + s"that is already active.") } - query.start() activeQueries.put(query.id, query) - query } + try { + // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously. + // As it's provided by the user and can run arbitrary codes, we must not hold any lock here. + // Otherwise, it's easy to cause dead-lock, or block too long if the user codes take a long + // time to finish. + query.start() + } catch { + case e: Throwable => + activeQueriesLock.synchronized { + activeQueries -= query.id + } + throw e + } + query } /** Notify (by the StreamingQuery) that the query has been terminated */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index 0eb95a0243..f4a62903eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, StreamingQuery, StreamTest} import org.apache.spark.sql.types._ @@ -575,4 +576,59 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { sq.stop() } } + + test("user specified checkpointLocation precedes SQLConf") { + import testImplicits._ + withTempDir { checkpointPath => + withTempPath { userCheckpointPath => + assert(!userCheckpointPath.exists(), s"$userCheckpointPath should not exist") + withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) { + val queryName = "test_query" + val ds = MemoryStream[Int].toDS + ds.writeStream + .format("memory") + .queryName(queryName) + .option("checkpointLocation", userCheckpointPath.getAbsolutePath) + .start() + .stop() + assert(checkpointPath.listFiles().isEmpty, + "SQLConf path is used even if user specified checkpointLoc: " + + s"${checkpointPath.listFiles()} is not empty") + assert(userCheckpointPath.exists(), + s"The user specified checkpointLoc (userCheckpointPath) is not created") + } + } + } + } + + test("use SQLConf checkpoint dir when checkpointLocation is not specified") { + import testImplicits._ + withTempDir { checkpointPath => + withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) { + val queryName = "test_query" + val ds = MemoryStream[Int].toDS + ds.writeStream.format("memory").queryName(queryName).start().stop() + // Should use query name to create a folder in `checkpointPath` + val queryCheckpointDir = new File(checkpointPath, queryName) + assert(queryCheckpointDir.exists(), s"$queryCheckpointDir doesn't exist") + assert( + checkpointPath.listFiles().size === 1, + s"${checkpointPath.listFiles().toList} has 0 or more than 1 files ") + } + } + } + + test("use SQLConf checkpoint dir when checkpointLocation is not specified without query name") { + import testImplicits._ + withTempDir { checkpointPath => + withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath) { + val ds = MemoryStream[Int].toDS + ds.writeStream.format("console").start().stop() + // Should create a random folder in `checkpointPath` + assert( + checkpointPath.listFiles().size === 1, + s"${checkpointPath.listFiles().toList} has 0 or more than 1 files ") + } + } + } } |