aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata1
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/03
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/13
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.deltabin0 -> 73 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.deltabin0 -> 46 bytes
-rw-r--r--sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.deltabin0 -> 79 bytes
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala38
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala101
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala10
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala22
27 files changed, 151 insertions, 27 deletions
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
new file mode 100644
index 0000000000..3492220e36
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/metadata
@@ -0,0 +1 @@
+{"id":"dddc5e7f-1e71-454c-8362-de184444fb5a"} \ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
new file mode 100644
index 0000000000..cbde042e79
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1489180207737}
+0 \ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
new file mode 100644
index 0000000000..10b5774746
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1489180209261}
+2 \ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/0/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta
new file mode 100644
index 0000000000..7dc49cb3e4
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta
new file mode 100644
index 0000000000..8b566e81f4
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/1/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta
new file mode 100644
index 0000000000..ca2a7ed033
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta
new file mode 100644
index 0000000000..361f2db605
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/2/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta
new file mode 100644
index 0000000000..4c8804c61a
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta
new file mode 100644
index 0000000000..7d3e07fe03
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/3/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta
new file mode 100644
index 0000000000..fe521b8c07
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/4/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/5/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta
new file mode 100644
index 0000000000..e69925caba
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/6/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta
new file mode 100644
index 0000000000..36397a3dda
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/7/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/8/2.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta
new file mode 100644
index 0000000000..6352978051
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/1.delta
Binary files differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta
new file mode 100644
index 0000000000..0c9b6ac5c8
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.1.0/state/0/9/2.delta
Binary files differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
index f7f0dade87..dc556322be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.stringToFile
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
@@ -29,12 +30,37 @@ class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
case class StringOffset(override val json: String) extends Offset
test("OffsetSeqMetadata - deserialization") {
- assert(OffsetSeqMetadata(0, 0) === OffsetSeqMetadata("""{}"""))
- assert(OffsetSeqMetadata(1, 0) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
- assert(OffsetSeqMetadata(0, 2) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
- assert(
- OffsetSeqMetadata(1, 2) ===
- OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+ val key = SQLConf.SHUFFLE_PARTITIONS.key
+
+ def getConfWith(shufflePartitions: Int): Map[String, String] = {
+ Map(key -> shufflePartitions.toString)
+ }
+
+ // None set
+ assert(OffsetSeqMetadata(0, 0, Map.empty) === OffsetSeqMetadata("""{}"""))
+
+ // One set
+ assert(OffsetSeqMetadata(1, 0, Map.empty) === OffsetSeqMetadata("""{"batchWatermarkMs":1}"""))
+ assert(OffsetSeqMetadata(0, 2, Map.empty) === OffsetSeqMetadata("""{"batchTimestampMs":2}"""))
+ assert(OffsetSeqMetadata(0, 0, getConfWith(shufflePartitions = 2)) ===
+ OffsetSeqMetadata(s"""{"conf": {"$key":2}}"""))
+
+ // Two set
+ assert(OffsetSeqMetadata(1, 2, Map.empty) ===
+ OffsetSeqMetadata("""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+ assert(OffsetSeqMetadata(1, 0, getConfWith(shufflePartitions = 3)) ===
+ OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"conf": {"$key":3}}"""))
+ assert(OffsetSeqMetadata(0, 2, getConfWith(shufflePartitions = 3)) ===
+ OffsetSeqMetadata(s"""{"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+ // All set
+ assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+ OffsetSeqMetadata(s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}}"""))
+
+ // Drop unknown fields
+ assert(OffsetSeqMetadata(1, 2, getConfWith(shufflePartitions = 3)) ===
+ OffsetSeqMetadata(
+ s"""{"batchWatermarkMs":1,"batchTimestampMs":2,"conf": {"$key":3}},"unknown":1"""))
}
test("OffsetSeqLog - serialization - deserialization") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6dfcd8baba..e867fc40f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -17,17 +17,20 @@
package org.apache.spark.sql.streaming
-import java.io.{InterruptedIOException, IOException}
+import java.io.{File, InterruptedIOException, IOException}
import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
import scala.reflect.ClassTag
import scala.util.control.ControlThrowable
+import org.apache.commons.io.FileUtils
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -389,6 +392,102 @@ class StreamSuite extends StreamTest {
query.stop()
assert(query.exception.isEmpty)
}
+
+ test("SPARK-19873: streaming aggregation with change in number of partitions") {
+ val inputData = MemoryStream[(Int, Int)]
+ val agg = inputData.toDS().groupBy("_1").count()
+
+ testStream(agg, OutputMode.Complete())(
+ AddData(inputData, (1, 0), (2, 0)),
+ StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "2")),
+ CheckAnswer((1, 1), (2, 1)),
+ StopStream,
+ AddData(inputData, (3, 0), (2, 0)),
+ StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "5")),
+ CheckAnswer((1, 1), (2, 2), (3, 1)),
+ StopStream,
+ AddData(inputData, (3, 0), (1, 0)),
+ StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "1")),
+ CheckAnswer((1, 2), (2, 2), (3, 2)))
+ }
+
+ test("recover from a Spark v2.1 checkpoint") {
+ var inputData: MemoryStream[Int] = null
+ var query: DataStreamWriter[Row] = null
+
+ def prepareMemoryStream(): Unit = {
+ inputData = MemoryStream[Int]
+ inputData.addData(1, 2, 3, 4)
+ inputData.addData(3, 4, 5, 6)
+ inputData.addData(5, 6, 7, 8)
+
+ query = inputData
+ .toDF()
+ .groupBy($"value")
+ .agg(count("*"))
+ .writeStream
+ .outputMode("complete")
+ .format("memory")
+ }
+
+ // Get an existing checkpoint generated by Spark v2.1.
+ // v2.1 does not record # shuffle partitions in the offset metadata.
+ val resourceUri =
+ this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+ val checkpointDir = new File(resourceUri)
+
+ // 1 - Test if recovery from the checkpoint is successful.
+ prepareMemoryStream()
+ withTempDir { dir =>
+ // Copy the checkpoint to a temp dir to prevent changes to the original.
+ // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+ FileUtils.copyDirectory(checkpointDir, dir)
+
+ // Checkpoint data was generated by a query with 10 shuffle partitions.
+ // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+ // since the last batch may be rerun.
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+ var streamingQuery: StreamingQuery = null
+ try {
+ streamingQuery =
+ query.queryName("counts").option("checkpointLocation", dir.getCanonicalPath).start()
+ streamingQuery.processAllAvailable()
+ inputData.addData(9)
+ streamingQuery.processAllAvailable()
+
+ QueryTest.checkAnswer(spark.table("counts").toDF(),
+ Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+ Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+ } finally {
+ if (streamingQuery ne null) {
+ streamingQuery.stop()
+ }
+ }
+ }
+ }
+
+ // 2 - Check recovery with wrong num shuffle partitions
+ prepareMemoryStream()
+ withTempDir { dir =>
+ FileUtils.copyDirectory(checkpointDir, dir)
+
+ // Since the number of partitions is greater than 10, should throw exception.
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
+ var streamingQuery: StreamingQuery = null
+ try {
+ intercept[StreamingQueryException] {
+ streamingQuery =
+ query.queryName("badQuery").option("checkpointLocation", dir.getCanonicalPath).start()
+ streamingQuery.processAllAvailable()
+ }
+ } finally {
+ if (streamingQuery ne null) {
+ streamingQuery.stop()
+ }
+ }
+ }
+ }
+ }
}
abstract class FakeSource extends StreamSourceProvider {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index f05e9d1fda..b49efa6890 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -239,16 +239,6 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}
- test("SPARK-19268: Adaptive query execution should be disallowed") {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
- val e = intercept[AnalysisException] {
- MemoryStream[Int].toDS.writeStream.queryName("test-query").format("memory").start()
- }
- assert(e.getMessage.contains(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key) &&
- e.getMessage.contains("not supported"))
- }
- }
-
/** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
failAfter(streamingTimeout) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index f61dcdcbcf..341ab0eb92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.hadoop.fs.Path
+import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
@@ -370,21 +371,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
.option("checkpointLocation", checkpointLocationURI.toString)
.trigger(ProcessingTime(10.seconds))
.start()
+ q.processAllAvailable()
q.stop()
verify(LastOptions.mockStreamSourceProvider).createSource(
- spark.sqlContext,
- s"$checkpointLocationURI/sources/0",
- None,
- "org.apache.spark.sql.streaming.test",
- Map.empty)
+ any(),
+ meq(s"$checkpointLocationURI/sources/0"),
+ meq(None),
+ meq("org.apache.spark.sql.streaming.test"),
+ meq(Map.empty))
verify(LastOptions.mockStreamSourceProvider).createSource(
- spark.sqlContext,
- s"$checkpointLocationURI/sources/1",
- None,
- "org.apache.spark.sql.streaming.test",
- Map.empty)
+ any(),
+ meq(s"$checkpointLocationURI/sources/1"),
+ meq(None),
+ meq("org.apache.spark.sql.streaming.test"),
+ meq(Map.empty))
}
private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath