author     Shixiong Zhu <shixiong@databricks.com>  2016-12-14 13:36:41 -0800
committer  Shixiong Zhu <shixiong@databricks.com>  2016-12-14 13:36:41 -0800
commit     1ac6567bdb03d7cc5c5f3473827a102280cb1030 (patch)
tree       be8018d4e321c590298727fdc73369c3dfcbbd71 /sql/core/src/test
parent     5d799473696a15fddd54ec71a93b6f8cb169810c (diff)
[SPARK-18852][SS] StreamingQuery.lastProgress should be null when recentProgress is empty
## What changes were proposed in this pull request?

Right now `StreamingQuery.lastProgress` throws NoSuchElementException when no progress has been reported yet, which is hard to handle from Python since Python users only see a Py4JError. This PR makes it return null instead.

## How was this patch tested?

`test("lastProgress should be null when recentProgress is empty")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16273 from zsxwing/SPARK-18852.
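To illustrate the caller-facing change, here is a minimal sketch (not part of the patch; it assumes `query` is a started `StreamingQuery` that has produced no progress yet):

```scala
// Before SPARK-18852: reading lastProgress with an empty recentProgress
// threw NoSuchElementException (surfacing as a Py4JError in PySpark).
// After this patch it returns null, so callers can null-check instead:
Option(query.lastProgress) match {
  case Some(progress) => println(progress.json)   // latest progress as JSON
  case None           => println("no progress reported yet")
}
```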
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala | 9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala | 21
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala) | 10
3 files changed, 31 insertions, 9 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index d188319fe3..1742a5474c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.Utils
class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
@@ -217,7 +218,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
test("SPARK-18811: Source resolution should not block main thread") {
failAfter(streamingTimeout) {
- StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+ BlockingSource.latch = new CountDownLatch(1)
withTempDir { tempDir =>
// if source resolution was happening on the main thread, it would block the start call,
// now it should only be blocking the stream execution thread
@@ -231,7 +232,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
eventually(Timeout(streamingTimeout)) {
assert(sq.status.message.contains("Initializing sources"))
}
- StreamingQueryManagerSuite.latch.countDown()
+ BlockingSource.latch.countDown()
sq.stop()
}
}
@@ -321,7 +322,3 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
(inputData, mapped)
}
}
-
-object StreamingQueryManagerSuite {
- var latch: CountDownLatch = null
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index afd788ce3d..b052bd9e6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.streaming
-import scala.collection.JavaConverters._
+import java.util.concurrent.CountDownLatch
import org.apache.commons.lang3.RandomStringUtils
import org.scalactic.TolerantNumerics
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.ManualClock
@@ -312,6 +313,24 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
)
}
+ test("lastProgress should be null when recentProgress is empty") {
+ BlockingSource.latch = new CountDownLatch(1)
+ withTempDir { tempDir =>
+ val sq = spark.readStream
+ .format("org.apache.spark.sql.streaming.util.BlockingSource")
+ .load()
+ .writeStream
+ .format("org.apache.spark.sql.streaming.util.BlockingSource")
+ .option("checkpointLocation", tempDir.toString)
+ .start()
+ // Creating source is blocked so recentProgress is empty and lastProgress should be null
+ assert(sq.lastProgress === null)
+ // Release the latch and stop the query
+ BlockingSource.latch.countDown()
+ sq.stop()
+ }
+ }
+
test("codahale metrics") {
val inputData = MemoryStream[Int]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
index b0adf76814..19ab2ff13e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -17,10 +17,12 @@
package org.apache.spark.sql.streaming.util
+import java.util.concurrent.CountDownLatch
+
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
/** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
@@ -42,7 +44,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
- StreamingQueryManagerSuite.latch.await()
+ BlockingSource.latch.await()
new Source {
override def schema: StructType = fakeSchema
override def getOffset: Option[Offset] = Some(new LongOffset(0))
@@ -64,3 +66,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
}
}
}
+
+object BlockingSource {
+ var latch: CountDownLatch = null
+}
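As context for moving `latch` into `BlockingSource`'s companion object, here is a minimal, self-contained sketch (names hypothetical, not from the patch) of the CountDownLatch gating pattern these tests rely on: a worker thread blocks in `await()` the way `createSource` does, while the main thread stays free until it calls `countDown()`:

```scala
import java.util.concurrent.CountDownLatch

// Standalone sketch of the latch pattern used by BlockingSource.
object LatchSketch {
  // Shared latch, analogous to BlockingSource.latch in this patch.
  val latch = new CountDownLatch(1)

  def main(args: Array[String]): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        // Blocks here until released, like createSource blocking
        // the stream-execution thread.
        latch.await()
        println("source created")
      }
    })
    worker.start()
    println("main thread is not blocked") // cf. the SPARK-18811 assertion
    latch.countDown() // release the worker, as the tests do before sq.stop()
    worker.join()
  }
}
```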