author     Shixiong Zhu <shixiong@databricks.com>  2016-06-17 21:58:10 -0700
committer  Shixiong Zhu <shixiong@databricks.com>  2016-06-17 21:58:10 -0700
commit     d0ac0e6f433bfccf4ced3743a2526f67fdb5c38e
tree       ebbde41dbc5f86479f4eb54e4578ad26ca01483a
parent     8c198e246d64b5779dc3a2625d06ec958553a20b
[SPARK-16020][SQL] Fix complete mode aggregation with console sink
## What changes were proposed in this pull request?

We cannot call `limit` on the streaming DataFrame in ConsoleSink because it would go through the wrong planner. Instead, this PR collects the `DataFrame` and calls `show` on a batch DataFrame built from the collected rows. This is fine since ConsoleSink is only for debugging.

## How was this patch tested?

Manually confirmed that ConsoleSink now works with complete mode aggregation.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13740 from zsxwing/complete-console.
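To make the failure mode concrete, here is a minimal sketch of the query shape this fix addresses: a complete-mode aggregation streaming to the console sink. The `CompleteConsoleExample` object, the socket source host/port, and the `local[2]` master are illustrative assumptions, not part of this patch.

```scala
import org.apache.spark.sql.SparkSession

object CompleteConsoleExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("complete-console")
      .getOrCreate()
    import spark.implicits._

    // The socket source is illustrative; any streaming source works.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // A running aggregation: complete mode re-emits the full result each batch.
    val counts = lines.as[String].groupBy("value").count()

    // Before this patch, ConsoleSink internally applied show/limit to the
    // streaming `data` it received, which picked up the wrong planner and
    // produced incorrect output in complete mode.
    val query = counts.writeStream
      .format("console")
      .outputMode("complete")
      .start()

    query.awaitTermination()
  }
}
```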
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala              |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala           |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala  | 99
3 files changed, 105 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
index e641e09b56..2571b59be5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -30,6 +30,9 @@ trait Sink {
* Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
* this method is called more than once with the same batchId (which will happen in the case of
* failures), then `data` should only be added once.
+ *
+ * Note: You cannot apply any operators to `data` other than consuming it (e.g., `collect`/`foreach`).
+ * Otherwise, you may get incorrect results.
*/
def addBatch(batchId: Long, data: DataFrame): Unit
}
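The note added to `Sink` above constrains every custom sink implementation, not just ConsoleSink. As a minimal sketch of a sink that honours the contract (the `LoggingSink` class is hypothetical, not part of Spark), a sink should only consume `data` and should track `batchId` for idempotence, rather than applying further operators:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sink: prints each row of every new batch to stdout.
class LoggingSink extends Sink {
  @volatile private var lastBatchId = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Idempotence: skip batches we have already added (replays after failure).
    if (batchId > lastBatchId) {
      // collect() consumes the incremental plan correctly; applying `limit`,
      // `show`, or other operators here would re-plan `data` with the wrong planner.
      data.collect().foreach(row => println(s"batch $batchId: $row"))
      lastBatchId = batchId
    }
  }
}
```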
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 2ec2a3c3c4..e8b9712d19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -45,7 +45,9 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
println(batchIdStr)
println("-------------------------------------------")
// scalastyle:off println
- data.show(numRowsToShow, isTruncated)
+ data.sparkSession.createDataFrame(
+ data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
+ .show(numRowsToShow, isTruncated)
}
}
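The hunk above is the core of the fix: materialise the streaming batch with `collect()`, then rebuild a plain batch DataFrame from the collected rows so that `show()` goes through the normal batch planner. A standalone sketch of the same trick (the `BatchShow` object and `showAsBatch` helper are hypothetical names introduced here for illustration):

```scala
import org.apache.spark.sql.{DataFrame, Row}

object BatchShow {
  def showAsBatch(data: DataFrame, numRows: Int, truncate: Boolean): Unit = {
    val spark = data.sparkSession
    // Consuming the streaming DataFrame is safe; re-planning it (limit/show) is not.
    val rows: Array[Row] = data.collect()
    // Rebuild an ordinary batch DataFrame from the rows and the original schema,
    // so subsequent operators run through the batch planner.
    val batchDF = spark.createDataFrame(spark.sparkContext.parallelize(rows), data.schema)
    batchDF.show(numRows, truncate)
  }
}
```

Round-tripping through `parallelize` keeps the schema intact while discarding the streaming lineage, which is exactly why `show` then behaves correctly in complete mode.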
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
new file mode 100644
index 0000000000..e853d8c465
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.streaming.StreamTest
+
+class ConsoleSinkSuite extends StreamTest with BeforeAndAfter {
+
+ import testImplicits._
+
+ after {
+ sqlContext.streams.active.foreach(_.stop())
+ }
+
+ test("SPARK-16020 Complete mode aggregation with console sink") {
+ withTempDir { checkpointLocation =>
+ val origOut = System.out
+ val stdout = new ByteArrayOutputStream()
+ try {
+ // Hook Java System.out.println
+ System.setOut(new PrintStream(stdout))
+ // Hook Scala println
+ Console.withOut(stdout) {
+ val input = MemoryStream[String]
+ val df = input.toDF().groupBy("value").count()
+ val query = df.writeStream
+ .format("console")
+ .outputMode("complete")
+ .option("checkpointLocation", checkpointLocation.getAbsolutePath)
+ .start()
+ input.addData("a")
+ query.processAllAvailable()
+ input.addData("a", "b")
+ query.processAllAvailable()
+ input.addData("a", "b", "c")
+ query.processAllAvailable()
+ query.stop()
+ }
+ System.out.flush()
+ } finally {
+ System.setOut(origOut)
+ }
+
+ val expected = """-------------------------------------------
+ |Batch: 0
+ |-------------------------------------------
+ |+-----+-----+
+ ||value|count|
+ |+-----+-----+
+ || a| 1|
+ |+-----+-----+
+ |
+ |-------------------------------------------
+ |Batch: 1
+ |-------------------------------------------
+ |+-----+-----+
+ ||value|count|
+ |+-----+-----+
+ || a| 2|
+ || b| 1|
+ |+-----+-----+
+ |
+ |-------------------------------------------
+ |Batch: 2
+ |-------------------------------------------
+ |+-----+-----+
+ ||value|count|
+ |+-----+-----+
+ || a| 3|
+ || b| 2|
+ || c| 1|
+ |+-----+-----+
+ |
+ |""".stripMargin
+ assert(expected === new String(stdout.toByteArray, UTF_8))
+ }
+ }
+
+}