author    Michael Armbrust <michael@databricks.com>    2016-02-02 10:13:54 -0800
committer Michael Armbrust <michael@databricks.com>    2016-02-02 10:13:54 -0800
commit    12a20c144f14e80ef120ddcfb0b455a805a2da23 (patch)
tree      9debd487706ae360a4b4324e631a53cf8ab16ff5 /sql
parent    22ba21348b28d8b1909ccde6fe17fb9e68531e5a (diff)
[SPARK-10820][SQL] Support for the continuous execution of structured queries
This is a follow-up to 9aadcffabd226557174f3ff566927f873c71672e that extends Spark SQL to allow users to _repeatedly_ optimize and execute structured queries. A `ContinuousQuery` can be expressed using SQL, DataFrames or Datasets. The purpose of this PR is only to add some initial infrastructure which will be extended in subsequent PRs.

## User-facing API

- `sqlContext.streamFrom` and `df.streamTo` return builder objects that are analogous to the `read`/`write` interfaces already available for executing queries in a batch-oriented fashion.
- `ContinuousQuery` provides an interface for interacting with a query that is currently executing in the background.

## Internal Interfaces

- `StreamExecution` - executes streaming queries in micro-batches.

The following are currently internal, but public APIs will be provided in a future release.

- `Source` - an interface for providers of continually arriving data. A source must have a notion of an `Offset` that monotonically tracks what data has arrived. For fault tolerance, a source must be able to replay data given a start offset.
- `Sink` - an interface that accepts the results of a continuously executing query. It is also responsible for tracking the offset that should be resumed from in the case of a failure.

## Testing

- `MemoryStream` and `MemorySink` - simple implementations of source and sink that keep all data in memory and have methods for simulating durability failures.
- `StreamTest` - a framework for performing actions and checking invariants on a continuous query.

Author: Michael Armbrust <michael@databricks.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Josh Rosen <rosenville@gmail.com>

Closes #11006 from marmbrus/structured-streaming.
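For orientation, the pieces above compose as in the following REPL-style sketch; the format names and paths are placeholders rather than formats shipped by this patch, and a real run needs data sources that implement the new streaming provider interfaces.

```scala
// Hedged sketch of the user-facing API added here (e.g. in the spark-shell,
// where sqlContext is predefined). Format names and paths are hypothetical.
import org.apache.spark.sql.ContinuousQuery

// Build a streaming DataFrame, analogous to sqlContext.read for batch queries.
val streamingDf = sqlContext.streamFrom
  .format("my-stream-source")          // hypothetical source format
  .open("/tmp/stream-input")           // hypothetical input path

// Continuously write results out, analogous to df.write for batch queries.
val query: ContinuousQuery = streamingDf.streamTo
  .format("my-stream-sink")            // hypothetical sink format
  .start("/tmp/stream-output")         // hypothetical output path

// The returned handle controls the background execution.
query.stop()
```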
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala | 30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala | 127
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala | 134
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala | 33
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala | 26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala | 67
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala | 33
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala | 37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala | 47
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 211
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala | 67
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala | 34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 138
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 21
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 74
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala | 346
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala | 166
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala | 33
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala | 98
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 84
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala | 2
24 files changed, 1828 insertions, 32 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
new file mode 100644
index 0000000000..1c2c0290fc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+/**
+ * A handle to a query that is executing continuously in the background as new data arrives.
+ */
+trait ContinuousQuery {
+
+ /**
+ * Stops the execution of this query if it is running. This method blocks until the threads
+ * performing execution have stopped.
+ */
+ def stop(): Unit
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 518f9dcf94..6de17e5924 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1691,6 +1691,14 @@ class DataFrame private[sql](
def write: DataFrameWriter = new DataFrameWriter(this)
/**
+ * :: Experimental ::
+ * Interface for starting a streaming query that will continually output results to the specified
+ * external sink as new data arrives.
+ */
+ @Experimental
+ def streamTo: DataStreamWriter = new DataStreamWriter(this)
+
+ /**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
* @group rdd
* @since 1.3.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
new file mode 100644
index 0000000000..2febc93fa4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala
@@ -0,0 +1,127 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.datasources.ResolvedDataSource
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ * An interface for reading streaming data. Use `sqlContext.streamFrom` to access these methods.
+ *
+ * {{{
+ * val df = sqlContext.streamFrom
+ * .format("...")
+ * .open()
+ * }}}
+ */
+@Experimental
+class DataStreamReader private[sql](sqlContext: SQLContext) extends Logging {
+
+ /**
+ * Specifies the input data source format.
+ *
+ * @since 2.0.0
+ */
+ def format(source: String): DataStreamReader = {
+ this.source = source
+ this
+ }
+
+ /**
+ * Specifies the input schema. Some data streams (e.g. JSON) can infer the input schema
+ * automatically from data. By specifying the schema here, the underlying data stream can
+ * skip the schema inference step, and thus speed up data reading.
+ *
+ * @since 2.0.0
+ */
+ def schema(schema: StructType): DataStreamReader = {
+ this.userSpecifiedSchema = Option(schema)
+ this
+ }
+
+ /**
+ * Adds an input option for the underlying data stream.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: String): DataStreamReader = {
+ this.extraOptions += (key -> value)
+ this
+ }
+
+ /**
+ * (Scala-specific) Adds input options for the underlying data stream.
+ *
+ * @since 2.0.0
+ */
+ def options(options: scala.collection.Map[String, String]): DataStreamReader = {
+ this.extraOptions ++= options
+ this
+ }
+
+ /**
+ * Adds input options for the underlying data stream.
+ *
+ * @since 2.0.0
+ */
+ def options(options: java.util.Map[String, String]): DataStreamReader = {
+ this.options(options.asScala)
+ this
+ }
+
+ /**
+ * Loads streaming input as a [[DataFrame]], for data streams that don't require a path (e.g.
+ * external key-value stores).
+ *
+ * @since 2.0.0
+ */
+ def open(): DataFrame = {
+ val resolved = ResolvedDataSource.createSource(
+ sqlContext,
+ userSpecifiedSchema = userSpecifiedSchema,
+ providerName = source,
+ options = extraOptions.toMap)
+ DataFrame(sqlContext, StreamingRelation(resolved))
+ }
+
+ /**
+ * Loads input as a [[DataFrame]], for data streams that read from some path.
+ *
+ * @since 2.0.0
+ */
+ def open(path: String): DataFrame = {
+ option("path", path).open()
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Builder pattern config options
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ private var source: String = sqlContext.conf.defaultDataSourceName
+
+ private var userSpecifiedSchema: Option[StructType] = None
+
+ private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+
+}
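A short, hedged illustration of the reader builder defined above (REPL-style, with `sqlContext` in scope); the format name and option key are hypothetical, and the provider behind the format must implement `StreamSourceProvider`:

```scala
// Sketch only: "my-source" and "pollIntervalMs" are hypothetical placeholders.
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val eventSchema = StructType(StructField("value", StringType) :: Nil)

val events = sqlContext.streamFrom
  .format("my-source")                 // hypothetical streaming source
  .schema(eventSchema)                 // user-specified schema: skips inference
  .option("pollIntervalMs", "500")     // hypothetical source-specific option
  .open()                              // no path, e.g. an external key-value store
```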
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
new file mode 100644
index 0000000000..b325d48fcb
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataStreamWriter.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.datasources.ResolvedDataSource
+import org.apache.spark.sql.execution.streaming.StreamExecution
+
+/**
+ * :: Experimental ::
+ * Interface used to start a streaming query execution.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+final class DataStreamWriter private[sql](df: DataFrame) {
+
+ /**
+ * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
+ *
+ * @since 2.0.0
+ */
+ def format(source: String): DataStreamWriter = {
+ this.source = source
+ this
+ }
+
+ /**
+ * Adds an output option for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def option(key: String, value: String): DataStreamWriter = {
+ this.extraOptions += (key -> value)
+ this
+ }
+
+ /**
+ * (Scala-specific) Adds output options for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def options(options: scala.collection.Map[String, String]): DataStreamWriter = {
+ this.extraOptions ++= options
+ this
+ }
+
+ /**
+ * Adds output options for the underlying data source.
+ *
+ * @since 2.0.0
+ */
+ def options(options: java.util.Map[String, String]): DataStreamWriter = {
+ this.options(options.asScala)
+ this
+ }
+
+ /**
+ * Partitions the output by the given columns on the file system. If specified, the output is
+ * laid out on the file system similar to Hive's partitioning scheme.
+ *
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(colNames: String*): DataStreamWriter = {
+ this.partitioningColumns = colNames
+ this
+ }
+
+ /**
+ * Starts the execution of the streaming query, which will continually output results to the given
+ * path as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+ * the stream.
+ * @since 2.0.0
+ */
+ def start(path: String): ContinuousQuery = {
+ this.extraOptions += ("path" -> path)
+ start()
+ }
+
+ /**
+ * Starts the execution of the streaming query, which will continually output results to the
+ * configured sink as new data arrives. The returned [[ContinuousQuery]] object can be used to interact with
+ * the stream.
+ *
+ * @since 2.0.0
+ */
+ def start(): ContinuousQuery = {
+ val sink = ResolvedDataSource.createSink(
+ df.sqlContext,
+ source,
+ extraOptions.toMap,
+ normalizedParCols)
+
+ new StreamExecution(df.sqlContext, df.logicalPlan, sink)
+ }
+
+ private def normalizedParCols: Seq[String] = {
+ partitioningColumns.map { col =>
+ df.logicalPlan.output
+ .map(_.name)
+ .find(df.sqlContext.analyzer.resolver(_, col))
+ .getOrElse(throw new AnalysisException(s"Partition column $col not found in existing " +
+ s"columns (${df.logicalPlan.output.map(_.name).mkString(", ")})"))
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////////////////////////
+ // Builder pattern config options
+ ///////////////////////////////////////////////////////////////////////////////////////
+
+ private var source: String = df.sqlContext.conf.defaultDataSourceName
+
+ private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+
+ private var partitioningColumns: Seq[String] = Nil
+
+}
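Correspondingly, a hedged sketch of the writer side, where `df` stands for an existing streaming `DataFrame`; the format, option key, and partition column are hypothetical, the format must resolve to a `StreamSinkProvider`, and the partition column must exist in `df` (matched case-insensitively, per `normalizedParCols` above):

```scala
// Sketch only: "my-sink", "topic", and the "date" column are hypothetical.
val query = df.streamTo
  .format("my-sink")                   // hypothetical sink format
  .option("topic", "events")           // hypothetical sink-specific option
  .partitionBy("date")                 // resolved against df's columns
  .start()                             // no path: options describe the destination

query.stop()                           // stop the continuous execution when done
```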
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ef993c3eda..13700be068 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -594,6 +594,14 @@ class SQLContext private[sql](
@Experimental
def read: DataFrameReader = new DataFrameReader(this)
+
+ /**
+ * :: Experimental ::
+ * Returns a [[DataStreamReader]] that can be used to access data continuously as it arrives.
+ */
+ @Experimental
+ def streamFrom: DataStreamReader = new DataStreamReader(this)
+
/**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index cc8dcf5930..e3065ac5f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -29,11 +29,11 @@ import org.apache.hadoop.util.StringUtils
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils
-
case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
@@ -92,6 +92,37 @@ object ResolvedDataSource extends Logging {
}
}
+ def createSource(
+ sqlContext: SQLContext,
+ userSpecifiedSchema: Option[StructType],
+ providerName: String,
+ options: Map[String, String]): Source = {
+ val provider = lookupDataSource(providerName).newInstance() match {
+ case s: StreamSourceProvider => s
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Data source $providerName does not support streamed reading")
+ }
+
+ provider.createSource(sqlContext, options, userSpecifiedSchema)
+ }
+
+ def createSink(
+ sqlContext: SQLContext,
+ providerName: String,
+ options: Map[String, String],
+ partitionColumns: Seq[String]): Sink = {
+ val provider = lookupDataSource(providerName).newInstance() match {
+ case s: StreamSinkProvider => s
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Data source $providerName does not support streamed writing")
+ }
+
+ provider.createSink(sqlContext, options, partitionColumns)
+ }
+
+
/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
sqlContext: SQLContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala
new file mode 100644
index 0000000000..1f25eb8fc5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.DataFrame
+
+/**
+ * Used to pass a batch of data through a streaming query execution along with an indication
+ * of progress in the stream.
+ */
+class Batch(val end: Offset, val data: DataFrame)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
new file mode 100644
index 0000000000..d2cb20ef8b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.Try
+
+/**
+ * An ordered collection of offsets, used to track the progress of processing data from one or more
+ * [[Source]]s that are present in a streaming query. This is similar to a simplified,
+ * single-instance vector clock that must progress linearly forward.
+ */
+case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
+ /**
+ * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
+ * or greater than the specified object.
+ */
+ override def compareTo(other: Offset): Int = other match {
+ case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size =>
+ val comparisons = offsets.zip(otherComposite.offsets).map {
+ case (Some(a), Some(b)) => a compareTo b
+ case (None, None) => 0
+ case (None, _) => -1
+ case (_, None) => 1
+ }
+ val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet
+ nonZeroSigns.size match {
+ case 0 => 0 // if both empty or only 0s
+ case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s)
+ case _ => // there are both 1s and -1s
+ throw new IllegalArgumentException(
+ s"Invalid comparison between non-linear histories: $this <=> $other")
+ }
+ case _ =>
+ throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
+ }
+
+ private def sign(num: Int): Int = num match {
+ case i if i < 0 => -1
+ case i if i == 0 => 0
+ case i if i > 0 => 1
+ }
+}
+
+object CompositeOffset {
+ /**
+ * Returns a [[CompositeOffset]] with a variable sequence of offsets.
+ * `nulls` in the sequence are converted to `None`s.
+ */
+ def fill(offsets: Offset*): CompositeOffset = {
+ CompositeOffset(offsets.map(Option(_)))
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
new file mode 100644
index 0000000000..008195af38
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * A simple offset for sources that produce a single linear stream of data.
+ */
+case class LongOffset(offset: Long) extends Offset {
+
+ override def compareTo(other: Offset): Int = other match {
+ case l: LongOffset => offset.compareTo(l.offset)
+ case _ =>
+ throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
+ }
+
+ def +(increment: Long): LongOffset = new LongOffset(offset + increment)
+ def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
new file mode 100644
index 0000000000..0f5d6445b1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * An offset is a monotonically increasing metric used to track progress in the computation of a
+ * stream. An [[Offset]] must be comparable, and the result of `compareTo` must be consistent
+ * with `equals` and `hashCode`.
+ */
+trait Offset extends Serializable {
+
+ /**
+ * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
+ * or greater than the specified object.
+ */
+ def compareTo(other: Offset): Int
+
+ def >(other: Offset): Boolean = compareTo(other) > 0
+ def <(other: Offset): Boolean = compareTo(other) < 0
+ def <=(other: Offset): Boolean = compareTo(other) <= 0
+ def >=(other: Offset): Boolean = compareTo(other) >= 0
+}
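A small illustration of the comparison semantics defined by `Offset`, `LongOffset`, and `CompositeOffset` (these classes are currently internal; the values below are arbitrary):

```scala
// Illustrative only; values are arbitrary and the API is internal.
import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset}

val a = LongOffset(1)
val b = a + 1                                   // LongOffset(2)
assert(a < b && b >= a)

// Composite offsets compare element-wise, and every component must move in the
// same direction (or stay put).
val before = CompositeOffset.fill(LongOffset(1), LongOffset(5))
val after  = CompositeOffset.fill(LongOffset(2), LongOffset(5))
assert(before < after)

// If one component advances while another regresses, compareTo throws an
// IllegalArgumentException ("non-linear histories").
```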
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
new file mode 100644
index 0000000000..1bd71b6b02
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+/**
+ * An interface for systems that can collect the results of a streaming query.
+ *
+ * When new data is produced by a query, a [[Sink]] must be able to transactionally collect the
+ * data and update the [[Offset]]. In the case of a failure, the sink will be recreated
+ * and must be able to return the [[Offset]] for all of the data that is made durable.
+ * This contract allows Spark to process data with exactly-once semantics, even in the case
+ * of failures that require the computation to be restarted.
+ */
+trait Sink {
+ /**
+ * Returns the [[Offset]] for all data that is currently present in the sink, if any. This
+ * function will be called by Spark when restarting execution in order to determine the point
+ * in the input stream from which computation should be resumed.
+ */
+ def currentOffset: Option[Offset]
+
+ /**
+ * Accepts a new batch of data as well as an [[Offset]] that denotes how far in the input
+ * data computation has progressed. When computation restarts after a failure, it is important
+ * that a [[Sink]] returns the same [[Offset]] as the most recent batch of data that
+ * has been persisted durably. Note that this does not necessarily have to be the
+ * [[Offset]] for the most recent batch of data that was given to the sink. For example,
+ * it is valid to buffer data before persisting, as long as the [[Offset]] is stored
+ * transactionally as data is eventually persisted.
+ */
+ def addBatch(batch: Batch): Unit
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
new file mode 100644
index 0000000000..25922979ac
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A source of continually arriving data for a streaming query. A [[Source]] must have a
+ * monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
+ * will regularly query each [[Source]] to see if any more data is available.
+ */
+trait Source {
+
+ /** Returns the schema of the data from this source */
+ def schema: StructType
+
+ /**
+ * Returns the next batch of data that is available after `start`, if any.
+ */
+ def getNextBatch(start: Option[Offset]): Option[Batch]
+}
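To make the contract concrete, a hedged sketch of a trivial source that replays a fixed in-memory sequence of integer batches; this is illustration only, and `MemoryStream` in memory.scala below is the patch's real in-memory implementation:

```scala
// Hedged sketch, not part of the patch: replays a fixed sequence of
// single-column integer batches. Offsets index into the batch list, so the
// source can replay from any start offset, as the fault-tolerance contract requires.
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.execution.streaming.{Batch, LongOffset, Offset, Source}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

class FixedSource(sqlContext: SQLContext, data: Seq[Seq[Int]]) extends Source {

  override def schema: StructType = StructType(StructField("value", IntegerType) :: Nil)

  override def getNextBatch(start: Option[Offset]): Option[Batch] = {
    // -1 means "nothing consumed yet"; replay everything after `start`.
    val from = start.collect { case LongOffset(n) => n }.getOrElse(-1L).toInt + 1
    if (from >= data.size) {
      None // no data beyond the requested offset
    } else {
      val rows = data.drop(from).flatten.map(Row(_))
      val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(rows), schema)
      Some(new Batch(LongOffset(data.size - 1), df))
    }
  }
}
```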
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
new file mode 100644
index 0000000000..ebebb82971
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.{ContinuousQuery, DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.QueryExecution
+
+/**
+ * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
+ * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at any
+ * [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is created
+ * and the results are committed transactionally to the given [[Sink]].
+ */
+class StreamExecution(
+ sqlContext: SQLContext,
+ private[sql] val logicalPlan: LogicalPlan,
+ val sink: Sink) extends ContinuousQuery with Logging {
+
+ /** A monitor used to wait/notify when batches complete. */
+ private val awaitBatchLock = new Object
+
+ @volatile
+ private var batchRun = false
+
+ /** Minimum amount of time in between the start of each batch. */
+ private val minBatchTime = 10
+
+ /** Tracks how much data we have processed from each input source. */
+ private[sql] val streamProgress = new StreamProgress
+
+ /** All stream sources present in the query plan. */
+ private val sources =
+ logicalPlan.collect { case s: StreamingRelation => s.source }
+
+ // Start the execution at the current offsets stored in the sink. (i.e. avoid reprocessing data
+ // that we have already processed).
+ {
+ sink.currentOffset match {
+ case Some(c: CompositeOffset) =>
+ val storedProgress = c.offsets
+ val sources = logicalPlan collect {
+ case StreamingRelation(source, _) => source
+ }
+
+ assert(sources.size == storedProgress.size)
+ sources.zip(storedProgress).foreach { case (source, offset) =>
+ offset.foreach(streamProgress.update(source, _))
+ }
+ case None => // We are starting this stream for the first time.
+ case _ => throw new IllegalArgumentException("Expected composite offset from sink")
+ }
+ }
+
+ logInfo(s"Stream running at $streamProgress")
+
+ /** When false, signals to the microBatchThread that it should stop running. */
+ @volatile private var shouldRun = true
+
+ /** The thread that runs the micro-batches of this stream. */
+ private[sql] val microBatchThread = new Thread("stream execution thread") {
+ override def run(): Unit = {
+ SQLContext.setActive(sqlContext)
+ while (shouldRun) {
+ attemptBatch()
+ Thread.sleep(minBatchTime) // TODO: Could be tighter
+ }
+ }
+ }
+ microBatchThread.setDaemon(true)
+ microBatchThread.setUncaughtExceptionHandler(
+ new UncaughtExceptionHandler {
+ override def uncaughtException(t: Thread, e: Throwable): Unit = {
+ streamDeathCause = e
+ }
+ })
+ microBatchThread.start()
+
+ @volatile
+ private[sql] var lastExecution: QueryExecution = null
+ @volatile
+ private[sql] var streamDeathCause: Throwable = null
+
+ /**
+ * Checks to see if any new data is present in any of the sources. When new data is available,
+ * a batch is executed and passed to the sink, updating the currentOffsets.
+ */
+ private def attemptBatch(): Unit = {
+ val startTime = System.nanoTime()
+
+ // A list of offsets that need to be updated if this batch is successful.
+ // Populated while walking the tree.
+ val newOffsets = new ArrayBuffer[(Source, Offset)]
+ // A list of attributes that will need to be updated.
+ var replacements = new ArrayBuffer[(Attribute, Attribute)]
+ // Replace sources in the logical plan with data that has arrived since the last batch.
+ val withNewSources = logicalPlan transform {
+ case StreamingRelation(source, output) =>
+ val prevOffset = streamProgress.get(source)
+ val newBatch = source.getNextBatch(prevOffset)
+
+ newBatch.map { batch =>
+ newOffsets += ((source, batch.end))
+ val newPlan = batch.data.logicalPlan
+
+ assert(output.size == newPlan.output.size)
+ replacements ++= output.zip(newPlan.output)
+ newPlan
+ }.getOrElse {
+ LocalRelation(output)
+ }
+ }
+
+ // Rewire the plan to use the new attributes that were returned by the source.
+ val replacementMap = AttributeMap(replacements)
+ val newPlan = withNewSources transformAllExpressions {
+ case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+ }
+
+ if (newOffsets.nonEmpty) {
+ val optimizerStart = System.nanoTime()
+
+ lastExecution = new QueryExecution(sqlContext, newPlan)
+ val executedPlan = lastExecution.executedPlan
+ val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
+ logDebug(s"Optimized batch in ${optimizerTime}ms")
+
+ streamProgress.synchronized {
+ // Update the offsets and calculate a new composite offset
+ newOffsets.foreach(streamProgress.update)
+ val newStreamProgress = logicalPlan.collect {
+ case StreamingRelation(source, _) => streamProgress.get(source)
+ }
+ val batchOffset = CompositeOffset(newStreamProgress)
+
+ // Construct the batch and send it to the sink.
+ val nextBatch = new Batch(batchOffset, new DataFrame(sqlContext, newPlan))
+ sink.addBatch(nextBatch)
+ }
+
+ batchRun = true
+ awaitBatchLock.synchronized {
+ // Wake up any threads that are waiting for the stream to progress.
+ awaitBatchLock.notifyAll()
+ }
+
+ val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
+ logInfo(s"Completed up to $newOffsets in ${batchTime}ms")
+ }
+
+ logDebug(s"Waiting for data, current: $streamProgress")
+ }
+
+ /**
+ * Signals to the thread executing micro-batches that it should stop running after the next
+ * batch. This method blocks until the thread stops running.
+ */
+ def stop(): Unit = {
+ shouldRun = false
+ if (microBatchThread.isAlive) { microBatchThread.join() }
+ }
+
+ /**
+ * Blocks the current thread until processing for data from the given `source` has reached at
+ * least the given `Offset`. This method is intended for use primarily when writing tests.
+ */
+ def awaitOffset(source: Source, newOffset: Offset): Unit = {
+ def notDone = streamProgress.synchronized {
+ !streamProgress.contains(source) || streamProgress(source) < newOffset
+ }
+
+ while (notDone) {
+ logInfo(s"Waiting until $newOffset at $source")
+ awaitBatchLock.synchronized { awaitBatchLock.wait(100) }
+ }
+ logDebug(s"Unblocked at $newOffset for $source")
+ }
+
+ override def toString: String =
+ s"""
+ |=== Streaming Query ===
+ |CurrentOffsets: $streamProgress
+ |Thread State: ${microBatchThread.getState}
+ |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
+ |
+ |$logicalPlan
+ """.stripMargin
+}
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
new file mode 100644
index 0000000000..0ded1d7152
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.collection.mutable
+
+/**
+ * A helper class that looks like a Map[Source, Offset].
+ */
+class StreamProgress {
+ private val currentOffsets = new mutable.HashMap[Source, Offset]
+
+ private[streaming] def update(source: Source, newOffset: Offset): Unit = {
+ currentOffsets.get(source).foreach(old =>
+ assert(newOffset > old, s"Stream going backwards $newOffset -> $old"))
+ currentOffsets.put(source, newOffset)
+ }
+
+ private[streaming] def update(newOffset: (Source, Offset)): Unit =
+ update(newOffset._1, newOffset._2)
+
+ private[streaming] def apply(source: Source): Offset = currentOffsets(source)
+ private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source)
+ private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source)
+
+ private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
+ val updated = new StreamProgress
+ currentOffsets.foreach(updated.update)
+ updates.foreach(updated.update)
+ updated
+ }
+
+ /**
+ * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
+ * it should be copied before being passed to user code.
+ */
+ private[streaming] def copy(): StreamProgress = {
+ val copied = new StreamProgress
+ currentOffsets.foreach(copied.update)
+ copied
+ }
+
+ override def toString: String =
+ currentOffsets.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
+
+ override def equals(other: Any): Boolean = other match {
+ case s: StreamProgress => currentOffsets == s.currentOffsets
+ case _ => false
+ }
+
+ override def hashCode: Int = currentOffsets.hashCode()
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
new file mode 100644
index 0000000000..e35c444348
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+object StreamingRelation {
+ def apply(source: Source): StreamingRelation =
+ StreamingRelation(source, source.schema.toAttributes)
+}
+
+/**
+ * Used to link a streaming [[Source]] of data into a
+ * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
+ */
+case class StreamingRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
+ override def toString: String = source.toString
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
new file mode 100644
index 0000000000..e6a0842936
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, RowEncoder}
+import org.apache.spark.sql.types.StructType
+
+object MemoryStream {
+ protected val currentBlockId = new AtomicInteger(0)
+ protected val memoryStreamId = new AtomicInteger(0)
+
+ def apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A] =
+ new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
+}
+
+/**
+ * A [[Source]] that produces the values stored in memory as they are added by the user. This [[Source]]
+ * is primarily intended for use in unit tests as it can only replay data when the object is still
+ * available.
+ */
+case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
+ extends Source with Logging {
+ protected val encoder = encoderFor[A]
+ protected val logicalPlan = StreamingRelation(this)
+ protected val output = logicalPlan.output
+ protected val batches = new ArrayBuffer[Dataset[A]]
+ protected var currentOffset: LongOffset = new LongOffset(-1)
+
+ protected def blockManager = SparkEnv.get.blockManager
+
+ def schema: StructType = encoder.schema
+
+ def getCurrentOffset: Offset = currentOffset
+
+ def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
+ new Dataset(sqlContext, logicalPlan)
+ }
+
+ def toDF()(implicit sqlContext: SQLContext): DataFrame = {
+ new DataFrame(sqlContext, logicalPlan)
+ }
+
+ def addData(data: TraversableOnce[A]): Offset = {
+ import sqlContext.implicits._
+ this.synchronized {
+ currentOffset = currentOffset + 1
+ val ds = data.toVector.toDS()
+ logDebug(s"Adding ds: $ds")
+ batches.append(ds)
+ currentOffset
+ }
+ }
+
+ override def getNextBatch(start: Option[Offset]): Option[Batch] = synchronized {
+ val newBlocks =
+ batches.drop(
+ start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1)
+
+ if (newBlocks.nonEmpty) {
+ logDebug(s"Running [$start, $currentOffset] on blocks ${newBlocks.mkString(", ")}")
+ val df = newBlocks
+ .map(_.toDF())
+ .reduceOption(_ unionAll _)
+ .getOrElse(sqlContext.emptyDataFrame)
+
+ Some(new Batch(currentOffset, df))
+ } else {
+ None
+ }
+ }
+
+ override def toString: String = s"MemoryStream[${output.mkString(",")}]"
+}
+
+/**
+ * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
+ * tests and does not provide durability.
+ */
+class MemorySink(schema: StructType) extends Sink with Logging {
+ /** An ordered list of batches that have been written to this [[Sink]]. */
+ private var batches = new ArrayBuffer[Batch]()
+
+ /** Used to convert an [[InternalRow]] to an external [[Row]] for comparison in testing. */
+ private val externalRowConverter = RowEncoder(schema)
+
+ override def currentOffset: Option[Offset] = synchronized {
+ batches.lastOption.map(_.end)
+ }
+
+ override def addBatch(nextBatch: Batch): Unit = synchronized {
+ batches.append(nextBatch)
+ }
+
+ /** Returns all rows that are stored in this [[Sink]]. */
+ def allData: Seq[Row] = synchronized {
+ batches
+ .map(_.data)
+ .reduceOption(_ unionAll _)
+ .map(_.collect().toSeq)
+ .getOrElse(Seq.empty)
+ }
+
+ /**
+ * Atomically drops the most recent `num` batches and resets the [[StreamProgress]] to the
+ * corresponding point in the input. This function can be used when testing to simulate data
+ * that has been lost due to buffering.
+ */
+ def dropBatches(num: Int): Unit = synchronized {
+ batches = batches.dropRight(num)
+ }
+
+ override def toString: String = synchronized {
+ batches.map(b => s"${b.end}: ${b.data.collect().mkString(" ")}").mkString("\n")
+ }
+}
+
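These helpers can also be wired together by hand, as in the hedged sketch below; `StreamExecution` and `DataFrame.logicalPlan` are internal, so the hypothetical object is assumed to live inside the `org.apache.spark.sql` package (the `StreamTest` framework added next automates this pattern):

```scala
// Hedged sketch, not part of the patch: drives a query manually with the
// in-memory source and sink.
package org.apache.spark.sql

import org.apache.spark.sql.execution.streaming.{MemorySink, MemoryStream, StreamExecution}

object ManualStreamDemo {
  def run(implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    val input = MemoryStream[Int]                // in-memory Source
    val query = input.toDS().map(_ + 1).toDF()   // the continuous query to run
    val sink = new MemorySink(query.schema)      // in-memory Sink

    val execution = new StreamExecution(sqlContext, query.logicalPlan, sink)

    val offset = input.addData(Seq(1, 2, 3))     // Offset covering the new data
    execution.awaitOffset(input, offset)         // block until that data is processed

    assert(sink.allData.map(_.getInt(0)).sorted == Seq(2, 3, 4))
    execution.stop()
  }
}
```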
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 8911ad370a..299fc6efbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
@@ -124,6 +125,26 @@ trait SchemaRelationProvider {
}
/**
+ * Implemented by objects that can produce a streaming [[Source]] for a specific format or system.
+ */
+trait StreamSourceProvider {
+ def createSource(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ schema: Option[StructType]): Source
+}
+
+/**
+ * Implemented by objects that can produce a streaming [[Sink]] for a specific format or system.
+ */
+trait StreamSinkProvider {
+ def createSink(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String]): Sink
+}
+
+/**
* ::Experimental::
* Implemented by objects that produce relations for a specific kind of data source
* with a given schema and partitioned columns. When Spark SQL is given a DDL operation with a
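A hedged sketch of how a format would plug into these hooks: a minimal sink-only data source whose class name follows the existing `DefaultSource` lookup convention. The package name is hypothetical, and the sink is a toy that prints rows and keeps nothing durable:

```scala
// Hedged sketch, not part of the patch: a minimal sink-only streaming format.
// ResolvedDataSource.createSink (shown earlier in this diff) instantiates the
// class behind .format(...) and requires it to implement StreamSinkProvider.
package com.example.printsink   // hypothetical; used as .format("com.example.printsink")

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink}
import org.apache.spark.sql.sources.StreamSinkProvider

class DefaultSource extends StreamSinkProvider {
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String]): Sink = new Sink {

    // Nothing is made durable, so recovery always restarts from the beginning.
    @volatile private var lastOffset: Option[Offset] = None

    override def currentOffset: Option[Offset] = lastOffset

    override def addBatch(batch: Batch): Unit = {
      batch.data.collect().foreach(println)   // "write" each row to stdout
      lastOffset = Some(batch.end)            // remember how far the query has gotten
    }
  }
}
```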
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index ce12f788b7..405e5891ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -304,27 +304,7 @@ object QueryTest {
def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
- // We need to call prepareRow recursively to handle schemas with struct types.
- def prepareRow(row: Row): Row = {
- Row.fromSeq(row.toSeq.map {
- case null => null
- case d: java.math.BigDecimal => BigDecimal(d)
- // Convert array to Seq for easy equality check.
- case b: Array[_] => b.toSeq
- case r: Row => prepareRow(r)
- case o => o
- })
- }
- def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
- // Converts data to types that we can do equality comparison using Scala collections.
- // For BigDecimal type, the Scala type has a better definition of equality test (similar to
- // Java's java.math.BigDecimal.compareTo).
- // For binary arrays, we convert it to Seq to avoid of calling java.util.Arrays.equals for
- // equality test.
- val converted: Seq[Row] = answer.map(prepareRow)
- if (!isSorted) converted.sortBy(_.toString()) else converted
- }
val sparkAnswer = try df.collect().toSeq catch {
case e: Exception =>
val errorMessage =
@@ -338,22 +318,56 @@ object QueryTest {
return Some(errorMessage)
}
- if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
- val errorMessage =
+ sameRows(expectedAnswer, sparkAnswer, isSorted).map { results =>
s"""
|Results do not match for query:
|${df.queryExecution}
|== Results ==
- |${sideBySide(
- s"== Correct Answer - ${expectedAnswer.size} ==" +:
- prepareAnswer(expectedAnswer).map(_.toString()),
- s"== Spark Answer - ${sparkAnswer.size} ==" +:
- prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")}
- """.stripMargin
- return Some(errorMessage)
+ |$results
+ """.stripMargin
}
+ }
+
+
+ def prepareAnswer(answer: Seq[Row], isSorted: Boolean): Seq[Row] = {
+ // Converts data to types that we can do equality comparison using Scala collections.
+ // For BigDecimal type, the Scala type has a better definition of equality test (similar to
+ // Java's java.math.BigDecimal.compareTo).
+ // For binary arrays, we convert it to Seq to avoid of calling java.util.Arrays.equals for
+ // equality test.
+ val converted: Seq[Row] = answer.map(prepareRow)
+ if (!isSorted) converted.sortBy(_.toString()) else converted
+ }
- return None
+ // We need to call prepareRow recursively to handle schemas with struct types.
+ def prepareRow(row: Row): Row = {
+ Row.fromSeq(row.toSeq.map {
+ case null => null
+ case d: java.math.BigDecimal => BigDecimal(d)
+ // Convert array to Seq for easy equality check.
+ case b: Array[_] => b.toSeq
+ case r: Row => prepareRow(r)
+ case o => o
+ })
+ }
+
+ def sameRows(
+ expectedAnswer: Seq[Row],
+ sparkAnswer: Seq[Row],
+ isSorted: Boolean = false): Option[String] = {
+ if (prepareAnswer(expectedAnswer, isSorted) != prepareAnswer(sparkAnswer, isSorted)) {
+ val errorMessage =
+ s"""
+ |== Results ==
+ |${sideBySide(
+ s"== Correct Answer - ${expectedAnswer.size} ==" +:
+ prepareAnswer(expectedAnswer, isSorted).map(_.toString()),
+ s"== Spark Answer - ${sparkAnswer.size} ==" +:
+ prepareAnswer(sparkAnswer, isSorted).map(_.toString())).mkString("\n")}
+ """.stripMargin
+ return Some(errorMessage)
+ }
+ None
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
new file mode 100644
index 0000000000..f45abbf249
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.streaming._
+
+/**
+ * A framework for implementing tests for streaming queries and sources.
+ *
+ * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
+ * blocking as necessary to let the stream catch up. For example, the following adds some data to
+ * a stream, blocking until it can verify that the correct values are eventually produced.
+ *
+ * {{{
+ * val inputData = MemoryStream[Int]
+ * val mapped = inputData.toDS().map(_ + 1)
+ *
+ * testStream(mapped)(
+ *   AddData(inputData, 1, 2, 3),
+ *   CheckAnswer(2, 3, 4))
+ * }}}
+ *
+ * Note that while we do sleep to allow the other thread to progress without spinning,
+ * `StreamAction` checks should not depend on the amount of time spent sleeping. Instead they
+ * should check the actual progress of the stream before verifying the required test condition.
+ *
+ * Currently it is assumed that all streaming queries will eventually complete in 10 seconds to
+ * avoid hanging forever in the case of failures. However, individual suites can change this
+ * by overriding `streamingTimeout`.
+ */
+trait StreamTest extends QueryTest with Timeouts {
+
+ implicit class RichSource(s: Source) {
+ def toDF(): DataFrame = new DataFrame(sqlContext, StreamingRelation(s))
+ }
+
+ /** How long to wait for an active stream to catch up when checking a result. */
+ val streamingTimout = 10.seconds
+
+ /** A trait for actions that can be performed while testing a streaming DataFrame. */
+ trait StreamAction
+
+ /** A trait to mark actions that require the stream to be actively running. */
+ trait StreamMustBeRunning
+
+ /**
+ * Adds the given data to the stream. Subsequent check answers will block until this data has
+ * been processed.
+ */
+ object AddData {
+ def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
+ AddDataMemory(source, data)
+ }
+
+ /** A trait that can be extended when testing other sources. */
+ trait AddData extends StreamAction {
+ def source: Source
+
+ /**
+ * Called to trigger adding the data. Should return the offset that will denote when this
+ * new data has been processed.
+ */
+ def addData(): Offset
+ }
+
+ case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
+ override def toString: String = s"AddData to $source: ${data.mkString(",")}"
+
+ override def addData(): Offset = {
+ source.addData(data)
+ }
+ }
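+
+ // A sketch (not part of this trait) of how `AddData` could be extended for another source;
+ // `MySource` and its `append` method, which is assumed to return the offset of the appended
+ // data, are hypothetical:
+ //
+ //   case class AddMySourceData[A](source: MySource[A], data: Seq[A]) extends AddData {
+ //     override def addData(): Offset = source.append(data)
+ //   }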
+
+ /**
+ * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`.
+ * This operation automatically blocks until all added data has been processed.
+ */
+ object CheckAnswer {
+ def apply[A : Encoder](data: A*): CheckAnswerRows = {
+ val encoder = encoderFor[A]
+ val toExternalRow = RowEncoder(encoder.schema)
+ CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))))
+ }
+
+ def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows)
+ }
+
+ case class CheckAnswerRows(expectedAnswer: Seq[Row])
+ extends StreamAction with StreamMustBeRunning {
+ override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}"
+ }
+
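+ /** Drops `num` batches from the [[MemorySink]]. The stream must not be running when this is applied. */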
+ case class DropBatches(num: Int) extends StreamAction
+
+ /** Stops the stream. It must currently be running. */
+ case object StopStream extends StreamAction with StreamMustBeRunning
+
+ /** Starts the stream, resuming if data has already been processed. It must not be running. */
+ case object StartStream extends StreamAction
+
+ /** Signals that a failure is expected and should not kill the test. */
+ case object ExpectFailure extends StreamAction
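+
+ // Actions compose into end-to-end scenarios; for example, a stop/restart recovery check
+ // (mirroring the "map with recovery" test in StreamSuite, with `inputData` and `mapped` as in
+ // the example in the class documentation) can be written as:
+ //
+ //   testStream(mapped)(
+ //     AddData(inputData, 1, 2, 3),
+ //     StartStream,
+ //     CheckAnswer(2, 3, 4),
+ //     StopStream,
+ //     AddData(inputData, 4, 5, 6),
+ //     StartStream,
+ //     CheckAnswer(2, 3, 4, 5, 6, 7))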
+
+ /** A helper for running actions on a Streaming Dataset. See `checkAnswer(DataFrame)`. */
+ def testStream(stream: Dataset[_])(actions: StreamAction*): Unit =
+ testStream(stream.toDF())(actions: _*)
+
+ /**
+ * Executes the specified actions on the given streaming DataFrame and provides helpful
+ * error messages in the case of failures or incorrect answers.
+ *
+ * Note that if the stream is not explicitly started before an action that requires it to be
+ * running then it will be automatically started before performing any other actions.
+ */
+ def testStream(stream: DataFrame)(actions: StreamAction*): Unit = {
+ var pos = 0
+ var currentPlan: LogicalPlan = stream.logicalPlan
+ var currentStream: StreamExecution = null
+ val awaiting = new mutable.HashMap[Source, Offset]()
+ val sink = new MemorySink(stream.schema)
+
+ @volatile
+ var streamDeathCause: Throwable = null
+
+ // If the test doesn't manually start the stream, we do it automatically at the beginning.
+ val startedManually =
+ actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
+ val startedTest = if (startedManually) actions else StartStream +: actions
+
+ def testActions = actions.zipWithIndex.map {
+ case (a, i) =>
+ if ((pos == i && startedManually) || (pos == (i + 1) && !startedManually)) {
+ "=> " + a.toString
+ } else {
+ " " + a.toString
+ }
+ }.mkString("\n")
+
+ def currentOffsets =
+ if (currentStream != null) currentStream.streamProgress.toString else "not started"
+
+ def threadState =
+ if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
+ def testState =
+ s"""
+ |== Progress ==
+ |$testActions
+ |
+ |== Stream ==
+ |Stream state: $currentOffsets
+ |Thread state: $threadState
+ |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
+ |
+ |== Sink ==
+ |$sink
+ |
+ |== Plan ==
+ |${if (currentStream != null) currentStream.lastExecution else ""}
+ """
+
+ def checkState(check: Boolean, error: String) = if (!check) {
+ fail(
+ s"""
+ |Invalid State: $error
+ |$testState
+ """.stripMargin)
+ }
+
+ val testThread = Thread.currentThread()
+
+ try {
+ startedTest.foreach { action =>
+ action match {
+ case StartStream =>
+ checkState(currentStream == null, "stream already running")
+
+ currentStream = new StreamExecution(sqlContext, stream.logicalPlan, sink)
+ currentStream.microBatchThread.setUncaughtExceptionHandler(
+ new UncaughtExceptionHandler {
+ override def uncaughtException(t: Thread, e: Throwable): Unit = {
+ streamDeathCause = e
+ testThread.interrupt()
+ }
+ })
+
+ case StopStream =>
+ checkState(currentStream != null, "can not stop a stream that is not running")
+ currentStream.stop()
+ currentStream = null
+
+ case DropBatches(num) =>
+ checkState(currentStream == null, "dropping batches while running leads to corruption")
+ sink.dropBatches(num)
+
+ case ExpectFailure =>
+ try failAfter(streamingTimeout) {
+ while (streamDeathCause == null) {
+ Thread.sleep(100)
+ }
+ } catch {
+ case _: InterruptedException =>
+ case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+ fail(
+ s"""
+ |Timed out while waiting for failure.
+ |$testState
+ """.stripMargin)
+ }
+
+ currentStream = null
+ streamDeathCause = null
+
+ case a: AddData =>
+ awaiting.put(a.source, a.addData())
+
+ case CheckAnswerRows(expectedAnswer) =>
+ checkState(currentStream != null, "stream not running")
+
+ // Block until all data added has been processed
+ awaiting.foreach { case (source, offset) =>
+ failAfter(streamingTimeout) {
+ currentStream.awaitOffset(source, offset)
+ }
+ }
+
+ val allData = try sink.allData catch {
+ case e: Exception =>
+ fail(
+ s"""
+ |Exception while getting data from sink $e
+ |$testState
+ """.stripMargin)
+ }
+
+ QueryTest.sameRows(expectedAnswer, allData).foreach {
+ error => fail(
+ s"""
+ |$error
+ |$testState
+ """.stripMargin)
+ }
+ }
+ pos += 1
+ }
+ } catch {
+ case _: InterruptedException if streamDeathCause != null =>
+ fail(
+ s"""
+ |Stream Thread Died
+ |$testState
+ """.stripMargin)
+ case _: org.scalatest.exceptions.TestFailedDueToTimeoutException =>
+ fail(
+ s"""
+ |Timed out waiting for stream
+ |$testState
+ """.stripMargin)
+ } finally {
+ if (currentStream != null && currentStream.microBatchThread.isAlive) {
+ currentStream.stop()
+ }
+ }
+ }
+
+ /**
+ * Creates a stress test that randomly starts/stops/adds data/checks the result.
+ *
+ * @param ds a Dataset that executes `+ 1` on a stream of integers, returning the result
+ * @param addData an add data action that adds the given numbers to the stream, encoding them
+ *                as needed
+ * @param iterations the number of random actions to perform
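+ *
+ * For example (a sketch matching `MemorySourceStressSuite`):
+ * {{{
+ *   val input = MemoryStream[Int]
+ *   runStressTest(input.toDS().map(_ + 1), AddData(input, _: _*))
+ * }}}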
+ */
+ def runStressTest(
+ ds: Dataset[Int],
+ addData: Seq[Int] => StreamAction,
+ iterations: Int = 100): Unit = {
+ implicit val intEncoder = ExpressionEncoder[Int]()
+ var dataPos = 0
+ var running = true
+ val actions = new ArrayBuffer[StreamAction]()
+
+ def addCheck() = { actions += CheckAnswer(1 to dataPos: _*) }
+
+ def addRandomData() = {
+ val numItems = Random.nextInt(10)
+ val data = dataPos until (dataPos + numItems)
+ dataPos += numItems
+ actions += addData(data)
+ }
+
+ (1 to iterations).foreach { i =>
+ val rand = Random.nextDouble()
+ if (!running) {
+ rand match {
+ case r if r < 0.7 => // AddData
+ addRandomData()
+
+ case _ => // StartStream
+ actions += StartStream
+ running = true
+ }
+ } else {
+ rand match {
+ case r if r < 0.1 =>
+ addCheck()
+
+ case r if r < 0.7 => // AddData
+ addRandomData()
+
+ case _ => // StopStream
+ actions += StopStream
+ running = false
+ }
+ }
+ }
+ if (!running) { actions += StartStream }
+ addCheck()
+ testStream(ds)(actions: _*)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
new file mode 100644
index 0000000000..1dab6ebf1b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataStreamReaderSuite.scala
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.test
+
+import org.apache.spark.sql.{AnalysisException, SQLContext, StreamTest}
+import org.apache.spark.sql.execution.streaming.{Batch, Offset, Sink, Source}
+import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+
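+/** Records the options most recently passed to the dummy [[DefaultSource]] below. */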
+object LastOptions {
+ var parameters: Map[String, String] = null
+ var schema: Option[StructType] = null
+ var partitionColumns: Seq[String] = Nil
+}
+
+/** Dummy provider: returns no-op source/sink and records options in [[LastOptions]]. */
+class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
+ override def createSource(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ schema: Option[StructType]): Source = {
+ LastOptions.parameters = parameters
+ LastOptions.schema = schema
+ new Source {
+ override def getNextBatch(start: Option[Offset]): Option[Batch] = None
+ override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+ }
+ }
+
+ override def createSink(
+ sqlContext: SQLContext,
+ parameters: Map[String, String],
+ partitionColumns: Seq[String]): Sink = {
+ LastOptions.parameters = parameters
+ LastOptions.partitionColumns = partitionColumns
+ new Sink {
+ override def addBatch(batch: Batch): Unit = {}
+ override def currentOffset: Option[Offset] = None
+ }
+ }
+}
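+
+// The suites below resolve this provider either by its fully qualified class name or by the
+// package name alone, which appears to fall back to the package's `DefaultSource`; for example:
+//
+//   sqlContext.streamFrom.format("org.apache.spark.sql.streaming.test").open()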
+
+class DataStreamReaderWriterSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("resolve default source") {
+ sqlContext.streamFrom
+ .format("org.apache.spark.sql.streaming.test")
+ .open()
+ .streamTo
+ .format("org.apache.spark.sql.streaming.test")
+ .start()
+ .stop()
+ }
+
+ test("resolve full class") {
+ sqlContext.streamFrom
+ .format("org.apache.spark.sql.streaming.test.DefaultSource")
+ .open()
+ .streamTo
+ .format("org.apache.spark.sql.streaming.test")
+ .start()
+ .stop()
+ }
+
+ test("options") {
+ val map = new java.util.HashMap[String, String]
+ map.put("opt3", "3")
+
+ val df = sqlContext.streamFrom
+ .format("org.apache.spark.sql.streaming.test")
+ .option("opt1", "1")
+ .options(Map("opt2" -> "2"))
+ .options(map)
+ .open()
+
+ assert(LastOptions.parameters("opt1") == "1")
+ assert(LastOptions.parameters("opt2") == "2")
+ assert(LastOptions.parameters("opt3") == "3")
+
+ LastOptions.parameters = null
+
+ df.streamTo
+ .format("org.apache.spark.sql.streaming.test")
+ .option("opt1", "1")
+ .options(Map("opt2" -> "2"))
+ .options(map)
+ .start()
+ .stop()
+
+ assert(LastOptions.parameters("opt1") == "1")
+ assert(LastOptions.parameters("opt2") == "2")
+ assert(LastOptions.parameters("opt3") == "3")
+ }
+
+ test("partitioning") {
+ val df = sqlContext.streamFrom
+ .format("org.apache.spark.sql.streaming.test")
+ .open()
+
+ df.streamTo
+ .format("org.apache.spark.sql.streaming.test")
+ .start()
+ .stop()
+ assert(LastOptions.partitionColumns == Nil)
+
+ df.streamTo
+ .format("org.apache.spark.sql.streaming.test")
+ .partitionBy("a")
+ .start()
+ .stop()
+ assert(LastOptions.partitionColumns == Seq("a"))
+
+
+ withSQLConf("spark.sql.caseSensitive" -> "false") {
+ df.streamTo
+ .format("org.apache.spark.sql.streaming.test")
+ .partitionBy("A")
+ .start()
+ .stop()
+ assert(LastOptions.partitionColumns == Seq("a"))
+ }
+
+ intercept[AnalysisException] {
+ df.streamTo
+ .format("org.apache.spark.sql.streaming.test")
+ .partitionBy("b")
+ .start()
+ .stop()
+ }
+ }
+
+ test("stream paths") {
+ val df = sqlContext.streamFrom
+ .format("org.apache.spark.sql.streaming.test")
+ .open("/test")
+
+ assert(LastOptions.parameters("path") == "/test")
+
+ LastOptions.parameters = null
+
+ df.streamTo
+ .format("org.apache.spark.sql.streaming.test")
+ .start("/test")
+ .stop()
+
+ assert(LastOptions.parameters("path") == "/test")
+ }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
new file mode 100644
index 0000000000..81760d2aa8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySourceStressSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class MemorySourceStressSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("memory stress test") {
+ val input = MemoryStream[Int]
+ val mapped = input.toDS().map(_ + 1)
+
+ runStressTest(mapped, AddData(input, _: _*))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
new file mode 100644
index 0000000000..989465826d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, Offset}
+
+trait OffsetSuite extends SparkFunSuite {
+ /** Creates a test that checks all comparisons of offsets, given a `one` that is less than `two`. */
+ def compare(one: Offset, two: Offset): Unit = {
+ test(s"comparison $one <=> $two") {
+ assert(one < two)
+ assert(one <= two)
+ assert(one <= one)
+ assert(two > one)
+ assert(two >= one)
+ assert(one >= one)
+ assert(one == one)
+ assert(two == two)
+ assert(one != two)
+ assert(two != one)
+ }
+ }
+
+ /** Creates a test that checks that ordering comparisons between incomparable offsets throw exceptions. */
+ def compareInvalid(one: Offset, two: Offset): Unit = {
+ test(s"invalid comparison $one <=> $two") {
+ intercept[IllegalArgumentException] {
+ assert(one < two)
+ }
+
+ intercept[IllegalArgumentException] {
+ assert(one <= two)
+ }
+
+ intercept[IllegalArgumentException] {
+ assert(one > two)
+ }
+
+ intercept[IllegalArgumentException] {
+ assert(one >= two)
+ }
+
+ assert(!(one == two))
+ assert(!(two == one))
+ assert(one != two)
+ assert(two != one)
+ }
+ }
+}
+
+class LongOffsetSuite extends OffsetSuite {
+ val one = LongOffset(1)
+ val two = LongOffset(2)
+ compare(one, two)
+}
+
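+// The cases below exercise element-wise comparison of composite offsets: two composites appear to
+// be comparable only when they have the same number of components and no pair of defined
+// components disagrees in direction. For example, the first case asserts (among others):
+//
+//   CompositeOffset(Some(LongOffset(1)) :: Nil) < CompositeOffset(Some(LongOffset(2)) :: Nil)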
+class CompositeOffsetSuite extends OffsetSuite {
+ compare(
+ one = CompositeOffset(Some(LongOffset(1)) :: Nil),
+ two = CompositeOffset(Some(LongOffset(2)) :: Nil))
+
+ compare(
+ one = CompositeOffset(None :: Nil),
+ two = CompositeOffset(Some(LongOffset(2)) :: Nil))
+
+ compareInvalid( // sizes must be same
+ one = CompositeOffset(Nil),
+ two = CompositeOffset(Some(LongOffset(2)) :: Nil))
+
+ compare(
+ one = CompositeOffset.fill(LongOffset(0), LongOffset(1)),
+ two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
+
+ compare(
+ one = CompositeOffset.fill(LongOffset(1), LongOffset(1)),
+ two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
+
+ compareInvalid(
+ one = CompositeOffset.fill(LongOffset(2), LongOffset(1)), // vector time inconsistent
+ two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
new file mode 100644
index 0000000000..fbb1792596
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Row, StreamTest}
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.test.SharedSQLContext
+
+class StreamSuite extends StreamTest with SharedSQLContext {
+
+ import testImplicits._
+
+ test("map with recovery") {
+ val inputData = MemoryStream[Int]
+ val mapped = inputData.toDS().map(_ + 1)
+
+ testStream(mapped)(
+ AddData(inputData, 1, 2, 3),
+ StartStream,
+ CheckAnswer(2, 3, 4),
+ StopStream,
+ AddData(inputData, 4, 5, 6),
+ StartStream,
+ CheckAnswer(2, 3, 4, 5, 6, 7))
+ }
+
+ test("join") {
+ // Make a table and ensure it will be broadcast.
+ val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+ // Join the input stream with a table.
+ val inputData = MemoryStream[Int]
+ val joined = inputData.toDS().toDF().join(smallTable, $"value" === $"number")
+
+ testStream(joined)(
+ AddData(inputData, 1, 2, 3),
+ CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two")),
+ AddData(inputData, 4),
+ CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
+ }
+
+ test("union two streams") {
+ val inputData1 = MemoryStream[Int]
+ val inputData2 = MemoryStream[Int]
+
+ val unioned = inputData1.toDS().union(inputData2.toDS())
+
+ testStream(unioned)(
+ AddData(inputData1, 1, 3, 5),
+ CheckAnswer(1, 3, 5),
+ AddData(inputData2, 2, 4, 6),
+ CheckAnswer(1, 2, 3, 4, 5, 6),
+ StopStream,
+ AddData(inputData1, 7),
+ StartStream,
+ AddData(inputData2, 8),
+ CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8))
+ }
+
+ test("sql queries") {
+ val inputData = MemoryStream[Int]
+ inputData.toDF().registerTempTable("stream")
+ val evens = sql("SELECT * FROM stream WHERE value % 2 = 0")
+
+ testStream(evens)(
+ AddData(inputData, 1, 2, 3, 4),
+ CheckAnswer(2, 4))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e7b3765487..c341191c70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -36,7 +36,7 @@ trait SharedSQLContext extends SQLTestUtils {
/**
* The [[TestSQLContext]] to use for all tests in this suite.
*/
- protected def sqlContext: SQLContext = _ctx
+ protected implicit def sqlContext: SQLContext = _ctx
/**
* Initialize the [[TestSQLContext]].