Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala  104
1 file changed, 99 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index c1bab9b577..00efe21d39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -17,6 +17,11 @@
package org.apache.spark.sql.streaming.test
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+
+import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql._
@@ -27,22 +32,50 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
object LastOptions {
+
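+ // Mockito mocks shared with DefaultSource below: each provider call is
+ // mirrored onto these so tests can verify the exact arguments passed in.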
+ var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
+ var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
var parameters: Map[String, String] = null
var schema: Option[StructType] = null
var partitionColumns: Seq[String] = Nil
+
+ def clear(): Unit = {
+ parameters = null
+ schema = null
+ partitionColumns = Nil
+ reset(mockStreamSourceProvider)
+ reset(mockStreamSinkProvider)
+ }
}
/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+
+ private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+ override def sourceSchema(
+ sqlContext: SQLContext,
+ schema: Option[StructType],
+ providerName: String,
+ parameters: Map[String, String]): (String, StructType) = {
+ LastOptions.parameters = parameters
+ LastOptions.schema = schema
+ LastOptions.mockStreamSourceProvider.sourceSchema(sqlContext, schema, providerName, parameters)
+ ("dummySource", fakeSchema)
+ }
+
override def createSource(
sqlContext: SQLContext,
+ metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
LastOptions.parameters = parameters
LastOptions.schema = schema
+ LastOptions.mockStreamSourceProvider.createSource(
+ sqlContext, metadataPath, schema, providerName, parameters)
new Source {
- override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+ override def schema: StructType = fakeSchema
override def getOffset: Option[Offset] = Some(new LongOffset(0))
@@ -60,6 +93,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
partitionColumns: Seq[String]): Sink = {
LastOptions.parameters = parameters
LastOptions.partitionColumns = partitionColumns
+ LastOptions.mockStreamSinkProvider.createSink(sqlContext, parameters, partitionColumns)
new Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {}
}
@@ -69,7 +103,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
import testImplicits._
- private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
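+ // createTempDir's first positional parameter is the root directory, so the
+ // prefix must be passed by name.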
+ private def newMetadataDir =
+ Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
after {
sqlContext.streams.active.foreach(_.stop())
@@ -112,7 +147,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(LastOptions.parameters("opt2") == "2")
assert(LastOptions.parameters("opt3") == "3")
- LastOptions.parameters = null
+ LastOptions.clear()
df.write
.format("org.apache.spark.sql.streaming.test")
@@ -176,7 +211,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(LastOptions.parameters("path") == "/test")
- LastOptions.parameters = null
+ LastOptions.clear()
df.write
.format("org.apache.spark.sql.streaming.test")
@@ -199,7 +234,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(LastOptions.parameters("boolOpt") == "false")
assert(LastOptions.parameters("doubleOpt") == "6.7")
- LastOptions.parameters = null
+ LastOptions.clear()
df.write
.format("org.apache.spark.sql.streaming.test")
.option("intOpt", 56)
@@ -274,4 +309,63 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
assert(activeStreamNames.contains("name"))
sqlContext.streams.active.foreach(_.stop())
}
+
+ test("trigger") {
+ val df = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream("/test")
+
+ var q = df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
+ .trigger(ProcessingTime(10.seconds))
+ .startStream()
+ q.stop()
+
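+ // ProcessingTime normalizes intervals to milliseconds, so 10.seconds == 10000.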
+ assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(10000))
+
+ q = df.write
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
+ .trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
+ .startStream()
+ q.stop()
+
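+ // The TimeUnit-based factory normalizes the same way: 100 seconds == 100000 ms.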
+ assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
+ }
+
+ test("source metadataPath") {
+ LastOptions.clear()
+
+ val checkpointLocation = newMetadataDir
+
+ val df1 = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+
+ val df2 = sqlContext.read
+ .format("org.apache.spark.sql.streaming.test")
+ .stream()
+
+ val q = df1.union(df2).write
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", checkpointLocation)
+ .trigger(ProcessingTime(10.seconds))
+ .startStream()
+ q.stop()
+
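+ // Each source of the query gets its own numbered metadata directory under
+ // $checkpointLocation/sources/.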
+ verify(LastOptions.mockStreamSourceProvider).createSource(
+ sqlContext,
+ checkpointLocation + "/sources/0",
+ None,
+ "org.apache.spark.sql.streaming.test",
+ Map.empty)
+
+ verify(LastOptions.mockStreamSourceProvider).createSource(
+ sqlContext,
+ checkpointLocation + "/sources/1",
+ None,
+ "org.apache.spark.sql.streaming.test",
+ Map.empty)
+ }
}