-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala | 64
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala | 81
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala | 59
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala | 49
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala | 129
14 files changed, 430 insertions, 15 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
index eb69804c39..1dc9a6893e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
@@ -92,6 +92,15 @@ trait ContinuousQuery {
def awaitTermination(timeoutMs: Long): Boolean
/**
+ * Blocks until all available data in the source has been processed and committed to the sink.
+ * This method is intended for testing. Note that in the case of continually arriving data, this
+ * method may block forever. Additionally, this method only guarantees blocking on data that has
+ * been synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]] prior to
+ * invocation (i.e. `getOffset` must immediately reflect the addition).
+ */
+ def processAllAvailable(): Unit
+
+ /**
* Stops the execution of this query if it is running. This method blocks until the threads
* performing execution have stopped.
* @since 2.0.0
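
For illustration, a minimal sketch of the intended test usage, assuming the 2.0-era streaming reader/writer API exercised by the suites in this patch; `sqlContext`, `inputDir`, `outputDir`, and `checkpointDir` are hypothetical values already in scope:

    import java.io.File
    import org.apache.spark.sql.catalyst.util._   // stringToFile, also used by FileStressSuite below

    // Start a text-file-to-parquet streaming query.
    val query = sqlContext.read
      .format("text")
      .stream(inputDir)
      .write
      .format("parquet")
      .option("checkpointLocation", checkpointDir)
      .startStream(outputDir)

    // The file is fully visible to the source's getOffset before processAllAvailable() is called...
    stringToFile(new File(inputDir, "batch-0.txt"), "a\nb\nc")
    // ...so this call is guaranteed to return only after "a", "b", "c" are committed to the sink.
    query.processAllAvailable()
    query.stop()
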
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
index 67dd9dbe23..fec38629d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
@@ -32,12 +32,12 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
*/
@Experimental
class ContinuousQueryException private[sql](
- val query: ContinuousQuery,
+ @transient val query: ContinuousQuery,
val message: String,
val cause: Throwable,
val startOffset: Option[Offset] = None,
- val endOffset: Option[Offset] = None
- ) extends Exception(message, cause) {
+ val endOffset: Option[Offset] = None)
+ extends Exception(message, cause) {
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 548da86359..c66921f485 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -22,6 +22,7 @@ import java.util.ServiceLoader
import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
@@ -29,7 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
+import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils
@@ -176,14 +177,41 @@ case class DataSource(
/** Returns a sink that can be used to continually write data. */
def createSink(): Sink = {
- val datasourceClass = providingClass.newInstance() match {
- case s: StreamSinkProvider => s
+ providingClass.newInstance() match {
+ case s: StreamSinkProvider => s.createSink(sqlContext, options, partitionColumns)
+ case format: FileFormat =>
+ val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+ val path = caseInsensitiveOptions.getOrElse("path", {
+ throw new IllegalArgumentException("'path' is not specified")
+ })
+
+ new FileStreamSink(sqlContext, path, format)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed writing")
}
+ }
- datasourceClass.createSink(sqlContext, options, partitionColumns)
+ /**
+ * Returns true if there is a single path that has a metadata log indicating which files should
+ * be read.
+ */
+ def hasMetadata(path: Seq[String]): Boolean = {
+ path match {
+ case Seq(singlePath) =>
+ try {
+ val hdfsPath = new Path(singlePath)
+ val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
+ val res = fs.exists(metadataPath)
+ res
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Error while looking for metadata directory.")
+ false
+ }
+ case _ => false
+ }
}
/** Create a resolved [[BaseRelation]] that can be used to read data from this [[DataSource]] */
@@ -200,6 +228,34 @@ case class DataSource(
case (_: RelationProvider, Some(_)) =>
throw new AnalysisException(s"$className does not allow user-specified schemas.")
+ // We are reading from the results of a streaming query. Load files from the metadata log
+ // instead of listing them using HDFS APIs.
+ case (format: FileFormat, _)
+ if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
+ val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
+ val fileCatalog =
+ new StreamFileCatalog(sqlContext, basePath)
+ val dataSchema = userSpecifiedSchema.orElse {
+ format.inferSchema(
+ sqlContext,
+ caseInsensitiveOptions,
+ fileCatalog.allFiles())
+ }.getOrElse {
+ throw new AnalysisException(
+ s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
+ "It must be specified manually")
+ }
+
+ HadoopFsRelation(
+ sqlContext,
+ fileCatalog,
+ partitionSchema = fileCatalog.partitionSpec().partitionColumns,
+ dataSchema = dataSchema,
+ bucketSpec = None,
+ format,
+ options)
+
+ // This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
val allPaths = caseInsensitiveOptions.get("path") ++ paths
val globbedPaths = allPaths.flatMap { path =>
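
For reference, a sketch of what the metadata-log path resolves to: a hypothetical helper, built only from classes added in this patch, that mirrors how StreamFileCatalog (below) enumerates committed files:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.streaming.{FileStreamSink, HDFSMetadataLog}

    /** Hypothetical helper: return only the files recorded in a streaming sink's metadata log. */
    def committedFiles(sqlContext: SQLContext, outputDir: String): Seq[String] = {
      // The same "<path>/_spark_metadata" location that hasMetadata() probes above.
      val logPath = new Path(new Path(outputDir), FileStreamSink.metadataDir)
      val log = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString)
      log.get(None, None).flatMap(_._2)   // every committed batch, flattened to its file paths
    }
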
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index e48ac59892..729c8462fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -64,6 +64,9 @@ case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
assert(sources.size == offsets.size)
new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
}
+
+ override def toString: String =
+ offsets.map(_.map(_.toString).getOrElse("-")).mkString("[", ", ", "]")
}
object CompositeOffset {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
new file mode 100644
index 0000000000..e819e95d61
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.sources.FileFormat
+
+object FileStreamSink {
+ // The name of the subdirectory that is used to store metadata about which files are valid.
+ val metadataDir = "_spark_metadata"
+}
+
+/**
+ * A sink that writes out results to parquet files. Each batch is written out to a unique
+ * directory. After all of the files in a batch have been successfully written, the list of
+ * file paths is appended to the log atomically. In the case of partial failures, some duplicate
+ * data may be present in the target directory, but only one copy of each file will be present
+ * in the log.
+ */
+class FileStreamSink(
+ sqlContext: SQLContext,
+ path: String,
+ fileFormat: FileFormat) extends Sink with Logging {
+
+ private val basePath = new Path(path)
+ private val logPath = new Path(basePath, FileStreamSink.metadataDir)
+ private val fileLog = new HDFSMetadataLog[Seq[String]](sqlContext, logPath.toUri.toString)
+
+ override def addBatch(batchId: Long, data: DataFrame): Unit = {
+ if (fileLog.get(batchId).isDefined) {
+ logInfo(s"Skipping already committed batch $batchId")
+ } else {
+ val files = writeFiles(data)
+ if (fileLog.add(batchId, files)) {
+ logInfo(s"Committed batch $batchId")
+ } else {
+ logWarning(s"Race while writing batch $batchId")
+ }
+ }
+ }
+
+ /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of file paths. */
+ private def writeFiles(data: DataFrame): Seq[String] = {
+ val ctx = sqlContext
+ val outputDir = path
+ val format = fileFormat
+ val schema = data.schema
+
+ val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
+ data.write.parquet(file)
+ sqlContext.read
+ .schema(data.schema)
+ .parquet(file)
+ .inputFiles
+ .map(new Path(_))
+ .filterNot(_.getName.startsWith("_"))
+ .map(_.toUri.toString)
+ }
+
+ override def toString: String = s"FileSink[$path]"
+}
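
To make the commit protocol above concrete, a rough sketch of the resulting layout and of why retries are safe (`sink` and `df` are hypothetical instances; paths and part-file names are illustrative):

    // Layout written by FileStreamSink for an output path "/data/out":
    //
    //   /data/out/_spark_metadata/0                        <- Seq[String] of files committed by batch 0
    //   /data/out/_spark_metadata/1
    //   /data/out/<uuid-for-batch-0>/part-r-00000-*.parquet
    //   /data/out/<uuid-for-batch-1>/part-r-00000-*.parquet
    //   /data/out/<uuid-of-partial-attempt>/...            <- orphaned data, never referenced by the log
    //
    // Re-delivering a batch is harmless:
    sink.addBatch(1, df)   // writes files into a fresh UUID directory, then appends them to the log
    sink.addBatch(1, df)   // fileLog.get(1).isDefined => "Skipping already committed batch 1"; if the
                           // first call had died before logging, this retry would write a new UUID
                           // directory and only its files would ever appear in the log
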
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index d13b1a6166..1b70055f34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -44,7 +44,7 @@ class FileStreamSource(
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
private val seenFiles = new OpenHashSet[String]
- metadataLog.get(None, maxBatchId).foreach { case (batchId, files) =>
+ metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
files.foreach(seenFiles.add)
}
@@ -114,18 +114,24 @@ class FileStreamSource(
val endId = end.asInstanceOf[LongOffset].offset
assert(startId <= endId)
- val files = metadataLog.get(Some(startId + 1), endId).map(_._2).flatten
- logDebug(s"Return files from batches ${startId + 1}:$endId")
+ val files = metadataLog.get(Some(startId + 1), Some(endId)).map(_._2).flatten
+ logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logDebug(s"Streaming ${files.mkString(", ")}")
dataFrameBuilder(files)
}
private def fetchAllFiles(): Seq[String] = {
- fs.listStatus(new Path(path))
+ val startTime = System.nanoTime()
+ val files = fs.listStatus(new Path(path))
.filterNot(_.getPath.getName.startsWith("_"))
.map(_.getPath.toUri.toString)
+ val endTime = System.nanoTime()
+ logDebug(s"Listed ${files.size} in ${(endTime.toDouble - startTime) / 1000000}ms")
+ files
}
override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)
+
+ override def toString: String = s"FileSource[$path]"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 298b5d292e..f27d23b1cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -170,11 +170,12 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
}
}
- override def get(startId: Option[Long], endId: Long): Array[(Long, T)] = {
- val batchIds = fc.util().listStatus(metadataPath, batchFilesFilter)
+ override def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)] = {
+ val files = fc.util().listStatus(metadataPath, batchFilesFilter)
+ val batchIds = files
.map(_.getPath.getName.toLong)
.filter { batchId =>
- batchId <= endId && (startId.isEmpty || batchId >= startId.get)
+ (endId.isEmpty || batchId <= endId.get) && (startId.isEmpty || batchId >= startId.get)
}
batchIds.sorted.map(batchId => (batchId, get(batchId))).filter(_._2.isDefined).map {
case (batchId, metadataOption) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index 008195af38..bb176408d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -30,4 +30,6 @@ case class LongOffset(offset: Long) extends Offset {
def +(increment: Long): LongOffset = new LongOffset(offset + increment)
def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
+
+ override def toString: String = s"#$offset"
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index 3f9896d23c..cc70e1d314 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -42,7 +42,7 @@ trait MetadataLog[T] {
* Return metadata for batches between startId (inclusive) and endId (inclusive). If `startId` is
* `None`, return all batches up to endId (inclusive); if `endId` is `None`, return all batches
* from startId (inclusive) onwards.
*/
- def get(startId: Option[Long], endId: Long): Array[(Long, T)]
+ def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]
/**
* Return the latest batch Id and its metadata if exist.
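
With both bounds optional, the lookup semantics work out as follows (an illustrative sketch only; `log` is any hypothetical MetadataLog[String] currently holding batches 0 through 3):

    log.get(None, Some(2L))      // -> batches 0, 1, 2      (everything up to endId)
    log.get(Some(1L), Some(2L))  // -> batches 1, 2         (both bounds inclusive)
    log.get(Some(2L), None)      // -> batches 2, 3         (everything from startId on)
    log.get(None, None)          // -> batches 0, 1, 2, 3   (the full log)
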
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 29b058f2e4..5abd7eca2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -239,6 +239,12 @@ class StreamExecution(
logInfo(s"Committed offsets for batch $currentBatchId.")
true
} else {
+ noNewData = true
+ awaitBatchLock.synchronized {
+ // Wake up any threads that are waiting for the stream to progress.
+ awaitBatchLock.notifyAll()
+ }
+
false
}
}
@@ -334,6 +340,18 @@ class StreamExecution(
logDebug(s"Unblocked at $newOffset for $source")
}
+ /** A flag to indicate that a batch has completed with no new data available. */
+ @volatile private var noNewData = false
+
+ override def processAllAvailable(): Unit = {
+ noNewData = false
+ while (!noNewData) {
+ awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
+ if (streamDeathCause != null) { throw streamDeathCause }
+ }
+ if (streamDeathCause != null) { throw streamDeathCause }
+ }
+
override def awaitTermination(): Unit = {
if (state == INITIALIZED) {
throw new IllegalStateException("Cannot wait for termination on a query that has not started")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
new file mode 100644
index 0000000000..b8d69b1845
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.sources.{FileCatalog, Partition}
+import org.apache.spark.sql.types.StructType
+
+class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog with Logging {
+ val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
+ logInfo(s"Reading streaming file log from $metadataDirectory")
+ val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataDirectory.toUri.toString)
+ val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+ override def paths: Seq[Path] = path :: Nil
+
+ override def partitionSpec(): PartitionSpec = PartitionSpec(StructType(Nil), Nil)
+
+ /**
+ * Returns all valid files grouped into partitions when the data is partitioned. If the data is
+ * unpartitioned, this will return a single partition with no partition values.
+ *
+ * @param filters the filters used to prune which partitions are returned. These filters must
+ * only refer to partition columns and this method will only return files
+ * where these predicates are guaranteed to evaluate to `true`. Thus, these
+ * filters will not need to be evaluated again on the returned data.
+ */
+ override def listFiles(filters: Seq[Expression]): Seq[Partition] =
+ Partition(InternalRow.empty, allFiles()) :: Nil
+
+ override def getStatus(path: Path): Array[FileStatus] = fs.listStatus(path)
+
+ override def refresh(): Unit = {}
+
+ override def allFiles(): Seq[FileStatus] = {
+ fs.listStatus(metadataLog.get(None, None).flatMap(_._2).map(new Path(_)))
+ }
+}
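
In practice this is what makes the read-back in the suites below resolve through the log rather than a plain directory listing (a sketch; `sqlContext` and `outputDir` are hypothetical values in scope):

    // Because "<outputDir>/_spark_metadata" exists, DataSource.hasMetadata() is true and the read
    // builds a HadoopFsRelation over a StreamFileCatalog, so files from uncommitted or partially
    // written batches are never picked up.
    val committed = sqlContext.read.parquet(outputDir)
    committed.count()   // counts only data that FileStreamSink has logged as committed
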
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 4ddc218455..9ed5686d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.test.SharedSQLContext
class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
+ private implicit def toOption[A](a: A): Option[A] = Option(a)
+
test("basic") {
withTempDir { temp =>
val metadataLog = new HDFSMetadataLog[String](sqlContext, temp.getAbsolutePath)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
new file mode 100644
index 0000000000..7f31611383
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.StreamTest
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("unpartitioned writing") {
+ val inputData = MemoryStream[Int]
+ val df = inputData.toDF()
+
+ val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+ val query =
+ df.write
+ .format("parquet")
+ .option("checkpointLocation", checkpointDir)
+ .startStream(outputDir)
+
+ inputData.addData(1, 2, 3)
+ failAfter(streamingTimeout) { query.processAllAvailable() }
+
+ val outputDf = sqlContext.read.parquet(outputDir).as[Int]
+ checkDataset(
+ outputDf,
+ 1, 2, 3)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
new file mode 100644
index 0000000000..5a1bfb3a00
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+import java.util.UUID
+
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.{ContinuousQuery, ContinuousQueryException, StreamTest}
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
+
+/**
+ * A stress test for streaming queries that read and write files. This test consists of
+ * two threads:
+ * - one that writes out `numRecords` distinct integers to files of random sizes (the total
+ * number of records is fixed but each file's size / creation time is random).
+ * - another that continually restarts a buggy streaming query (i.e. fails with 5% probability on
+ * any partition).
+ *
+ * At the end, the resulting files are loaded and the answer is checked.
+ */
+class FileStressSuite extends StreamTest with SharedSQLContext {
+ import testImplicits._
+
+ test("fault tolerance stress test") {
+ val numRecords = 10000
+ val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
+ val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
+ val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
+ val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
+
+ @volatile
+ var continue = true
+ @volatile
+ var stream: ContinuousQuery = null
+
+ val writer = new Thread("stream writer") {
+ override def run(): Unit = {
+ var i = numRecords
+ while (i > 0) {
+ val count = Random.nextInt(100)
+ var j = 0
+ var string = ""
+ while (j < count && i > 0) {
+ if (i % 10000 == 0) { logError(s"Wrote record $i") }
+ string = string + i + "\n"
+ j += 1
+ i -= 1
+ }
+
+ val uuid = UUID.randomUUID().toString
+ val fileName = new File(stagingDir, uuid)
+ stringToFile(fileName, string)
+ fileName.renameTo(new File(inputDir, uuid))
+ val sleep = Random.nextInt(100)
+ Thread.sleep(sleep)
+ }
+
+ logError("== DONE WRITING ==")
+ var done = false
+ while (!done) {
+ try {
+ stream.processAllAvailable()
+ done = true
+ } catch {
+ case NonFatal(_) =>
+ }
+ }
+
+ continue = false
+ stream.stop()
+ }
+ }
+ writer.start()
+
+ val input = sqlContext.read.format("text").stream(inputDir)
+ def startStream(): ContinuousQuery = input
+ .repartition(5)
+ .as[String]
+ .mapPartitions { iter =>
+ val rand = Random.nextInt(100)
+ if (rand < 5) { sys.error("failure") }
+ iter.map(_.toLong)
+ }
+ .write
+ .format("parquet")
+ .option("checkpointLocation", checkpoint)
+ .startStream(outputDir)
+
+ var failures = 0
+ val streamThread = new Thread("stream runner") {
+ while (continue) {
+ if (failures % 10 == 0) { logError(s"Query restart #$failures") }
+ stream = startStream()
+
+ try {
+ stream.awaitTermination()
+ } catch {
+ case ce: ContinuousQueryException =>
+ failures += 1
+ }
+ }
+ }
+
+ streamThread.join()
+
+ logError(s"Stream restarted $failures times.")
+ assert(sqlContext.read.parquet(outputDir).distinct().count() == numRecords)
+ }
+}