author     Michael Armbrust <michael@databricks.com>    2016-03-23 13:02:40 -0700
committer  Michael Armbrust <michael@databricks.com>    2016-03-23 13:03:25 -0700
commit     6bc4be64f86afcb38e4444c80c9400b7b6b745de
tree       b4a671d489eef1e850590cf078764fc77e392870 /sql/core/src/test/scala/org/apache
parent     919bf321987712d9143cae3c4e064fcb077ded1f
[SPARK-14078] Streaming Parquet Based FileSink
This PR adds a new `Sink` implementation that writes out Parquet files. To handle partial failures correctly while maintaining exactly-once semantics, the files for each batch are written to a unique directory and their paths are then atomically appended to a metadata log. When a Parquet-based `DataSource` is initialized for reading, we first check for this log directory and, when it is present, use it instead of listing files.
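To make that commit protocol concrete, here is a minimal, self-contained sketch of the idea. It writes plain text files rather than Parquet, and the names `SimpleFileLog`, `writeBatch`, and `FileSinkSketch` are hypothetical stand-ins for illustration, not the actual `FileStreamSink` or `HDFSMetadataLog` API:

```scala
// Sketch only: each batch is written under a unique directory, and it becomes
// visible to readers only once its file list is recorded in the metadata log.
import java.io.{File, PrintWriter}
import java.util.UUID
import scala.collection.mutable

// Hypothetical in-memory stand-in for the on-disk metadata log described above.
class SimpleFileLog {
  private val committed = mutable.LinkedHashMap[Long, Seq[String]]()

  // Records the files for a batch; returns false if the batch was already
  // committed (e.g. by an earlier attempt that failed after logging).
  def add(batchId: Long, files: Seq[String]): Boolean = synchronized {
    if (committed.contains(batchId)) false
    else { committed += batchId -> files; true }
  }

  // Readers consult the log instead of listing the output directory, so files
  // from failed, uncommitted attempts are never picked up.
  def allFiles: Seq[String] = synchronized { committed.values.flatten.toSeq }
}

object FileSinkSketch {
  def writeBatch(log: SimpleFileLog, batchId: Long, rows: Seq[Int], outputDir: File): Unit = {
    // 1. Write the batch into a directory that no other attempt can collide with.
    val batchDir = new File(outputDir, UUID.randomUUID().toString)
    batchDir.mkdirs()
    val file = new File(batchDir, "part-00000")
    val out = new PrintWriter(file)
    try rows.foreach(r => out.println(r)) finally out.close()

    // 2. Commit by appending to the log. If this batch was already committed by
    //    another attempt, the new files simply remain unreferenced.
    log.add(batchId, Seq(file.getAbsolutePath))
  }

  def main(args: Array[String]): Unit = {
    val log = new SimpleFileLog
    val outputDir = new File(System.getProperty("java.io.tmpdir"), s"sink-${UUID.randomUUID()}")
    writeBatch(log, 0L, Seq(1, 2, 3), outputDir)
    writeBatch(log, 0L, Seq(1, 2, 3), outputDir) // a retried batch adds no visible duplicates
    println(log.allFiles) // only the first attempt's file is listed
  }
}
```

In the PR itself the writing is done by the new Parquet `Sink` and the bookkeeping by the metadata log; the sketch only illustrates the visibility rule that readers see a file only after its batch has been committed, so a failed or retried batch cannot produce duplicate or partial results.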
Unit tests are added, along with a stress test that checks the answer after non-deterministically injected failures.
Author: Michael Armbrust <michael@databricks.com>
Closes #11897 from marmbrus/fileSink.
Diffstat (limited to 'sql/core/src/test/scala/org/apache')
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala |   2 +
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala            |  49 ++++++
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala                | 129 ++++++++++++++
3 files changed, 180 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 4ddc218455..9ed5686d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext
 
 class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
 
+  private implicit def toOption[A](a: A): Option[A] = Option(a)
+
   test("basic") {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
new file mode 100644
index 0000000000..7f31611383
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("unpartitioned writing") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+
+    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+    val query =
+      df.write
+        .format("parquet")
+        .option("checkpointLocation", checkpointDir)
+        .startStream(outputDir)
+
+    inputData.addData(1, 2, 3)
+    failAfter(streamingTimeout) { query.processAllAvailable() }
+
+    val outputDf = sqlContext.read.parquet(outputDir).as[Int]
+    checkDataset(
+      outputDf,
+      1, 2, 3)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
new file mode 100644
index 0000000000..5a1bfb3a00
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+import java.util.UUID
+
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest}
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+/**
+ * A stress test for streaming queries that read and write files.  This test consists of
+ * two threads:
+ *  - one that writes out `numRecords` distinct integers to files of random sizes (the total
+ *    number of records is fixed, but each file's size / creation time is random).
+ *  - another that continually restarts a buggy streaming query (i.e. fails with 5% probability
+ *    on any partition).
+ *
+ * At the end, the resulting files are loaded and the answer is checked.
+ */
+class FileStressSuite extends StreamTest with SharedSQLContext {
+  import testImplicits._
+
+  test("fault tolerance stress test") {
+    val numRecords = 10000
+    val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
+    val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
+    val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+    val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+    @volatile
+    var continue = true
+    @volatile
+    var stream: ContinuousQuery = null
+
+    val writer = new Thread("stream writer") {
+      override def run(): Unit = {
+        var i = numRecords
+        while (i > 0) {
+          val count = Random.nextInt(100)
+          var j = 0
+          var string = ""
+          while (j < count && i > 0) {
+            if (i % 10000 == 0) { logError(s"Wrote record $i") }
+            string = string + i + "\n"
+            j += 1
+            i -= 1
+          }
+
+          val uuid = UUID.randomUUID().toString
+          val fileName = new File(stagingDir, uuid)
+          stringToFile(fileName, string)
+          fileName.renameTo(new File(inputDir, uuid))
+
+          val sleep = Random.nextInt(100)
+          Thread.sleep(sleep)
+        }
+
+        logError("== DONE WRITING ==")
+        var done = false
+        while (!done) {
+          try {
+            stream.processAllAvailable()
+            done = true
+          } catch {
+            case NonFatal(_) =>
+          }
+        }
+
+        continue = false
+        stream.stop()
+      }
+    }
+    writer.start()
+
+    val input = sqlContext.read.format("text").stream(inputDir)
+
+    def startStream(): ContinuousQuery = input
+      .repartition(5)
+      .as[String]
+      .mapPartitions { iter =>
+        val rand = Random.nextInt(100)
+        if (rand < 5) { sys.error("failure") }
+        iter.map(_.toLong)
+      }
+      .write
+      .format("parquet")
+      .option("checkpointLocation", checkpoint)
+      .startStream(outputDir)
+
+    var failures = 0
+    val streamThread = new Thread("stream runner") {
+      while (continue) {
+        if (failures % 10 == 0) { logError(s"Query restart #$failures") }
+        stream = startStream()
+
+        try {
+          stream.awaitTermination()
+        } catch {
+          case ce: ContinuousQueryException =>
+            failures += 1
+        }
+      }
+    }
+
+    streamThread.join()
+
+    logError(s"Stream restarted $failures times.")
+    assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords)
+  }
+}