From d0ac0e6f433bfccf4ced3743a2526f67fdb5c38e Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Fri, 17 Jun 2016 21:58:10 -0700
Subject: [SPARK-16020][SQL] Fix complete mode aggregation with console sink

## What changes were proposed in this pull request?

We cannot use `limit` on a DataFrame in ConsoleSink because it will use the wrong planner. This PR just collects the `DataFrame` and calls `show` on a batch DataFrame created from the collected result. This is fine since ConsoleSink is only for debugging.

## How was this patch tested?

Manually confirmed ConsoleSink now works with complete mode aggregation.

Author: Shixiong Zhu

Closes #13740 from zsxwing/complete-console.
---
 .../spark/sql/execution/streaming/Sink.scala       |  3 +
 .../spark/sql/execution/streaming/console.scala    |  4 +-
 .../sql/execution/streaming/ConsoleSinkSuite.scala | 99 ++++++++++++++++++++++
 3 files changed, 105 insertions(+), 1 deletion(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
index e641e09b56..2571b59be5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -30,6 +30,9 @@ trait Sink {
    * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
    * this method is called more than once with the same batchId (which will happen in the case of
    * failures), then `data` should only be added once.
+   *
+   * Note: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
+   * Otherwise, you may get a wrong result.
    */
   def addBatch(batchId: Long, data: DataFrame): Unit
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 2ec2a3c3c4..e8b9712d19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -45,7 +45,9 @@ class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
     println(batchIdStr)
     println("-------------------------------------------")
     // scalastyle:off println
-    data.show(numRowsToShow, isTruncated)
+    data.sparkSession.createDataFrame(
+      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
+      .show(numRowsToShow, isTruncated)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
new file mode 100644
index 0000000000..e853d8c465
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ConsoleSinkSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets.UTF_8
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.streaming.StreamTest
+
+class ConsoleSinkSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("SPARK-16020 Complete mode aggregation with console sink") {
+    withTempDir { checkpointLocation =>
+      val origOut = System.out
+      val stdout = new ByteArrayOutputStream()
+      try {
+        // Hook Java System.out.println
+        System.setOut(new PrintStream(stdout))
+        // Hook Scala println
+        Console.withOut(stdout) {
+          val input = MemoryStream[String]
+          val df = input.toDF().groupBy("value").count()
+          val query = df.writeStream
+            .format("console")
+            .outputMode("complete")
+            .option("checkpointLocation", checkpointLocation.getAbsolutePath)
+            .start()
+          input.addData("a")
+          query.processAllAvailable()
+          input.addData("a", "b")
+          query.processAllAvailable()
+          input.addData("a", "b", "c")
+          query.processAllAvailable()
+          query.stop()
+        }
+        System.out.flush()
+      } finally {
+        System.setOut(origOut)
+      }
+
+      val expected = """-------------------------------------------
+        |Batch: 0
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 1
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    2|
+        ||    b|    1|
+        |+-----+-----+
+        |
+        |-------------------------------------------
+        |Batch: 2
+        |-------------------------------------------
+        |+-----+-----+
+        ||value|count|
+        |+-----+-----+
+        ||    a|    3|
+        ||    b|    2|
+        ||    c|    1|
+        |+-----+-----+
+        |
+        |""".stripMargin
+      assert(expected === new String(stdout.toByteArray, UTF_8))
+    }
+  }
+
+}
--
cgit v1.2.3
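
As background on the fix above: inside `Sink.addBatch` the incoming `data` must only be consumed, so `ConsoleSink` collects the rows and rebuilds a plain batch `DataFrame` before calling `show`, keeping the `limit` that `show` relies on internally on the batch planner. A minimal sketch of that round-trip pattern follows; the helper name `showAsBatch` and its parameters are illustrative, not part of the patch:

```scala
import org.apache.spark.sql.DataFrame

// Sketch of the round-trip used in ConsoleSink.addBatch: consume the
// streaming batch with collect(), then rebuild a batch DataFrame so that
// show() is planned by the ordinary batch planner rather than having new
// operators stacked on the streaming `data`.
def showAsBatch(data: DataFrame, numRows: Int, truncate: Boolean): Unit = {
  val spark = data.sparkSession
  // collect() only consumes `data`, which the Sink contract allows.
  val rows = data.collect()
  // Same schema and rows, but now a plain batch DataFrame.
  spark.createDataFrame(spark.sparkContext.parallelize(rows), data.schema)
    .show(numRows, truncate)
}
```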
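
The note added to `Sink.addBatch`'s Scaladoc also constrains custom sinks: consume `data` (e.g. `collect`/`foreach`), but do not stack new operators on it, and add each batch only once. A hypothetical sink under that contract might look like the sketch below; `RowCountSink` and its fields are illustrative names only, not Spark API:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sink obeying the documented contract: it only consumes
// `data`, and it skips batch ids it has already seen, since a batch may be
// redelivered after a failure.
class RowCountSink extends Sink {
  private var lastBatchId = -1L
  private var totalRows = 0L

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    if (batchId > lastBatchId) {
      lastBatchId = batchId
      totalRows += data.collect().length  // OK: plain consumption
      // Not OK: data.limit(10).collect() -- the extra operator may be
      // planned incorrectly and return a wrong result.
    }
  }
}
```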