author    Tathagata Das <tathagata.das1565@gmail.com>    2016-06-13 12:47:47 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-06-13 12:47:47 -0700
commit    a6a18a4573515e76d78534f1a19fcc2c3819f6c5 (patch)
tree      995c57ad0cc343fdcd759000cab4a6a51fb6834c
parent    d32e227787338a08741d8064f5dd2db1d60ddc63 (diff)
[HOTFIX][MINOR][SQL] Revert " Standardize 'continuous queries' to 'streaming D…
This reverts commit d32e227787338a08741d8064f5dd2db1d60ddc63, which broke the build:
https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-branch-2.0-compile-maven-hadoop-2.3/326/console

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #13645 from tdas/build-break.
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                           | 47
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                      |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala | 90
3 files changed, 64 insertions(+), 77 deletions(-)
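
For context, the assertion messages touched below guard which DataFrameWriter methods may be used on streaming versus batch Datasets. The following is a minimal sketch, against the branch-2.0 API state shown in this diff, of how one such guard surfaces to a user; the object name, master setting, and input path are illustrative and not part of the commit.

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    // Minimal sketch (branch-2.0 era API as shown in this diff): calling a
    // streaming-only writer method on a batch DataFrame is rejected by
    // assertStreaming() with the message being standardized/reverted here.
    // The master, app name, and input path are illustrative assumptions.
    object WriterAssertionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("writer-assertion-sketch")
          .getOrCreate()

        // A batch (non-streaming) DataFrame; spark.read...stream() would yield a streaming one.
        val batchDf = spark.read.text("/tmp/some_text_input")

        try {
          batchDf.write.outputMode("append") // streaming-only method on a batch writer
        } catch {
          case e: AnalysisException =>
            // With this revert applied, prints:
            // "outputMode() can only be called on continuous queries;"
            println(e.getMessage)
        }

        spark.stop()
      }
    }
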
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 392e3c1e4e..afae0786b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -56,7 +56,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
def mode(saveMode: SaveMode): DataFrameWriter[T] = {
// mode() is used for non-continuous queries
// outputMode() is used for continuous queries
- assertNotStreaming("mode() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("mode() can only be called on non-continuous queries")
this.mode = saveMode
this
}
@@ -73,7 +73,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
def mode(saveMode: String): DataFrameWriter[T] = {
// mode() is used for non-continuous queries
// outputMode() is used for continuous queries
- assertNotStreaming("mode() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("mode() can only be called on non-continuous queries")
this.mode = saveMode.toLowerCase match {
case "overwrite" => SaveMode.Overwrite
case "append" => SaveMode.Append
@@ -86,33 +86,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
/**
- * Specifies how data of a streaming Dataset/DataFrame is written to a streaming sink.
- * - `OutputMode.Append()`: only the new rows in the streaming Dataset/DataFrame will be
+ * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+ * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
* written to the sink
- * - `OutputMode.Complete()`: all the rows in the streaming Dataset/DataFrame will be written
+ * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
* to the sink every time these is some updates
*
* @since 2.0.0
*/
@Experimental
def outputMode(outputMode: OutputMode): DataFrameWriter[T] = {
- assertStreaming("outputMode() can only be called on streaming Datasets/DataFrames")
+ assertStreaming("outputMode() can only be called on continuous queries")
this.outputMode = outputMode
this
}
/**
- * Specifies how data of a streaming Dataset/DataFrame is written to a streaming sink.
- * - `append`: only the new rows in the streaming Dataset/DataFrame will be written to
+ * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
+ * - `append`: only the new rows in the streaming DataFrame/Dataset will be written to
* the sink
- * - `complete`: all the rows in the streaming Dataset/DataFrame will be written to the sink
+ * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink
* every time these is some updates
*
* @since 2.0.0
*/
@Experimental
def outputMode(outputMode: String): DataFrameWriter[T] = {
- assertStreaming("outputMode() can only be called on streaming Datasets/DataFrames")
+ assertStreaming("outputMode() can only be called on continuous queries")
this.outputMode = outputMode.toLowerCase match {
case "append" =>
OutputMode.Append
@@ -150,7 +150,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
@Experimental
def trigger(trigger: Trigger): DataFrameWriter[T] = {
- assertStreaming("trigger() can only be called on streaming Datasets/DataFrames")
+ assertStreaming("trigger() can only be called on continuous queries")
this.trigger = trigger
this
}
@@ -284,7 +284,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
def save(): Unit = {
assertNotBucketed("save")
- assertNotStreaming("save() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("save() can only be called on non-continuous queries")
val dataSource = DataSource(
df.sparkSession,
className = source,
@@ -304,7 +304,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*/
@Experimental
def queryName(queryName: String): DataFrameWriter[T] = {
- assertStreaming("queryName() can only be called on streaming Datasets/DataFrames")
+ assertStreaming("queryName() can only be called on continuous queries")
this.extraOptions += ("queryName" -> queryName)
this
}
@@ -333,7 +333,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
@Experimental
def startStream(): ContinuousQuery = {
assertNotBucketed("startStream")
- assertStreaming("startStream() can only be called on streaming Datasets/DataFrames")
+ assertStreaming("startStream() can only be called on continuous queries")
if (source == "memory") {
val queryName =
@@ -434,7 +434,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
def foreach(writer: ForeachWriter[T]): ContinuousQuery = {
assertNotPartitioned("foreach")
assertNotBucketed("foreach")
- assertStreaming("foreach() can only be called on streaming Datasets/DataFrames.")
+ assertStreaming(
+ "foreach() can only be called on streaming Datasets/DataFrames.")
val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName)
val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc)
@@ -501,7 +502,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private def insertInto(tableIdent: TableIdentifier): Unit = {
assertNotBucketed("insertInto")
- assertNotStreaming("insertInto() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("insertInto() can only be called on non-continuous queries")
val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
val overwrite = mode == SaveMode.Overwrite
@@ -620,7 +621,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
private def saveAsTable(tableIdent: TableIdentifier): Unit = {
- assertNotStreaming("saveAsTable() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("saveAsTable() can only be called on non-continuous queries")
val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
@@ -663,7 +664,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
assertNotPartitioned("jdbc")
assertNotBucketed("jdbc")
- assertNotStreaming("jdbc() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("jdbc() can only be called on non-continuous queries")
val props = new Properties()
extraOptions.foreach { case (key, value) =>
@@ -722,7 +723,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def json(path: String): Unit = {
- assertNotStreaming("json() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("json() can only be called on non-continuous queries")
format("json").save(path)
}
@@ -742,7 +743,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def parquet(path: String): Unit = {
- assertNotStreaming("parquet() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("parquet() can only be called on non-continuous queries")
format("parquet").save(path)
}
@@ -762,7 +763,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @note Currently, this method can only be used after enabling Hive support
*/
def orc(path: String): Unit = {
- assertNotStreaming("orc() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("orc() can only be called on non-continuous queries")
format("orc").save(path)
}
@@ -786,7 +787,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.6.0
*/
def text(path: String): Unit = {
- assertNotStreaming("text() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("text() can only be called on non-continuous queries")
format("text").save(path)
}
@@ -816,7 +817,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 2.0.0
*/
def csv(path: String): Unit = {
- assertNotStreaming("csv() can only be called on non-streaming Datasets/DataFrames")
+ assertNotStreaming("csv() can only be called on non-continuous queries")
format("csv").save(path)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index fabb8ba6c6..7f1e5fe613 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -70,7 +70,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** How long to wait for an active stream to catch up when checking a result. */
val streamingTimeout = 10.seconds
- /** A trait for actions that can be performed while testing a streaming DataSet/DataFrame. */
+ /** A trait for actions that can be performed while testing a streaming DataFrame. */
trait StreamAction
/** A trait to mark actions that require the stream to be actively running. */
@@ -194,7 +194,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
}
/**
- * Executes the specified actions on the given streaming DataSet/DataFrame and provides helpful
+ * Executes the specified actions on the given streaming DataFrame and provides helpful
* error messages in the case of failures or incorrect answers.
*
* Note that if the stream is not explicitly started before an action that requires it to be
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
index 51aa53287c..6e0d66ae7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala
@@ -371,80 +371,66 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath
- test("check trigger() can only be called on streaming Datasets/DataFrames") {
+ test("check trigger() can only be called on continuous queries") {
val df = spark.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)
val e = intercept[AnalysisException](w.trigger(ProcessingTime("10 seconds")))
- assert(e.getMessage == "trigger() can only be called on streaming Datasets/DataFrames;")
+ assert(e.getMessage == "trigger() can only be called on continuous queries;")
}
- test("check queryName() can only be called on streaming Datasets/DataFrames") {
+ test("check queryName() can only be called on continuous queries") {
val df = spark.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)
val e = intercept[AnalysisException](w.queryName("queryName"))
- assert(e.getMessage == "queryName() can only be called on streaming Datasets/DataFrames;")
+ assert(e.getMessage == "queryName() can only be called on continuous queries;")
}
- test("check startStream() can only be called on streaming Datasets/DataFrames") {
+ test("check startStream() can only be called on continuous queries") {
val df = spark.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)
val e = intercept[AnalysisException](w.startStream())
- assert(e.getMessage == "startStream() can only be called on streaming Datasets/DataFrames;")
+ assert(e.getMessage == "startStream() can only be called on continuous queries;")
}
- test("check startStream(path) can only be called on streaming Datasets/DataFrames") {
+ test("check startStream(path) can only be called on continuous queries") {
val df = spark.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)
val e = intercept[AnalysisException](w.startStream("non_exist_path"))
- assert(e.getMessage == "startStream() can only be called on streaming Datasets/DataFrames;")
+ assert(e.getMessage == "startStream() can only be called on continuous queries;")
}
- test("check foreach() can only be called on streaming Datasets/DataFrames") {
- val df = spark.read.text(newTextInput)
- val w = df.write.option("checkpointLocation", newMetadataDir)
- val foreachWriter = new ForeachWriter[String] {
- override def open(partitionId: Long, version: Long): Boolean = false
- override def process(value: String): Unit = {}
- override def close(errorOrNull: Throwable): Unit = {}
- }
- val e = intercept[AnalysisException](w.foreach(foreachWriter))
- Seq("foreach()", "streaming Datasets/DataFrames").foreach { s =>
- assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
- }
- }
-
- test("check mode(SaveMode) can only be called on non-streaming Datasets/DataFrames") {
+ test("check mode(SaveMode) can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.mode(SaveMode.Append))
- assert(e.getMessage == "mode() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "mode() can only be called on non-continuous queries;")
}
- test("check mode(string) can only be called on non-streaming Datasets/DataFrames") {
+ test("check mode(string) can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.mode("append"))
- assert(e.getMessage == "mode() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "mode() can only be called on non-continuous queries;")
}
- test("check outputMode(OutputMode) can only be called on streaming Datasets/DataFrames") {
+ test("check outputMode(OutputMode) can only be called on continuous queries") {
val df = spark.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)
val e = intercept[AnalysisException](w.outputMode(OutputMode.Append))
- Seq("outputmode", "streaming Datasets/DataFrames").foreach { s =>
+ Seq("outputmode", "continuous queries").foreach { s =>
assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
}
}
- test("check outputMode(string) can only be called on streaming Datasets/DataFrames") {
+ test("check outputMode(string) can only be called on continuous queries") {
val df = spark.read.text(newTextInput)
val w = df.write.option("checkpointLocation", newMetadataDir)
val e = intercept[AnalysisException](w.outputMode("append"))
- Seq("outputmode", "streaming Datasets/DataFrames").foreach { s =>
+ Seq("outputmode", "continuous queries").foreach { s =>
assert(e.getMessage.toLowerCase.contains(s.toLowerCase))
}
}
@@ -464,7 +450,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
testError("Xyz")
}
- test("check bucketBy() can only be called on non-streaming Datasets/DataFrames") {
+ test("check bucketBy() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
@@ -473,7 +459,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
assert(e.getMessage == "'startStream' does not support bucketing right now;")
}
- test("check sortBy() can only be called on non-streaming Datasets/DataFrames;") {
+ test("check sortBy() can only be called on non-continuous queries;") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
@@ -482,94 +468,94 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter {
assert(e.getMessage == "'startStream' does not support bucketing right now;")
}
- test("check save(path) can only be called on non-streaming Datasets/DataFrames") {
+ test("check save(path) can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.save("non_exist_path"))
- assert(e.getMessage == "save() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "save() can only be called on non-continuous queries;")
}
- test("check save() can only be called on non-streaming Datasets/DataFrames") {
+ test("check save() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.save())
- assert(e.getMessage == "save() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "save() can only be called on non-continuous queries;")
}
- test("check insertInto() can only be called on non-streaming Datasets/DataFrames") {
+ test("check insertInto() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.insertInto("non_exsit_table"))
- assert(e.getMessage == "insertInto() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "insertInto() can only be called on non-continuous queries;")
}
- test("check saveAsTable() can only be called on non-streaming Datasets/DataFrames") {
+ test("check saveAsTable() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.saveAsTable("non_exsit_table"))
- assert(e.getMessage == "saveAsTable() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "saveAsTable() can only be called on non-continuous queries;")
}
- test("check jdbc() can only be called on non-streaming Datasets/DataFrames") {
+ test("check jdbc() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.jdbc(null, null, null))
- assert(e.getMessage == "jdbc() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "jdbc() can only be called on non-continuous queries;")
}
- test("check json() can only be called on non-streaming Datasets/DataFrames") {
+ test("check json() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.json("non_exist_path"))
- assert(e.getMessage == "json() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "json() can only be called on non-continuous queries;")
}
- test("check parquet() can only be called on non-streaming Datasets/DataFrames") {
+ test("check parquet() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.parquet("non_exist_path"))
- assert(e.getMessage == "parquet() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "parquet() can only be called on non-continuous queries;")
}
- test("check orc() can only be called on non-streaming Datasets/DataFrames") {
+ test("check orc() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.orc("non_exist_path"))
- assert(e.getMessage == "orc() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "orc() can only be called on non-continuous queries;")
}
- test("check text() can only be called on non-streaming Datasets/DataFrames") {
+ test("check text() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.text("non_exist_path"))
- assert(e.getMessage == "text() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "text() can only be called on non-continuous queries;")
}
- test("check csv() can only be called on non-streaming Datasets/DataFrames") {
+ test("check csv() can only be called on non-continuous queries") {
val df = spark.read
.format("org.apache.spark.sql.streaming.test")
.stream()
val w = df.write
val e = intercept[AnalysisException](w.csv("non_exist_path"))
- assert(e.getMessage == "csv() can only be called on non-streaming Datasets/DataFrames;")
+ assert(e.getMessage == "csv() can only be called on non-continuous queries;")
}
test("check foreach() does not support partitioning or bucketing") {