author     Tathagata Das <tathagata.das1565@gmail.com>    2016-11-15 15:12:30 -0800
committer  Michael Armbrust <michael@databricks.com>      2016-11-15 15:12:30 -0800
commit     1ae4652b7e1f77a984b8459c778cb06c814192c5 (patch)
tree       7a9c1d234eba7cf5212fd9d15c85aa03d590817e /sql
parent     5bcb9a7ff4bdd7dac75481a951cd7da2133a2e2d (diff)
[SPARK-18440][STRUCTURED STREAMING] Pass correct query execution to FileFormatWriter
## What changes were proposed in this pull request?

SPARK-18012 refactored the file write path in FileStreamSink to use FileFormatWriter, which always uses the default non-streaming QueryExecution to perform the writes. This is wrong for FileStreamSink, because the streaming QueryExecution (i.e. IncrementalExecution) should be used to correctly incrementalize the aggregation. With the addition of watermarks in SPARK-18124, the file stream sink should logically support aggregation + watermark + append mode, but it actually fails with:

```
16:23:07.389 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: Query query-0 terminated with error
java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#7: timestamp, interval 10 seconds
+- LocalRelation [timestamp#7]
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
```

This PR fixes it by passing the correct query execution.

## How was this patch tested?

New unit test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15885 from tdas/SPARK-18440.
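For context, a minimal sketch (not part of this patch) of the kind of query the fix enables: an event-time window aggregation with a watermark, written to a file sink in append mode. The rate source, window and watermark durations, and paths below are illustrative assumptions; the new FileStreamSinkSuite test in this commit exercises the same pattern with a MemoryStream.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WatermarkedFileSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("file-sink-aggregation").getOrCreate()
    import spark.implicits._

    // The built-in "rate" source emits a TimestampType "timestamp" column; it stands in
    // here for any event-time streaming source.
    val events = spark.readStream.format("rate").load()

    val counts = events
      .withWatermark("timestamp", "10 seconds")    // tolerate 10s of event-time lateness
      .groupBy(window($"timestamp", "5 seconds"))  // 5-second tumbling windows
      .count()
      .select("window.start", "window.end", "count")

    // Before this fix, the file sink planned the write with the non-streaming
    // QueryExecution and failed with "No plan for EventTimeWatermark".
    val query = counts.writeStream
      .outputMode("append")                                  // file sink supports append only
      .format("parquet")
      .option("checkpointLocation", "/tmp/spark/checkpoint") // illustrative path
      .start("/tmp/spark/output")                            // illustrative path

    query.awaitTermination()
  }
}
```

With the streaming IncrementalExecution passed to FileFormatWriter, the EventTimeWatermark node is planned correctly and each trigger appends only the windows finalized by the watermark.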
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala                 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala                      2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala                          78
4 files changed, 79 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 4e4b0e48cd..d560ad5709 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution, UnsafeKVExternalSorter}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
@@ -85,7 +85,7 @@ object FileFormatWriter extends Logging {
*/
def write(
sparkSession: SparkSession,
- plan: LogicalPlan,
+ queryExecution: QueryExecution,
fileFormat: FileFormat,
committer: FileCommitProtocol,
outputSpec: OutputSpec,
@@ -101,8 +101,7 @@ object FileFormatWriter extends Logging {
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
val partitionSet = AttributeSet(partitionColumns)
- val dataColumns = plan.output.filterNot(partitionSet.contains)
- val queryExecution = Dataset.ofRows(sparkSession, plan).queryExecution
+ val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
// Note: prepareWrite has side effect. It sets "job".
val outputWriterFactory =
@@ -112,7 +111,7 @@ object FileFormatWriter extends Logging {
uuid = UUID.randomUUID().toString,
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
- allColumns = plan.output,
+ allColumns = queryExecution.logical.output,
partitionColumns = partitionColumns,
nonPartitionColumns = dataColumns,
bucketSpec = bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 28975e1546..a9bde903b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -100,7 +100,7 @@ case class InsertIntoHadoopFsRelationCommand(
FileFormatWriter.write(
sparkSession = sparkSession,
- plan = query,
+ queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
fileFormat = fileFormat,
committer = committer,
outputSpec = FileFormatWriter.OutputSpec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index f1c5f9ab50..0dbe2a71ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -77,7 +77,7 @@ class FileStreamSink(
FileFormatWriter.write(
sparkSession = sparkSession,
- plan = data.logicalPlan,
+ queryExecution = data.queryExecution,
fileFormat = fileFormat,
committer = committer,
outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index fa97d9292e..09613ef9e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -21,13 +21,14 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils
class FileStreamSinkSuite extends StreamTest {
import testImplicits._
- test("FileStreamSink - unpartitioned writing and batch reading") {
+ test("unpartitioned writing and batch reading") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
@@ -59,7 +60,7 @@ class FileStreamSinkSuite extends StreamTest {
}
}
- test("FileStreamSink - partitioned writing and batch reading") {
+ test("partitioned writing and batch reading") {
val inputData = MemoryStream[Int]
val ds = inputData.toDS()
@@ -142,16 +143,83 @@ class FileStreamSinkSuite extends StreamTest {
}
}
- test("FileStreamSink - parquet") {
+ // This tests whether FileStreamSink works with aggregations. Specifically, it tests
+ // whether the correct streaming QueryExecution (i.e. IncrementalExecution) is used to
+ // execute the trigger for writing data to the file sink. See SPARK-18440 for more details.
+ test("writing with aggregation") {
+
+ // Since FileStreamSink currently only supports append mode, we will test FileStreamSink
+ // with aggregations using event time windows and watermark, which allows
+ // aggregation + append mode.
+ val inputData = MemoryStream[Long]
+ val inputDF = inputData.toDF.toDF("time")
+ val outputDf = inputDF
+ .selectExpr("CAST(time AS timestamp) AS timestamp")
+ .withWatermark("timestamp", "10 seconds")
+ .groupBy(window($"timestamp", "5 seconds"))
+ .count()
+ .select("window.start", "window.end", "count")
+
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+ var query: StreamingQuery = null
+
+ try {
+ query =
+ outputDf.writeStream
+ .option("checkpointLocation", checkpointDir)
+ .format("parquet")
+ .start(outputDir)
+
+
+ def addTimestamp(timestampInSecs: Int*): Unit = {
+ inputData.addData(timestampInSecs.map(_ * 1L): _*)
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
+ }
+
+ def check(expectedResult: ((Long, Long), Long)*): Unit = {
+ val outputDf = spark.read.parquet(outputDir)
+ .selectExpr(
+ "CAST(start as BIGINT) AS start",
+ "CAST(end as BIGINT) AS end",
+ "count")
+ checkDataset(
+ outputDf.as[(Long, Long, Long)],
+ expectedResult.map(x => (x._1._1, x._1._2, x._2)): _*)
+ }
+
+ addTimestamp(100) // watermark = None before this, watermark = 100 - 10 = 90 after this
+ check() // nothing emitted yet
+
+ addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this
+ check() // nothing emitted yet
+
+ addTimestamp(140) // wm = 113 before this, emit results on 100-105, wm = 130 after this
+ check((100L, 105L) -> 2L)
+
+ addTimestamp(150) // wm = 130 before this, emit results on 120-125, wm = 150 - 10 = 140 after this
+ check((100L, 105L) -> 2L, (120L, 125L) -> 1L)
+
+ } finally {
+ if (query != null) {
+ query.stop()
+ }
+ }
+ }
+
+ test("parquet") {
testFormat(None) // should not throw error as default format parquet when not specified
testFormat(Some("parquet"))
}
- test("FileStreamSink - text") {
+ test("text") {
testFormat(Some("text"))
}
- test("FileStreamSink - json") {
+ test("json") {
testFormat(Some("json"))
}