aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-04-04 19:04:09 -0700
committerMichael Armbrust <michael@databricks.com>2016-04-04 19:04:09 -0700
commitba24d1ee9a1d97ca82282f3b811ec011c4285b99 (patch)
tree925850ccff2d0992ebf8db1c81181c806244e312 /sql
parent7201f033ce520259b6d07ea5ead92272cac92363 (diff)
downloadspark-ba24d1ee9a1d97ca82282f3b811ec011c4285b99.tar.gz
spark-ba24d1ee9a1d97ca82282f3b811ec011c4285b99.tar.bz2
spark-ba24d1ee9a1d97ca82282f3b811ec011c4285b99.zip
[SPARK-14287] isStreaming method for Dataset
With the addition of StreamExecution (ContinuousQuery) to Datasets, data will become unbounded. With unbounded data, the execution of some methods and operations will not make sense, e.g. `Dataset.count()`. A simple API is required to check whether the data in a Dataset is bounded or unbounded. This will allow users to check whether their Dataset is in streaming mode or not. ML algorithms may check if the data is unbounded and throw an exception for example. The implementation of this method is simple; however, naming it is the challenge. Some possible names for this method are: - isStreaming - isContinuous - isBounded - isUnbounded I've gone with `isStreaming` for now. We can change it before Spark 2.0 if we decide to come up with a different name. For that reason I've marked it as `Experimental` Author: Burak Yavuz <brkyvz@gmail.com> Closes #12080 from brkyvz/is-streaming.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala15
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala18
2 files changed, 33 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 8dfe8ff702..db2134b020 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
+import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -450,6 +451,20 @@ class Dataset[T] private[sql](
def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]
/**
+ * Returns true if this [[Dataset]] contains one or more sources that continuously
+ * return data as it arrives. A [[Dataset]] that reads data from a streaming source
+ * must be executed as a [[ContinuousQuery]] using the `startStream()` method in
+ * [[DataFrameWriter]]. Methods that return a single answer, (e.g., `count()` or
+ * `collect()`) will throw an [[AnalysisException]] when there is a streaming
+ * source present.
+ *
+ * @group basic
+ * @since 2.0.0
+ */
+ @Experimental
+ def isStreaming: Boolean = logicalPlan.find(_.isInstanceOf[StreamingRelation]).isDefined
+
+ /**
* Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated,
* and all cells will be aligned right. For example:
* {{{
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2aa90568c3..e8e801084f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -23,6 +23,7 @@ import java.sql.{Date, Timestamp}
import scala.language.postfixOps
import org.apache.spark.sql.catalyst.encoders.OuterScopes
+import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
@@ -602,6 +603,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
TupleClass(1, "a")
)
}
+
+ test("isStreaming returns false for static Dataset") {
+ val data = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
+ assert(!data.isStreaming, "static Dataset returned true for 'isStreaming'.")
+ }
+
+ test("isStreaming returns true for streaming Dataset") {
+ val data = MemoryStream[Int].toDS()
+ assert(data.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
+ }
+
+ test("isStreaming returns true after static and streaming Dataset join") {
+ val static = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("a", "b")
+ val streaming = MemoryStream[Int].toDS().toDF("b")
+ val df = streaming.join(static, Seq("b"))
+ assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
+ }
}
case class OtherTuple(_1: String, _2: Int)