From a6a18a4573515e76d78534f1a19fcc2c3819f6c5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 13 Jun 2016 12:47:47 -0700 Subject: [HOTFIX][MINOR][SQL] Revert " Standardize 'continuous queries' to 'streaming D… MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit d32e227787338a08741d8064f5dd2db1d60ddc63. Broke build - https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-branch-2.0-compile-maven-hadoop-2.3/326/console Author: Tathagata Das Closes #13645 from tdas/build-break. --- .../org/apache/spark/sql/DataFrameWriter.scala | 47 +++++------ .../apache/spark/sql/streaming/StreamTest.scala | 4 +- .../test/DataFrameReaderWriterSuite.scala | 90 +++++++++------------- 3 files changed, 64 insertions(+), 77 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 392e3c1e4e..afae0786b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -56,7 +56,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def mode(saveMode: SaveMode): DataFrameWriter[T] = { // mode() is used for non-continuous queries // outputMode() is used for continuous queries - assertNotStreaming("mode() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("mode() can only be called on non-continuous queries") this.mode = saveMode this } @@ -73,7 +73,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def mode(saveMode: String): DataFrameWriter[T] = { // mode() is used for non-continuous queries // outputMode() is used for continuous queries - assertNotStreaming("mode() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("mode() can only be called on non-continuous queries") this.mode = saveMode.toLowerCase match { case "overwrite" => SaveMode.Overwrite case "append" => SaveMode.Append @@ -86,33 +86,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } /** - * Specifies how data of a streaming Dataset/DataFrame is written to a streaming sink. - * - `OutputMode.Append()`: only the new rows in the streaming Dataset/DataFrame will be + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + * - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be * written to the sink - * - `OutputMode.Complete()`: all the rows in the streaming Dataset/DataFrame will be written + * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written * to the sink every time these is some updates * * @since 2.0.0 */ @Experimental def outputMode(outputMode: OutputMode): DataFrameWriter[T] = { - assertStreaming("outputMode() can only be called on streaming Datasets/DataFrames") + assertStreaming("outputMode() can only be called on continuous queries") this.outputMode = outputMode this } /** - * Specifies how data of a streaming Dataset/DataFrame is written to a streaming sink. - * - `append`: only the new rows in the streaming Dataset/DataFrame will be written to + * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. 
+ * - `append`: only the new rows in the streaming DataFrame/Dataset will be written to * the sink - * - `complete`: all the rows in the streaming Dataset/DataFrame will be written to the sink + * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink * every time these is some updates * * @since 2.0.0 */ @Experimental def outputMode(outputMode: String): DataFrameWriter[T] = { - assertStreaming("outputMode() can only be called on streaming Datasets/DataFrames") + assertStreaming("outputMode() can only be called on continuous queries") this.outputMode = outputMode.toLowerCase match { case "append" => OutputMode.Append @@ -150,7 +150,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ @Experimental def trigger(trigger: Trigger): DataFrameWriter[T] = { - assertStreaming("trigger() can only be called on streaming Datasets/DataFrames") + assertStreaming("trigger() can only be called on continuous queries") this.trigger = trigger this } @@ -284,7 +284,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ def save(): Unit = { assertNotBucketed("save") - assertNotStreaming("save() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("save() can only be called on non-continuous queries") val dataSource = DataSource( df.sparkSession, className = source, @@ -304,7 +304,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { */ @Experimental def queryName(queryName: String): DataFrameWriter[T] = { - assertStreaming("queryName() can only be called on streaming Datasets/DataFrames") + assertStreaming("queryName() can only be called on continuous queries") this.extraOptions += ("queryName" -> queryName) this } @@ -333,7 +333,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { @Experimental def startStream(): ContinuousQuery = { assertNotBucketed("startStream") - assertStreaming("startStream() can only be called on streaming Datasets/DataFrames") + assertStreaming("startStream() can only be called on continuous queries") if (source == "memory") { val queryName = @@ -434,7 +434,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def foreach(writer: ForeachWriter[T]): ContinuousQuery = { assertNotPartitioned("foreach") assertNotBucketed("foreach") - assertStreaming("foreach() can only be called on streaming Datasets/DataFrames.") + assertStreaming( + "foreach() can only be called on streaming Datasets/DataFrames.") val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName) val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc) @@ -501,7 +502,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def insertInto(tableIdent: TableIdentifier): Unit = { assertNotBucketed("insertInto") - assertNotStreaming("insertInto() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("insertInto() can only be called on non-continuous queries") val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap) val overwrite = mode == SaveMode.Overwrite @@ -620,7 +621,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } private def saveAsTable(tableIdent: TableIdentifier): Unit = { - assertNotStreaming("saveAsTable() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("saveAsTable() can only be called on non-continuous queries") val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) @@ -663,7 +664,7 @@ final 
class DataFrameWriter[T] private[sql](ds: Dataset[T]) { def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { assertNotPartitioned("jdbc") assertNotBucketed("jdbc") - assertNotStreaming("jdbc() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("jdbc() can only be called on non-continuous queries") val props = new Properties() extraOptions.foreach { case (key, value) => @@ -722,7 +723,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def json(path: String): Unit = { - assertNotStreaming("json() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("json() can only be called on non-continuous queries") format("json").save(path) } @@ -742,7 +743,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.4.0 */ def parquet(path: String): Unit = { - assertNotStreaming("parquet() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("parquet() can only be called on non-continuous queries") format("parquet").save(path) } @@ -762,7 +763,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @note Currently, this method can only be used after enabling Hive support */ def orc(path: String): Unit = { - assertNotStreaming("orc() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("orc() can only be called on non-continuous queries") format("orc").save(path) } @@ -786,7 +787,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 1.6.0 */ def text(path: String): Unit = { - assertNotStreaming("text() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("text() can only be called on non-continuous queries") format("text").save(path) } @@ -816,7 +817,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * @since 2.0.0 */ def csv(path: String): Unit = { - assertNotStreaming("csv() can only be called on non-streaming Datasets/DataFrames") + assertNotStreaming("csv() can only be called on non-continuous queries") format("csv").save(path) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index fabb8ba6c6..7f1e5fe613 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -70,7 +70,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { /** How long to wait for an active stream to catch up when checking a result. */ val streamingTimeout = 10.seconds - /** A trait for actions that can be performed while testing a streaming DataSet/DataFrame. */ + /** A trait for actions that can be performed while testing a streaming DataFrame. */ trait StreamAction /** A trait to mark actions that require the stream to be actively running. */ @@ -194,7 +194,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } /** - * Executes the specified actions on the given streaming DataSet/DataFrame and provides helpful + * Executes the specified actions on the given streaming DataFrame and provides helpful * error messages in the case of failures or incorrect answers. 
* * Note that if the stream is not explicitly started before an action that requires it to be diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala index 51aa53287c..6e0d66ae7f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataFrameReaderWriterSuite.scala @@ -371,80 +371,66 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { private def newTextInput = Utils.createTempDir(namePrefix = "text").getCanonicalPath - test("check trigger() can only be called on streaming Datasets/DataFrames") { + test("check trigger() can only be called on continuous queries") { val df = spark.read.text(newTextInput) val w = df.write.option("checkpointLocation", newMetadataDir) val e = intercept[AnalysisException](w.trigger(ProcessingTime("10 seconds"))) - assert(e.getMessage == "trigger() can only be called on streaming Datasets/DataFrames;") + assert(e.getMessage == "trigger() can only be called on continuous queries;") } - test("check queryName() can only be called on streaming Datasets/DataFrames") { + test("check queryName() can only be called on continuous queries") { val df = spark.read.text(newTextInput) val w = df.write.option("checkpointLocation", newMetadataDir) val e = intercept[AnalysisException](w.queryName("queryName")) - assert(e.getMessage == "queryName() can only be called on streaming Datasets/DataFrames;") + assert(e.getMessage == "queryName() can only be called on continuous queries;") } - test("check startStream() can only be called on streaming Datasets/DataFrames") { + test("check startStream() can only be called on continuous queries") { val df = spark.read.text(newTextInput) val w = df.write.option("checkpointLocation", newMetadataDir) val e = intercept[AnalysisException](w.startStream()) - assert(e.getMessage == "startStream() can only be called on streaming Datasets/DataFrames;") + assert(e.getMessage == "startStream() can only be called on continuous queries;") } - test("check startStream(path) can only be called on streaming Datasets/DataFrames") { + test("check startStream(path) can only be called on continuous queries") { val df = spark.read.text(newTextInput) val w = df.write.option("checkpointLocation", newMetadataDir) val e = intercept[AnalysisException](w.startStream("non_exist_path")) - assert(e.getMessage == "startStream() can only be called on streaming Datasets/DataFrames;") + assert(e.getMessage == "startStream() can only be called on continuous queries;") } - test("check foreach() can only be called on streaming Datasets/DataFrames") { - val df = spark.read.text(newTextInput) - val w = df.write.option("checkpointLocation", newMetadataDir) - val foreachWriter = new ForeachWriter[String] { - override def open(partitionId: Long, version: Long): Boolean = false - override def process(value: String): Unit = {} - override def close(errorOrNull: Throwable): Unit = {} - } - val e = intercept[AnalysisException](w.foreach(foreachWriter)) - Seq("foreach()", "streaming Datasets/DataFrames").foreach { s => - assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) - } - } - - test("check mode(SaveMode) can only be called on non-streaming Datasets/DataFrames") { + test("check mode(SaveMode) can only be called on non-continuous queries") { val df = spark.read 
.format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.mode(SaveMode.Append)) - assert(e.getMessage == "mode() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "mode() can only be called on non-continuous queries;") } - test("check mode(string) can only be called on non-streaming Datasets/DataFrames") { + test("check mode(string) can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.mode("append")) - assert(e.getMessage == "mode() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "mode() can only be called on non-continuous queries;") } - test("check outputMode(OutputMode) can only be called on streaming Datasets/DataFrames") { + test("check outputMode(OutputMode) can only be called on continuous queries") { val df = spark.read.text(newTextInput) val w = df.write.option("checkpointLocation", newMetadataDir) val e = intercept[AnalysisException](w.outputMode(OutputMode.Append)) - Seq("outputmode", "streaming Datasets/DataFrames").foreach { s => + Seq("outputmode", "continuous queries").foreach { s => assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) } } - test("check outputMode(string) can only be called on streaming Datasets/DataFrames") { + test("check outputMode(string) can only be called on continuous queries") { val df = spark.read.text(newTextInput) val w = df.write.option("checkpointLocation", newMetadataDir) val e = intercept[AnalysisException](w.outputMode("append")) - Seq("outputmode", "streaming Datasets/DataFrames").foreach { s => + Seq("outputmode", "continuous queries").foreach { s => assert(e.getMessage.toLowerCase.contains(s.toLowerCase)) } } @@ -464,7 +450,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { testError("Xyz") } - test("check bucketBy() can only be called on non-streaming Datasets/DataFrames") { + test("check bucketBy() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() @@ -473,7 +459,7 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { assert(e.getMessage == "'startStream' does not support bucketing right now;") } - test("check sortBy() can only be called on non-streaming Datasets/DataFrames;") { + test("check sortBy() can only be called on non-continuous queries;") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() @@ -482,94 +468,94 @@ class DataFrameReaderWriterSuite extends StreamTest with BeforeAndAfter { assert(e.getMessage == "'startStream' does not support bucketing right now;") } - test("check save(path) can only be called on non-streaming Datasets/DataFrames") { + test("check save(path) can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.save("non_exist_path")) - assert(e.getMessage == "save() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "save() can only be called on non-continuous queries;") } - test("check save() can only be called on non-streaming Datasets/DataFrames") { + test("check save() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = 
intercept[AnalysisException](w.save()) - assert(e.getMessage == "save() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "save() can only be called on non-continuous queries;") } - test("check insertInto() can only be called on non-streaming Datasets/DataFrames") { + test("check insertInto() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.insertInto("non_exsit_table")) - assert(e.getMessage == "insertInto() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "insertInto() can only be called on non-continuous queries;") } - test("check saveAsTable() can only be called on non-streaming Datasets/DataFrames") { + test("check saveAsTable() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.saveAsTable("non_exsit_table")) - assert(e.getMessage == "saveAsTable() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "saveAsTable() can only be called on non-continuous queries;") } - test("check jdbc() can only be called on non-streaming Datasets/DataFrames") { + test("check jdbc() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.jdbc(null, null, null)) - assert(e.getMessage == "jdbc() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "jdbc() can only be called on non-continuous queries;") } - test("check json() can only be called on non-streaming Datasets/DataFrames") { + test("check json() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.json("non_exist_path")) - assert(e.getMessage == "json() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "json() can only be called on non-continuous queries;") } - test("check parquet() can only be called on non-streaming Datasets/DataFrames") { + test("check parquet() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.parquet("non_exist_path")) - assert(e.getMessage == "parquet() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "parquet() can only be called on non-continuous queries;") } - test("check orc() can only be called on non-streaming Datasets/DataFrames") { + test("check orc() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.orc("non_exist_path")) - assert(e.getMessage == "orc() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "orc() can only be called on non-continuous queries;") } - test("check text() can only be called on non-streaming Datasets/DataFrames") { + test("check text() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.text("non_exist_path")) - assert(e.getMessage == "text() can only be called on 
non-streaming Datasets/DataFrames;") + assert(e.getMessage == "text() can only be called on non-continuous queries;") } - test("check csv() can only be called on non-streaming Datasets/DataFrames") { + test("check csv() can only be called on non-continuous queries") { val df = spark.read .format("org.apache.spark.sql.streaming.test") .stream() val w = df.write val e = intercept[AnalysisException](w.csv("non_exist_path")) - assert(e.getMessage == "csv() can only be called on non-streaming Datasets/DataFrames;") + assert(e.getMessage == "csv() can only be called on non-continuous queries;") } test("check foreach() does not support partitioning or bucketing") { -- cgit v1.2.3
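
For reference, the contract behind these messages is unchanged by the revert: batch-only writer methods such as mode(), save(), insertInto(), saveAsTable(), jdbc(), json(), parquet(), orc(), text() and csv() still raise an AnalysisException when called on a streaming Dataset/DataFrame, and streaming-only methods such as outputMode(), trigger(), queryName(), startStream() and foreach() still raise one on a batch Dataset/DataFrame; only the wording of the messages goes back to "continuous queries". Below is a minimal sketch of that contract, assuming the preview-era stream()/DataFrameWriter API shown in this diff; the stub source name is borrowed from DataFrameReaderWriterSuite and the small helper is illustrative, so treat this as an example rather than code from the patch.

import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}

object ContinuousQueryMessageSketch {

  /** Mirrors the suite's intercept[AnalysisException] pattern: run `body`, return the message. */
  private def analysisMessage(body: => Unit): String =
    try { body; "no AnalysisException was thrown" }
    catch { case e: AnalysisException => e.getMessage }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("continuous-query-messages")
      .getOrCreate()

    // A streaming Dataset/DataFrame. The format name below is the stub source defined in
    // DataFrameReaderWriterSuite; substitute a real streaming source in an application.
    val streaming = spark.read
      .format("org.apache.spark.sql.streaming.test")
      .stream()

    // Batch-only writer methods are rejected on streaming data. After this revert the
    // message reads "mode() can only be called on non-continuous queries;".
    println(analysisMessage(streaming.write.mode(SaveMode.Append)))

    // Streaming-only writer methods are rejected on batch data. After this revert the
    // message reads "outputMode() can only be called on continuous queries;".
    val batch = spark.range(5).toDF("value")
    println(analysisMessage(batch.write.outputMode("append")))

    spark.stop()
  }
}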