path: root/sql/core/src/test/scala/org/apache
author     Michael Armbrust <michael@databricks.com>    2016-03-23 13:02:40 -0700
committer  Michael Armbrust <michael@databricks.com>    2016-03-23 13:03:25 -0700
commit     6bc4be64f86afcb38e4444c80c9400b7b6b745de (patch)
tree       b4a671d489eef1e850590cf078764fc77e392870 /sql/core/src/test/scala/org/apache
parent     919bf321987712d9143cae3c4e064fcb077ded1f (diff)
download   spark-6bc4be64f86afcb38e4444c80c9400b7b6b745de.tar.gz
           spark-6bc4be64f86afcb38e4444c80c9400b7b6b745de.tar.bz2
           spark-6bc4be64f86afcb38e4444c80c9400b7b6b745de.zip
[SPARK-14078] Streaming Parquet Based FileSink
This PR adds a new `Sink` implementation that writes out Parquet files. In order to correctly handle partial failures while maintaining exactly-once semantics, the files for each batch are written out to a unique directory and then atomically appended to a metadata log. When a Parquet-based `DataSource` is initialized for reading, we first check for this log directory and use it instead of file listing when present.

Unit tests are added, as well as a stress test that checks the answer after non-deterministic injected failures.

Author: Michael Armbrust <michael@databricks.com>

Closes #11897 from marmbrus/fileSink.
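The commit message describes a write-then-commit protocol: a batch's output files become visible only once their paths are atomically recorded in a metadata log, and readers prefer that log over directory listing. A minimal, self-contained sketch of that pattern follows; the names (`FileSinkSketch`, `commitBatch`, `filesToRead`) and the plain-text log format are hypothetical illustrations, not the `FileStreamSink` classes added by this PR.

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.UUID

object FileSinkSketch {
  // Commit point for one batch: record the batch's output files in a per-batch log entry.
  // A retried batch that finds its entry already present does nothing, which is what
  // gives exactly-once output under partial failures.
  def commitBatch(batchId: Long, logDir: String, batchFiles: Seq[File]): Unit = {
    val logEntry = Paths.get(logDir, batchId.toString)
    if (!Files.exists(logEntry)) {
      // Write the file list to a temp file, then atomically move it into place; a crash
      // before the move leaves the batch invisible to readers.
      val tmp = Paths.get(logDir, s".${UUID.randomUUID()}.tmp")
      val contents = batchFiles.map(_.getAbsolutePath).mkString("\n")
      Files.write(tmp, contents.getBytes(StandardCharsets.UTF_8))
      Files.move(tmp, logEntry, StandardCopyOption.ATOMIC_MOVE)
    }
  }

  // Read path: prefer the metadata log over a raw directory listing when the log exists,
  // so files from partially failed batches are never picked up.
  def filesToRead(outputDir: String, logDir: String): Seq[String] = {
    val log = new File(logDir)
    if (log.isDirectory) {
      log.listFiles().filterNot(_.getName.endsWith(".tmp")).toSeq.flatMap { entry =>
        new String(Files.readAllBytes(entry.toPath), StandardCharsets.UTF_8).split("\n").toSeq
      }
    } else {
      new File(outputDir).listFiles().toSeq.map(_.getAbsolutePath)
    }
  }
}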
Diffstat (limited to 'sql/core/src/test/scala/org/apache')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala    2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala              49
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala                 129
3 files changed, 180 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 4ddc218455..9ed5686d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext
class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
+ private implicit def toOption[A](a: A): Option[A] = Option(a)
+
test("basic") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
new file mode 100644
index 0000000000..7f31611383
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("unpartitioned writing") {
+ val inputData = MemoryStream[Int]
+ val df = inputData.toDF()
+
+ val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+ val query =
+ df.write
+ .format("parquet")
+ .option("checkpointLocation", checkpointDir)
+ .startStream(outputDir)
+
+ inputData.addData(1, 2, 3)
+ failAfter(streamingTimeout) { query.processAllAvailable() }
+
+ val outputDf = sqlContext.read.parquet(outputDir).as[Int]
+ checkDataset(
+ outputDf,
+ 1, 2, 3)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
new file mode 100644
index 0000000000..5a1bfb3a00
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+import java.util.UUID
+
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest}
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+/**
+ * A stress test for streaming queries that read and write files. This test consists of
+ * two threads:
+ * - one that writes out `numRecords` distinct integers to files of random sizes (the total
+ *   number of records is fixed but each file's size / creation time is random).
+ * - another that continually restarts a buggy streaming query (i.e. fails with 5% probability on
+ * any partition).
+ *
+ * At the end, the resulting files are loaded and the answer is checked.
+ */
+class FileStressSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("fault tolerance stress test") {
+ val numRecords = 10000
+ val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
+ val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
+ val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+ val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+ @volatile
+ var continue = true
+ @volatile
+ var stream: ContinuousQuery = null
+
+ val writer = new Thread("stream writer") {
+ override def run(): Unit = {
+ var i = numRecords
+ while (i > 0) {
+ val count = Random.nextInt(100)
+ var j = 0
+ var string = ""
+ while (j < count && i > 0) {
+ if (i % 10000 == 0) { logError(s"Wrote record $i") }
+ string = string + i + "\n"
+ j += 1
+ i -= 1
+ }
+
+ val uuid = UUID.randomUUID().toString
+ val fileName = new File(stagingDir, uuid)
+ stringToFile(fileName, string)
+ fileName.renameTo(new File(inputDir, uuid))
+ val sleep = Random.nextInt(100)
+ Thread.sleep(sleep)
+ }
+
+ logError("== DONE WRITING ==")
+ var done = false
+ while (!done) {
+ try {
+ stream.processAllAvailable()
+ done = true
+ } catch {
+ case NonFatal(_) =>
+ }
+ }
+
+ continue = false
+ stream.stop()
+ }
+ }
+ writer.start()
+
+ val input = sqlContext.read.format("text").stream(inputDir)
+ def startStream(): ContinuousQuery = input
+ .repartition(5)
+ .as[String]
+ .mapPartitions { iter =>
+ val rand = Random.nextInt(100)
+ if (rand < 5) { sys.error("failure") }
+ iter.map(_.toLong)
+ }
+ .write
+ .format("parquet")
+ .option("checkpointLocation", checkpoint)
+ .startStream(outputDir)
+
+ var failures = 0
+ val streamThread = new Thread("stream runner") {
+ while (continue) {
+ if (failures % 10 == 0) { logError(s"Query restart #$failures") }
+ stream = startStream()
+
+ try {
+ stream.awaitTermination()
+ } catch {
+ case ce: ContinuousQueryException =>
+ failures += 1
+ }
+ }
+ }
+
+ streamThread.join()
+
+ logError(s"Stream restarted $failures times.")
+ assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords)
+ }
+}