diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-14 13:36:41 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-12-14 13:36:41 -0800 |
commit | 1ac6567bdb03d7cc5c5f3473827a102280cb1030 (patch) | |
tree | be8018d4e321c590298727fdc73369c3dfcbbd71 /sql/core/src/test | |
parent | 5d799473696a15fddd54ec71a93b6f8cb169810c (diff) | |
download | spark-1ac6567bdb03d7cc5c5f3473827a102280cb1030.tar.gz spark-1ac6567bdb03d7cc5c5f3473827a102280cb1030.tar.bz2 spark-1ac6567bdb03d7cc5c5f3473827a102280cb1030.zip |
[SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty
## What changes were proposed in this pull request?
Right now `StreamingQuery.lastProgress` throws NoSuchElementException and it's hard to be used in Python since Python user will just see Py4jError.
This PR just makes it return null instead.
## How was this patch tested?
`test("lastProgress should be null when recentProgress is empty")`
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16273 from zsxwing/SPARK-18852.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala | 9 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 21 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala) | 10 |
3 files changed, 31 insertions, 9 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index d188319fe3..1742a5474c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException import org.apache.spark.sql.Dataset import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.streaming.util.BlockingSource import org.apache.spark.util.Utils class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { @@ -217,7 +218,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { test("SPARK-18811: Source resolution should not block main thread") { failAfter(streamingTimeout) { - StreamingQueryManagerSuite.latch = new CountDownLatch(1) + BlockingSource.latch = new CountDownLatch(1) withTempDir { tempDir => // if source resolution was happening on the main thread, it would block the start call, // now it should only be blocking the stream execution thread @@ -231,7 +232,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { eventually(Timeout(streamingTimeout)) { assert(sq.status.message.contains("Initializing sources")) } - StreamingQueryManagerSuite.latch.countDown() + BlockingSource.latch.countDown() sq.stop() } } @@ -321,7 +322,3 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { (inputData, mapped) } } - -object StreamingQueryManagerSuite { - var latch: CountDownLatch = null -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index afd788ce3d..b052bd9e6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.streaming -import scala.collection.JavaConverters._ +import java.util.concurrent.CountDownLatch import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics @@ -32,6 +32,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.util.BlockingSource import org.apache.spark.util.ManualClock @@ -312,6 +313,24 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { ) } + test("lastProgress should be null when recentProgress is empty") { + BlockingSource.latch = new CountDownLatch(1) + withTempDir { tempDir => + val sq = spark.readStream + .format("org.apache.spark.sql.streaming.util.BlockingSource") + .load() + .writeStream + .format("org.apache.spark.sql.streaming.util.BlockingSource") + .option("checkpointLocation", tempDir.toString) + .start() + // Creating source is blocked so recentProgress is empty and lastProgress should be null + assert(sq.lastProgress === null) + // Release the latch and stop the query + BlockingSource.latch.countDown() + sq.stop() + } + } + test("codahale metrics") { val inputData = MemoryStream[Int] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala index b0adf76814..19ab2ff13e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.streaming.util +import java.util.concurrent.CountDownLatch + import org.apache.spark.sql.{SQLContext, _} import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source} import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider} -import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite} +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{IntegerType, StructField, StructType} /** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */ @@ -42,7 +44,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider { schema: Option[StructType], providerName: String, parameters: Map[String, String]): Source = { - StreamingQueryManagerSuite.latch.await() + BlockingSource.latch.await() new Source { override def schema: StructType = fakeSchema override def getOffset: Option[Offset] = Some(new LongOffset(0)) @@ -64,3 +66,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider { } } } + +object BlockingSource { + var latch: CountDownLatch = null +} |