From 63c9159870ee274c68e24360594ca01d476b9ace Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Fri, 9 Dec 2016 22:49:51 -0800
Subject: [SPARK-18811] StreamSource resolution should happen in stream
 execution thread

## What changes were proposed in this pull request?

When a stream is started, resolving its source (for example, resolving partition columns) can take a long time. That work should not block the main thread on which `query.start()` was called; it should happen in the stream execution thread, possibly before any triggers start.

## How was this patch tested?

Unit test added. Verified that the test fails without the code changes.

Author: Burak Yavuz

Closes #16238 from brkyvz/SPARK-18811.
---
 .../sql/streaming/StreamingQueryManagerSuite.scala | 28 +++++++++
 .../spark/sql/streaming/util/DefaultSource.scala   | 66 ++++++++++++++++++++++
 2 files changed, 94 insertions(+)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 268b8ff7b4..d188319fe3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.concurrent.CountDownLatch
+
 import scala.concurrent.Future
 import scala.util.Random
 import scala.util.control.NonFatal
@@ -213,6 +215,28 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("SPARK-18811: Source resolution should not block main thread") {
+    failAfter(streamingTimeout) {
+      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      withTempDir { tempDir =>
+        // If source resolution happened on the main thread, it would block this start() call;
+        // now it should block only the stream execution thread.
+        val sq = spark.readStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .load()
+          .writeStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .option("checkpointLocation", tempDir.toString)
+          .start()
+        eventually(Timeout(streamingTimeout)) {
+          assert(sq.status.message.contains("Initializing sources"))
+        }
+        StreamingQueryManagerSuite.latch.countDown()
+        sq.stop()
+      }
+    }
+  }
+
   /** Run a body of code by defining a query on each dataset */
   private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
@@ -297,3 +321,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     (inputData, mapped)
   }
 }
+
+object StreamingQueryManagerSuite {
+  var latch: CountDownLatch = null
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
new file mode 100644
index 0000000000..b0adf76814
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
+class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      spark: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    ("dummySource", fakeSchema)
+  }
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    StreamingQueryManagerSuite.latch.await()
+    new Source {
+      override def schema: StructType = fakeSchema
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import spark.implicits._
+        Seq[Int]().toDS().toDF()
+      }
+      override def stop() {}
+    }
+  }
+
+  override def createSink(
+      spark: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    new Sink {
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+    }
+  }
+}
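A note on how the test coordinates the two threads: `BlockingSource` parks inside `createSource` on a `CountDownLatch` shared through the suite's companion object, so `start()` can only return before the latch is released if resolution has moved off the main thread. The latch lives in a companion object because the provider is instantiated by the class name passed to `format(...)`, so the test has no direct handle to pass it in. Below is a minimal, Spark-free Scala sketch of that coordination pattern; `SourceResolutionSketch`, `resolveSource`, and `start` are hypothetical stand-ins for illustration, not Spark APIs.

```scala
import java.util.concurrent.CountDownLatch

object SourceResolutionSketch {
  // Latch shared between the "test" thread and the worker, mirroring
  // StreamingQueryManagerSuite.latch in the patch above.
  val latch = new CountDownLatch(1)

  // Hypothetical stand-in for a slow createSource call: it blocks until
  // the latch is released, just like BlockingSource does.
  def resolveSource(): String = {
    latch.await()
    "dummySource"
  }

  // Hypothetical stand-in for the stream execution thread: resolution
  // runs on a dedicated thread, so this method returns immediately.
  def start(): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        val name = resolveSource() // blocks here, not in the caller
        println(s"$name resolved; triggers may now start")
      }
    })
    t.setDaemon(true)
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val execThread = start()   // returns right away, like query.start()
    println("start() returned while the source is still initializing")
    latch.countDown()          // the test releases the latch
    execThread.join()
  }
}
```

Because the worker is parked on `latch.await()` until the main thread calls `countDown()`, the sketch deterministically prints the "start() returned" line first, mirroring the `eventually` assertion on "Initializing sources" in the test above.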