author    Burak Yavuz <brkyvz@gmail.com>  2016-12-09 22:49:51 -0800
committer Shixiong Zhu <shixiong@databricks.com>  2016-12-09 22:49:51 -0800
commit    63c9159870ee274c68e24360594ca01d476b9ace (patch)
tree      9c7a726e27027701219e5c69f8bc63e3b182f811 /sql/core/src/test
parent    3e11d5bfef2f05bd6d42c4d6188eae6d63c963ef (diff)
[SPARK-18811] StreamSource resolution should happen in stream execution thread
## What changes were proposed in this pull request?

When a stream is started, resolving its source, for example resolving partition columns, can take a long time. That work should not block the main thread on which `query.start()` was called; it should happen on the stream execution thread, ideally before any triggers start.

## How was this patch tested?

Unit test added. Verified that the test fails without the code changes.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16238 from brkyvz/SPARK-18811.
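The essence of the fix, as a minimal hand-rolled sketch (hypothetical names, not Spark's actual `StreamExecution` code): the expensive resolution step runs on the query's own thread, so `start()` returns immediately.

```scala
// Hypothetical sketch of the pattern: defer blocking initialization
// into the worker thread instead of the caller's thread.
class QuerySketch(resolveSources: () => Unit, runTriggers: () => Unit) {
  @volatile var statusMessage = "Initializing sources"

  private val executionThread = new Thread("stream-execution-sketch") {
    override def run(): Unit = {
      resolveSources()          // may block for a long time
      statusMessage = "Running"
      runTriggers()
    }
  }

  // Returns immediately; resolution happens on executionThread.
  def start(): Unit = executionThread.start()
}
```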
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala | 28
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala         | 66
2 files changed, 94 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 268b8ff7b4..d188319fe3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.streaming
+import java.util.concurrent.CountDownLatch
+
import scala.concurrent.Future
import scala.util.Random
import scala.util.control.NonFatal
@@ -213,6 +215,28 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}
+  test("SPARK-18811: Source resolution should not block main thread") {
+    failAfter(streamingTimeout) {
+      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      withTempDir { tempDir =>
+        // If source resolution happened on the main thread, it would block the start call;
+        // with this change it blocks only the stream execution thread.
+        val sq = spark.readStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .load()
+          .writeStream
+          .format("org.apache.spark.sql.streaming.util.BlockingSource")
+          .option("checkpointLocation", tempDir.toString)
+          .start()
+        eventually(Timeout(streamingTimeout)) {
+          assert(sq.status.message.contains("Initializing sources"))
+        }
+        StreamingQueryManagerSuite.latch.countDown()
+        sq.stop()
+      }
+    }
+  }
+
/** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
@@ -297,3 +321,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
(inputData, mapped)
}
}
+
+object StreamingQueryManagerSuite {
+  var latch: CountDownLatch = null
+}
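The test's synchronization hinges on a shared `CountDownLatch`: `BlockingSource.createSource` (added in the new file below) awaits the latch, pinning the stream execution thread in the "Initializing sources" state while the main thread stays free to observe the status and then release it. The same handshake in isolation, as a standalone sketch with hypothetical names:

```scala
import java.util.concurrent.CountDownLatch

object LatchHandshakeSketch extends App {
  val latch = new CountDownLatch(1)
  @volatile var status = "Initializing sources"

  val worker = new Thread {
    override def run(): Unit = {
      latch.await()              // stands in for createSource blocking
      status = "Running"
    }
  }
  worker.start()                 // returns immediately, like query.start()

  // Race-free: the worker cannot change status until the latch is released.
  assert(status == "Initializing sources")

  latch.countDown()              // like the test releasing the source
  worker.join()
  assert(status == "Running")
}
```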
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
new file mode 100644
index 0000000000..b0adf76814
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.util
+
+import org.apache.spark.sql.{SQLContext, _}
+import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
+/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
+class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      spark: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    ("dummySource", fakeSchema)
+  }
+
+  override def createSource(
+      spark: SQLContext,
+      metadataPath: String,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): Source = {
+    StreamingQueryManagerSuite.latch.await()
+    new Source {
+      override def schema: StructType = fakeSchema
+      override def getOffset: Option[Offset] = Some(new LongOffset(0))
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        import spark.implicits._
+        Seq[Int]().toDS().toDF()
+      }
+      override def stop() {}
+    }
+  }
+
+  override def createSink(
+      spark: SQLContext,
+      parameters: Map[String, String],
+      partitionColumns: Seq[String],
+      outputMode: OutputMode): Sink = {
+    new Sink {
+      override def addBatch(batchId: Long, data: DataFrame): Unit = {}
+    }
+  }
+}
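A note on the design: the latch lives in the suite's companion object rather than being passed to `BlockingSource` directly because Spark instantiates the provider reflectively from the class name handed to `format(...)`, so the test has no way to hand the instance a reference. A static field is the simplest channel between the two. A simplified sketch of that instantiation step (an assumption about the mechanism, not the exact `DataSource` lookup code):

```scala
// Roughly what happens when format("...BlockingSource") is resolved;
// a simplified assumption, not Spark's actual resolution logic.
val providerClass = Class.forName("org.apache.spark.sql.streaming.util.BlockingSource")
val provider = providerClass.getConstructor().newInstance()
// The fresh instance still sees StreamingQueryManagerSuite.latch:
// a companion object's var is effectively a JVM-wide static field.
```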