Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala | 104 |
1 file changed, 99 insertions, 5 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index c1bab9b577..00efe21d39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.streaming.test
 
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration._
+
+import org.mockito.Mockito._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
@@ -27,22 +32,50 @@ import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 object LastOptions {
+
+  var mockStreamSourceProvider = mock(classOf[StreamSourceProvider])
+  var mockStreamSinkProvider = mock(classOf[StreamSinkProvider])
   var parameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
+
+  def clear(): Unit = {
+    parameters = null
+    schema = null
+    partitionColumns = null
+    reset(mockStreamSourceProvider)
+    reset(mockStreamSinkProvider)
+  }
 }
 
 /** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
 class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+
+  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+
+  override def sourceSchema(
+      sqlContext: SQLContext,
+      schema: Option[StructType],
+      providerName: String,
+      parameters: Map[String, String]): (String, StructType) = {
+    LastOptions.parameters = parameters
+    LastOptions.schema = schema
+    LastOptions.mockStreamSourceProvider.sourceSchema(sqlContext, schema, providerName, parameters)
+    ("dummySource", fakeSchema)
+  }
+
   override def createSource(
       sqlContext: SQLContext,
+      metadataPath: String,
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): Source = {
     LastOptions.parameters = parameters
     LastOptions.schema = schema
+    LastOptions.mockStreamSourceProvider.createSource(
+      sqlContext, metadataPath, schema, providerName, parameters)
     new Source {
-      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+      override def schema: StructType = fakeSchema
 
       override def getOffset: Option[Offset] = Some(new LongOffset(0))
 
@@ -60,6 +93,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       partitionColumns: Seq[String]): Sink = {
     LastOptions.parameters = parameters
     LastOptions.partitionColumns = partitionColumns
+    LastOptions.mockStreamSinkProvider.createSink(sqlContext, parameters, partitionColumns)
     new Sink {
       override def addBatch(batchId: Long, data: DataFrame): Unit = {}
     }
@@ -69,7 +103,8 @@
 class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
   import testImplicits._
 
-  private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
+  private def newMetadataDir =
+    Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
 
   after {
     sqlContext.streams.active.foreach(_.stop())
@@ -112,7 +147,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     assert(LastOptions.parameters("opt2") == "2")
     assert(LastOptions.parameters("opt3") == "3")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
@@ -176,7 +211,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
 
     assert(LastOptions.parameters("path") == "/test")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
 
     df.write
       .format("org.apache.spark.sql.streaming.test")
@@ -199,7 +234,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     assert(LastOptions.parameters("boolOpt") == "false")
     assert(LastOptions.parameters("doubleOpt") == "6.7")
 
-    LastOptions.parameters = null
+    LastOptions.clear()
     df.write
       .format("org.apache.spark.sql.streaming.test")
      .option("intOpt", 56)
@@ -274,4 +309,63 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     assert(activeStreamNames.contains("name"))
     sqlContext.streams.active.foreach(_.stop())
   }
+
+  test("trigger") {
+    val df = sqlContext.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream("/test")
+
+    var q = df.write
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .trigger(ProcessingTime(10.seconds))
+      .startStream()
+    q.stop()
+
+    assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(10000))
+
+    q = df.write
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
+      .startStream()
+    q.stop()
+
+    assert(q.asInstanceOf[StreamExecution].trigger == ProcessingTime(100000))
+  }
+
+  test("source metadataPath") {
+    LastOptions.clear()
+
+    val checkpointLocation = newMetadataDir
+
+    val df1 = sqlContext.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream()
+
+    val df2 = sqlContext.read
+      .format("org.apache.spark.sql.streaming.test")
+      .stream()
+
+    val q = df1.union(df2).write
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", checkpointLocation)
+      .trigger(ProcessingTime(10.seconds))
+      .startStream()
+    q.stop()
+
+    verify(LastOptions.mockStreamSourceProvider).createSource(
+      sqlContext,
+      checkpointLocation + "/sources/0",
+      None,
+      "org.apache.spark.sql.streaming.test",
+      Map.empty)
+
+    verify(LastOptions.mockStreamSourceProvider).createSource(
+      sqlContext,
+      checkpointLocation + "/sources/1",
+      None,
+      "org.apache.spark.sql.streaming.test",
+      Map.empty)
+  }
 }
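Aside: the assertions in the new "trigger" test hold because ProcessingTime normalizes every interval to milliseconds, so ProcessingTime(10.seconds) and ProcessingTime(10000) compare equal. A minimal standalone sketch of that equivalence in plain Scala, where IntervalTrigger is a hypothetical stand-in for Spark's ProcessingTime (no Spark dependency assumed):

import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

// Hypothetical stand-in for Spark's ProcessingTime: stores the interval
// normalized to milliseconds, so equal intervals expressed via different
// units or factories compare equal, mirroring the "trigger" test above.
case class IntervalTrigger(intervalMs: Long)

object IntervalTrigger {
  def apply(interval: Duration): IntervalTrigger =
    IntervalTrigger(interval.toMillis)
  def create(interval: Long, unit: TimeUnit): IntervalTrigger =
    IntervalTrigger(unit.toMillis(interval))
}

object TriggerEquivalenceSketch extends App {
  // Mirrors the two assertions in the test:
  assert(IntervalTrigger(10.seconds) == IntervalTrigger(10000L))
  assert(IntervalTrigger.create(100, TimeUnit.SECONDS) == IntervalTrigger(100000L))
}

Storing the normalized millisecond value in a case class is what lets the two factory paths compare equal with plain ==.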
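Likewise, the "source metadataPath" test pins down the checkpoint layout: each source of a query gets its own metadata directory under <checkpointLocation>/sources/, indexed by its position in the logical plan, which is why the two verify(...) calls expect suffixes /sources/0 and /sources/1. A sketch of that path scheme; sourceMetadataPath is an illustrative helper, not Spark's API:

// Illustrative helper (not Spark's API): derives the per-source metadata
// directory that the verify(...) calls above assert for indices 0 and 1.
object SourceMetadataPathSketch extends App {
  def sourceMetadataPath(checkpointLocation: String, sourceIndex: Int): String =
    s"$checkpointLocation/sources/$sourceIndex"

  // With two sources union-ed into one query, as in the test:
  assert(sourceMetadataPath("/tmp/ckpt", 0) == "/tmp/ckpt/sources/0")
  assert(sourceMetadataPath("/tmp/ckpt", 1) == "/tmp/ckpt/sources/1")
}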