9 files changed, 605 insertions, 54 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 4995b263e1..cd5c4a7b3e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -940,7 +940,7 @@ class SQLTests(ReusedPySparkTestCase):
             cq.processAllAvailable()
             output_files = []
             for _, _, files in os.walk(out):
-                output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+                output_files.extend([f for f in files if not f.startswith('.')])
             self.assertTrue(len(output_files) > 0)
             self.assertTrue(len(os.listdir(chk)) > 0)
         finally:
@@ -967,7 +967,7 @@ class SQLTests(ReusedPySparkTestCase):
             cq.processAllAvailable()
             output_files = []
             for _, _, files in os.walk(out):
-                output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+                output_files.extend([f for f in files if not f.startswith('.')])
             self.assertTrue(len(output_files) > 0)
             self.assertTrue(len(os.listdir(chk)) > 0)
             self.assertFalse(os.path.isdir(fake1))  # should not have been created
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 63dc1fd71e..6114142cef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -203,13 +203,14 @@ case class DataSource(
   def createSink(): Sink = {
     providingClass.newInstance() match {
       case s: StreamSinkProvider => s.createSink(sparkSession.wrapped, options, partitionColumns)
-      case format: FileFormat =>
+
+      case parquet: parquet.DefaultSource =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
+        new FileStreamSink(sparkSession, path, parquet, partitionColumns, options)
-        new FileStreamSink(sparkSession, path, format)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed writing")
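For orientation, the match arm above is what a streaming parquet write resolves to. A minimal driver-side sketch under the Spark 2.0 preview API used elsewhere in this patch (paths and names are illustrative only, not from the patch):

    import org.apache.spark.sql.SparkSession

    object StartStreamSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.master("local[2]").appName("sink-sketch").getOrCreate()
        // A text file stream as input; format "parquet" resolves to parquet.DefaultSource,
        // so DataSource.createSink builds a FileStreamSink for the output directory.
        val query = spark.read.format("text").stream("/tmp/in")
          .write
          .format("parquet")
          .option("checkpointLocation", "/tmp/chk")  // backs the sink's metadata log
          .startStream("/tmp/out")
        query.awaitTermination()
      }
    }

Any other FileFormat now falls through to the UnsupportedOperationException case, which the "supported formats" test at the bottom of this patch exercises.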
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 25f88d9c39..0a3461151c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -64,6 +64,20 @@ abstract class OutputWriterFactory extends Serializable {
       bucketId: Option[Int], // TODO: This doesn't belong here...
       dataSchema: StructType,
       context: TaskAttemptContext): OutputWriter
+
+  /**
+   * Returns a new instance of [[OutputWriter]] that will write data to the given path.
+   * This method gets called by each task on an executor to write [[InternalRow]]s to
+   * format-specific files. Compared to the other `newInstance()`, this is a newer API that
+   * passes only the path that the writer must write to. The writer must write to the exact path
+   * and not modify it (do not add subdirectories, extensions, etc.). All other
+   * file-format-specific information needed to create the writer must be passed
+   * through the [[OutputWriterFactory]] implementation.
+   * @since 2.0.0
+   */
+  private[sql] def newWriter(path: String): OutputWriter = {
+    throw new UnsupportedOperationException("newInstance with just path not supported")
+  }
 }

 /**
@@ -223,6 +237,20 @@ trait FileFormat {
     // Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
     throw new UnsupportedOperationException(s"buildReader is not supported for $this")
   }
+
+  /**
+   * Returns an [[OutputWriterFactory]] for generating output writers that can write data.
+   * This method is currently used only by FileStreamSinkWriter to generate output writers that
+   * do not use output committers to write data. The OutputWriter generated by the returned
+   * [[OutputWriterFactory]] must implement the method `newWriter(path)`.
+   */
+  def buildWriter(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      options: Map[String, String]): OutputWriterFactory = {
+    // TODO: Remove this default implementation when the other formats have been ported
+    throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
+  }
 }

 /**
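To make the new single-argument contract concrete, here is a hedged sketch of what a format's factory might look like. `TextOutputWriterFactory` and everything inside it is invented for illustration (and, like the patch's own code, it would have to live under the `org.apache.spark.sql` package to override the `private[sql]` method); the real constraints it demonstrates are the ones documented above: create the file at exactly `path`, and carry all format configuration in the serializable factory itself.

    import java.io.{BufferedWriter, OutputStreamWriter}
    import java.nio.charset.StandardCharsets

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.TaskAttemptContext

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.util.SerializableConfiguration

    // Hypothetical: a line-per-row text writer. The factory is Serializable and is the only
    // thing shipped to executors, so the Hadoop conf is wrapped for serialization.
    private[sql] class TextOutputWriterFactory(conf: SerializableConfiguration)
      extends OutputWriterFactory {

      override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
        // Write to exactly `path`: no extension, no task-attempt subdirectory, no committer.
        private val file = new Path(path)
        private val out = new BufferedWriter(new OutputStreamWriter(
          file.getFileSystem(conf.value).create(file, false), StandardCharsets.UTF_8))

        override def write(row: Row): Unit = {
          out.write(row.mkString(","))
          out.newLine()
        }

        override def close(): Unit = out.close()
      }

      // The older task-attempt-based API is deliberately unsupported on this code path.
      override def newInstance(
          path: String,
          bucketId: Option[Int],
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter = {
        throw new UnsupportedOperationException("use newWriter(path) instead")
      }
    }

The parquet implementation of exactly this pattern follows.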
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b1513bbe94..79185df673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -41,13 +41,13 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration

 private[sql] class DefaultSource
@@ -372,8 +372,120 @@ private[sql] class DefaultSource
       }
     }
   }
+
+  override def buildWriter(
+      sqlContext: SQLContext,
+      dataSchema: StructType,
+      options: Map[String, String]): OutputWriterFactory = {
+    new ParquetOutputWriterFactory(
+      sqlContext.conf,
+      dataSchema,
+      sqlContext.sparkContext.hadoopConfiguration,
+      options)
+  }
 }

+/**
+ * A factory for generating OutputWriters for writing parquet files. This implementation is
+ * different from [[ParquetOutputWriter]] as it does not use any [[OutputCommitter]]. It simply
+ * writes the data to the path used to generate the output writer. Callers of this factory
+ * have to ensure which files are to be considered as committed.
+ */
+private[sql] class ParquetOutputWriterFactory(
+    sqlConf: SQLConf,
+    dataSchema: StructType,
+    hadoopConf: Configuration,
+    options: Map[String, String]) extends OutputWriterFactory {
+
+  private val serializableConf: SerializableConfiguration = {
+    val job = Job.getInstance(hadoopConf)
+    val conf = ContextUtil.getConfiguration(job)
+    val parquetOptions = new ParquetOptions(options, sqlConf)
+
+    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
+    // we set it here is to setup the output committer class to `ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
+    ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+
+    // We want to clear this temporary metadata from saving into Parquet file.
+    // This metadata is only useful for detecting optional columns when pushdowning filters.
+    val dataSchemaToWrite = StructType.removeMetadata(
+      StructType.metadataKeyForOptionalField,
+      dataSchema).asInstanceOf[StructType]
+    CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+
+    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
+    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
+    conf.set(
+      SQLConf.PARQUET_BINARY_AS_STRING.key,
+      sqlConf.isParquetBinaryAsString.toString)
+
+    conf.set(
+      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      sqlConf.isParquetINT96AsTimestamp.toString)
+
+    conf.set(
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      sqlConf.writeLegacyParquetFormat.toString)
+
+    // Sets compression scheme
+    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
+    new SerializableConfiguration(conf)
+  }
+
+  /**
+   * Returns an [[OutputWriter]] that writes data to the given path without using an
+   * [[OutputCommitter]].
+   */
+  override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
+
+    // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
+    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
+    private val hadoopAttemptContext = new TaskAttemptContextImpl(
+      serializableConf.value, hadoopTaskAttemptId)
+
+    // Instance of ParquetRecordWriter that does not use OutputCommitter
+    private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
+
+    override def write(row: Row): Unit = {
+      throw new UnsupportedOperationException("call writeInternal")
+    }
+
+    protected[sql] override def writeInternal(row: InternalRow): Unit = {
+      recordWriter.write(null, row)
+    }
+
+    override def close(): Unit = recordWriter.close(hadoopAttemptContext)
+  }
+
+  /** Create a [[ParquetRecordWriter]] that writes to the given path without using OutputCommitter */
+  private def createNoCommitterRecordWriter(
+      path: String,
+      hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = {
+    // Custom ParquetOutputFormat that disables use of the committer and writes to the given path
+    val outputFormat = new ParquetOutputFormat[InternalRow]() {
+      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
+      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
+    }
+    outputFormat.getRecordWriter(hadoopAttemptContext)
+  }

+  /** Disable the use of the older API. */
+  def newInstance(
+      path: String,
+      bucketId: Option[Int],
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter = {
+    throw new UnsupportedOperationException(
+      "this version of newInstance not supported for " +
+        "ParquetOutputWriterFactory")
+  }
+}
+
+
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
 private[sql] class ParquetOutputWriter(
     path: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 70aea7fa49..e191010329 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -19,11 +19,20 @@ package org.apache.spark.sql.execution.streaming

 import java.util.UUID

+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path

+import org.apache.spark.{SparkEnv, SparkException, TaskContext, TaskContextImpl}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.UnsafeKVExternalSorter
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration

 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -40,28 +49,24 @@ object FileStreamSink {
 class FileStreamSink(
     sparkSession: SparkSession,
     path: String,
-    fileFormat: FileFormat) extends Sink with Logging {
+    fileFormat: FileFormat,
+    partitionColumnNames: Seq[String],
+    options: Map[String, String]) extends Sink with Logging {

   private val basePath = new Path(path)
   private val logPath = new Path(basePath, FileStreamSink.metadataDir)
   private val fileLog = new FileStreamSinkLog(sparkSession, logPath.toUri.toString)
-  private val fs = basePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+  private val fs = basePath.getFileSystem(hadoopConf)

   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
       logInfo(s"Skipping already committed batch $batchId")
     } else {
-      val files = fs.listStatus(writeFiles(data)).map { f =>
-        SinkFileStatus(
-          path = f.getPath.toUri.toString,
-          size = f.getLen,
-          isDir = f.isDirectory,
-          modificationTime = f.getModificationTime,
-          blockReplication = f.getReplication,
-          blockSize = f.getBlockSize,
-          action = FileStreamSinkLog.ADD_ACTION)
-      }
-      if (fileLog.add(batchId, files)) {
+      val writer = new FileStreamSinkWriter(
+        data, fileFormat, path, partitionColumnNames, hadoopConf, options)
+      val fileStatuses = writer.write()
+      if (fileLog.add(batchId, fileStatuses)) {
         logInfo(s"Committed batch $batchId")
       } else {
         throw new IllegalStateException(s"Race while writing batch $batchId")
@@ -69,17 +74,192 @@ class FileStreamSink(
     }
   }

-  /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of files paths. */
-  private def writeFiles(data: DataFrame): Array[Path] = {
-    val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
-    data.write.parquet(file)
-    sparkSession.read
-      .schema(data.schema)
-      .parquet(file)
-      .inputFiles
-      .map(new Path(_))
-      .filterNot(_.getName.startsWith("_"))
+  override def toString: String = s"FileSink[$path]"
+}
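The batch-id check at the top of addBatch is what makes the sink idempotent: a batch id at or below the latest entry in the metadata log is silently skipped, so a retried or replayed batch never duplicates files. A small hedged sketch of that behavior (local paths and data are illustrative, not from the patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.parquet
    import org.apache.spark.sql.execution.streaming.FileStreamSink

    object IdempotentSinkSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.master("local[2]").appName("sink-sketch").getOrCreate()
        import spark.implicits._

        val sink = new FileStreamSink(
          spark, "/tmp/sink-sketch", new parquet.DefaultSource(),
          partitionColumnNames = Nil, options = Map.empty)

        val batch0 = Seq(1, 2, 3).toDF("id")
        sink.addBatch(0, batch0)  // writes files, then commits batch 0 to the metadata log
        sink.addBatch(0, batch0)  // replay: "Skipping already committed batch 0", nothing written
      }
    }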
+
+
+/**
+ * Writes data given to a [[FileStreamSink]] to the given `basePath` in the given `fileFormat`,
+ * partitioned by the given `partitionColumnNames`. This writer always appends data to the
+ * directory if it already has data.
+ */
+class FileStreamSinkWriter(
+    data: DataFrame,
+    fileFormat: FileFormat,
+    basePath: String,
+    partitionColumnNames: Seq[String],
+    hadoopConf: Configuration,
+    options: Map[String, String]) extends Serializable with Logging {
+
+  PartitioningUtils.validatePartitionColumnDataTypes(
+    data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)
+
+  private val serializableConf = new SerializableConfiguration(hadoopConf)
+  private val dataSchema = data.schema
+  private val dataColumns = data.logicalPlan.output
+
+  // Get the actual partition columns as attributes after matching them by name with
+  // the given column names.
+  private val partitionColumns = partitionColumnNames.map { col =>
+    val nameEquality = if (data.sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+    } else {
+      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+    }
+    data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
+      throw new RuntimeException(s"Partition column $col not found in schema $dataSchema")
+    }
+  }
+
+  // Columns that are to be written to the files. If there are partitioning columns, then
+  // those will not be written to the files.
+  private val writeColumns = {
+    val partitionSet = AttributeSet(partitionColumns)
+    dataColumns.filterNot(partitionSet.contains)
   }

-  override def toString: String = s"FileSink[$path]"
+  // An OutputWriterFactory for generating writers in the executors for writing the files.
+  private val outputWriterFactory =
+    fileFormat.buildWriter(data.sqlContext, writeColumns.toStructType, options)
+
+  /** Expressions that given a partition key build a string like: col1=val/col2=val/... */
+  private def partitionStringExpression: Seq[Expression] = {
+    partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+      val escaped =
+        ScalaUDF(
+          PartitioningUtils.escapePathName _,
+          StringType,
+          Seq(Cast(c, StringType)),
+          Seq(StringType))
+      val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+      val partitionName = Literal(c.name + "=") :: str :: Nil
+      if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+    }
+  }
+
+  /** Generate a new output writer from the writer factory */
+  private def newOutputWriter(path: Path): OutputWriter = {
+    val newWriter = outputWriterFactory.newWriter(path.toString)
+    newWriter.initConverter(dataSchema)
+    newWriter
+  }
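A worked example of the string that partitionStringExpression produces, since the escaping is the subtle part. This is a hedged sketch, not patch code: it assumes it runs inside the `org.apache.spark.sql` package (PartitioningUtils is `private[sql]`), and the exact escaped output depends on escapePathName's character set.

    import org.apache.spark.sql.execution.datasources.PartitioningUtils

    object PartitionPathSketch {
      def main(args: Array[String]): Unit = {
        // Each segment is "name=" + escaped value; nulls become the Hive default placeholder.
        def segment(name: String, value: Option[String]): String =
          name + "=" + value.map(PartitioningUtils.escapePathName)
            .getOrElse(PartitioningUtils.DEFAULT_PARTITION_NAME)

        // Expected shape: date=2016%2F05%2F01/city=__HIVE_DEFAULT_PARTITION__
        // ('/' must be escaped so a value cannot create extra directory levels)
        println(Seq(segment("date", Some("2016/05/01")), segment("city", None)).mkString("/"))
      }
    }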
+
+  /** Write the dataframe to files. This gets called in the driver by the [[FileStreamSink]]. */
+  def write(): Array[SinkFileStatus] = {
+    data.sqlContext.sparkContext.runJob(
+      data.queryExecution.toRdd,
+      (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
+        if (partitionColumns.isEmpty) {
+          Seq(writePartitionToSingleFile(iterator))
+        } else {
+          writePartitionToPartitionedFiles(iterator)
+        }
+      }).flatten
+  }
+
+  /**
+   * Writes an RDD partition to a single file without dynamic partitioning.
+   * This gets called in the executor, and it uses an [[OutputWriter]] to write the data.
+   */
+  def writePartitionToSingleFile(iterator: Iterator[InternalRow]): SinkFileStatus = {
+    var writer: OutputWriter = null
+    try {
+      val path = new Path(basePath, UUID.randomUUID.toString)
+      val fs = path.getFileSystem(serializableConf.value)
+      writer = newOutputWriter(path)
+      while (iterator.hasNext) {
+        writer.writeInternal(iterator.next)
+      }
+      writer.close()
+      writer = null
+      SinkFileStatus(fs.getFileStatus(path))
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting task.", cause)
+        // call failure callbacks first, so we could have a chance to cleanup the writer.
+        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+        throw new SparkException("Task failed while writing rows.", cause)
+    } finally {
+      if (writer != null) {
+        writer.close()
+      }
+    }
+  }
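The partitioned-write method that follows relies on one invariant: because rows come out of the sorter ordered by partition key, only a single writer needs to be open at any moment, and it is rotated exactly when the key changes. The same control flow reduced to plain Scala collections, as a sketch only (the real code streams from an UnsafeKVExternalSorter and writes real files):

    object KeyChangeLoopSketch {
      def main(args: Array[String]): Unit = {
        val sorted = Seq(("a", 1), ("a", 2), ("b", 3), ("c", 4), ("c", 5)).sortBy(_._1)

        var currentKey: String = null
        var out: StringBuilder = null
        val files = scala.collection.mutable.LinkedHashMap.empty[String, StringBuilder]

        for ((key, value) <- sorted) {
          if (key != currentKey) {       // key changed: "close" the old writer, "open" a new one
            currentKey = key
            out = new StringBuilder
            files(s"part-$key") = out
          }
          out.append(value).append('\n') // all rows for one key land in one file
        }

        files.foreach { case (name, data) => println(s"$name -> ${data.toString.trim}") }
      }
    }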
+
+  /**
+   * Writes an RDD partition to multiple dynamically partitioned files.
+   * This gets called in the executor. It first sorts the data based on the partitioning columns
+   * and then writes the data of each key to separate files using [[OutputWriter]]s.
+   */
+  def writePartitionToPartitionedFiles(iterator: Iterator[InternalRow]): Seq[SinkFileStatus] = {
+
+    // Returns the partitioning columns for sorting
+    val getSortingKey = UnsafeProjection.create(partitionColumns, dataColumns)
+
+    // Returns the data columns to be written given an input row
+    val getOutputRow = UnsafeProjection.create(writeColumns, dataColumns)
+
+    // Returns the partition path given a partition key
+    val getPartitionString =
+      UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
+
+    // Sort the data before write, so that we only need one writer at the same time.
+    val sorter = new UnsafeKVExternalSorter(
+      partitionColumns.toStructType,
+      StructType.fromAttributes(writeColumns),
+      SparkEnv.get.blockManager,
+      SparkEnv.get.serializerManager,
+      TaskContext.get().taskMemoryManager().pageSizeBytes)
+
+    while (iterator.hasNext) {
+      val currentRow = iterator.next()
+      sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
+    }
+    logDebug(s"Sorting complete. Writing out partition files one at a time.")
+
+    val sortedIterator = sorter.sortedIterator()
+    val paths = new ArrayBuffer[Path]
+
+    // Write the sorted data to partitioned files, one for each unique key
+    var currentWriter: OutputWriter = null
+    try {
+      var currentKey: UnsafeRow = null
+      while (sortedIterator.next()) {
+        val nextKey = sortedIterator.getKey
+
+        // If key changes, close current writer, and open a new writer to a new partitioned file
+        if (currentKey != nextKey) {
+          if (currentWriter != null) {
+            currentWriter.close()
+            currentWriter = null
+          }
+          currentKey = nextKey.copy()
+          val partitionPath = getPartitionString(currentKey).getString(0)
+          val path = new Path(new Path(basePath, partitionPath), UUID.randomUUID.toString)
+          paths += path
+          currentWriter = newOutputWriter(path)
+          logInfo(s"Writing partition $currentKey to $path")
+        }
+        currentWriter.writeInternal(sortedIterator.getValue)
+      }
+      if (currentWriter != null) {
+        currentWriter.close()
+        currentWriter = null
+      }
+      if (paths.nonEmpty) {
+        val fs = paths.head.getFileSystem(serializableConf.value)
+        paths.map(p => SinkFileStatus(fs.getFileStatus(p)))
+      } else Seq.empty
+    } catch {
+      case cause: Throwable =>
+        logError("Aborting task.", cause)
+        // call failure callbacks first, so we could have a chance to cleanup the writer.
+        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+        throw new SparkException("Task failed while writing rows.", cause)
+    } finally {
+      if (currentWriter != null) {
+        currentWriter.close()
+      }
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index b694b6155a..4254df44c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -54,6 +54,19 @@ case class SinkFileStatus(
   }
 }

+object SinkFileStatus {
+  def apply(f: FileStatus): SinkFileStatus = {
+    SinkFileStatus(
+      path = f.getPath.toUri.toString,
+      size = f.getLen,
+      isDir = f.isDirectory,
+      modificationTime = f.getModificationTime,
+      blockReplication = f.getReplication,
+      blockSize = f.getBlockSize,
+      action = FileStreamSinkLog.ADD_ACTION)
+  }
+}
+
 /**
  * A special log for [[FileStreamSink]]. It will write one log file for each batch. The first line
  * of the log file is the version number, and there are multiple JSON lines following. Each JSON
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 9fe06a6c36..fca3d51535 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -216,8 +216,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
       new FileContextManager(metadataPath, hadoopConf)
     } catch {
       case e: UnsupportedFileSystemException =>
-        logWarning("Could not use FileContext API for managing metadata log file. The log may be" +
-          "inconsistent under failures.", e)
+        logWarning("Could not use FileContext API for managing metadata log files at path " +
+          s"$metadataPath. Using FileSystem API instead for managing log files. The log may be " +
+          s"inconsistent under failures.")
        new FileSystemManager(metadataPath, hadoopConf)
    }
  }
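Some background on the warning above: HDFSMetadataLog prefers the FileContext API because its rename can be told to fail when the destination already exists, which turns "rename temp file to batch file" into an atomic commit; the plain FileSystem API does not guarantee that on all filesystems, hence the "inconsistent under failures" caveat. A hedged sketch of the primitive involved (paths are illustrative):

    import org.apache.hadoop.fs.{FileContext, Options, Path}

    object AtomicRenameSketch {
      def main(args: Array[String]): Unit = {
        val fc = FileContext.getFileContext()
        // Rename.NONE means: throw if the destination already exists. Two writers racing
        // to commit the same batch file therefore cannot both succeed -- one commit wins.
        fc.rename(
          new Path("/tmp/metadata-log/.batch-0.tmp"),
          new Path("/tmp/metadata-log/0"),
          Options.Rename.NONE)
      }
    }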
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 8cf5dedabc..609ca976a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,33 +17,223 @@
 package org.apache.spark.sql.streaming

-import org.apache.spark.sql.StreamTest
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter}
+
+import org.apache.spark.sql.{ContinuousQuery, Row, StreamTest}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.datasources.parquet
+import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils

 class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
   import testImplicits._

-  test("unpartitioned writing") {
+
+  test("FileStreamSinkWriter - unpartitioned data") {
+    val path = Utils.createTempDir()
+    path.delete()
+
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val fileFormat = new parquet.DefaultSource()
+
+    def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
+      val df = sqlContext
+        .range(start, end, 1, numPartitions)
+        .select($"id", lit(100).as("data"))
+      val writer = new FileStreamSinkWriter(
+        df, fileFormat, path.toString, partitionColumnNames = Nil, hadoopConf, Map.empty)
+      writer.write().map(_.path.stripPrefix("file://"))
+    }
+
+    // Write and check whether new files are written correctly
+    val files1 = writeRange(0, 10, 2)
+    assert(files1.size === 2, s"unexpected number of files: $files1")
+    checkFilesExist(path, files1, "file not written")
+    checkAnswer(sqlContext.read.load(path.getCanonicalPath), (0 until 10).map(Row(_, 100)))
+
+    // Append and check whether new files are written correctly and old files still exist
+    val files2 = writeRange(10, 20, 3)
+    assert(files2.size === 3, s"unexpected number of files: $files2")
+    assert(files2.intersect(files1).isEmpty, "old files returned")
+    checkFilesExist(path, files2, s"New file not written")
+    checkFilesExist(path, files1, s"Old file not found")
+    checkAnswer(sqlContext.read.load(path.getCanonicalPath), (0 until 20).map(Row(_, 100)))
+  }
+
+  test("FileStreamSinkWriter - partitioned data") {
+    implicit val e = ExpressionEncoder[java.lang.Long]
+    val path = Utils.createTempDir()
+    path.delete()
+
+    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val fileFormat = new parquet.DefaultSource()
+
+    def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
+      val df = sqlContext
+        .range(start, end, 1, numPartitions)
+        .flatMap(x => Iterator(x, x, x)).toDF("id")
+        .select($"id", lit(100).as("data1"), lit(1000).as("data2"))
+
+      require(df.rdd.partitions.size === numPartitions)
+      val writer = new FileStreamSinkWriter(
+        df, fileFormat, path.toString, partitionColumnNames = Seq("id"), hadoopConf, Map.empty)
+      writer.write().map(_.path.stripPrefix("file://"))
+    }
+
+    def checkOneFileWrittenPerKey(keys: Seq[Int], filesWritten: Seq[String]): Unit = {
+      keys.foreach { id =>
+        assert(
+          filesWritten.count(_.contains(s"/id=$id/")) == 1,
+          s"no file for id=$id. all files: \n\t${filesWritten.mkString("\n\t")}"
+        )
+      }
+    }
+
+    // Write and check whether new files are written correctly
+    val files1 = writeRange(0, 10, 2)
+    assert(files1.size === 10, s"unexpected number of files:\n${files1.mkString("\n")}")
+    checkFilesExist(path, files1, "file not written")
+    checkOneFileWrittenPerKey(0 until 10, files1)
+
+    val answer1 = (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _))
+    checkAnswer(sqlContext.read.load(path.getCanonicalPath), answer1)
+
+    // Append and check whether new files are written correctly and old files still exist
+    val files2 = writeRange(0, 20, 3)
+    assert(files2.size === 20, s"unexpected number of files:\n${files2.mkString("\n")}")
+    assert(files2.intersect(files1).isEmpty, "old files returned")
+    checkFilesExist(path, files2, s"New file not written")
+    checkFilesExist(path, files1, s"Old file not found")
+    checkOneFileWrittenPerKey(0 until 20, files2)
+
+    val answer2 = (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _))
+    checkAnswer(sqlContext.read.load(path.getCanonicalPath), answer1 ++ answer2)
+  }
+
+  test("FileStreamSink - unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()

     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

-    val query =
-      df.write
-        .format("parquet")
-        .option("checkpointLocation", checkpointDir)
-        .startStream(outputDir)
+    var query: ContinuousQuery = null
+
+    try {
+      query =
+        df.write
+          .format("parquet")
+          .option("checkpointLocation", checkpointDir)
+          .startStream(outputDir)
+
+      inputData.addData(1, 2, 3)
+
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }

-    inputData.addData(1, 2, 3)
-    failAfter(streamingTimeout) { query.processAllAvailable() }
+      val outputDf = sqlContext.read.parquet(outputDir).as[Int]
+      checkDataset(outputDf, 1, 2, 3)

-    val outputDf = sqlContext.read.parquet(outputDir).as[Int]
-    checkDataset(
-      outputDf,
-      1, 2, 3)
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
   }
+
+  test("FileStreamSink - partitioned writing and batch reading [IGNORES PARTITION COLUMN]") {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    var query: ContinuousQuery = null
+
+    try {
+      query =
+        ds.map(i => (i, i * 1000))
+          .toDF("id", "value")
+          .write
+          .format("parquet")
+          .partitionBy("id")
+          .option("checkpointLocation", checkpointDir)
+          .startStream(outputDir)
+
+      inputData.addData(1, 2, 3)
+      failAfter(streamingTimeout) {
+        query.processAllAvailable()
+      }
+
+      // TODO (tdas): Test partition column can be read or not
+      val outputDf = sqlContext.read.parquet(outputDir)
+      checkDataset(
+        outputDf.as[Int],
+        1000, 2000, 3000)
+
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
+
+  test("FileStreamSink - supported formats") {
+    def testFormat(format: Option[String]): Unit = {
+      val inputData = MemoryStream[Int]
+      val ds = inputData.toDS()
+
+      val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+      val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+      var query: ContinuousQuery = null
+
+      try {
+        val writer =
+          ds.map(i => (i, i * 1000))
+            .toDF("id", "value")
+            .write
+        if (format.nonEmpty) {
+          writer.format(format.get)
+        }
+        query = writer
+          .option("checkpointLocation", checkpointDir)
+          .startStream(outputDir)
+      } finally {
+        if (query != null) {
+          query.stop()
+        }
+      }
+    }
+
+    testFormat(None) // should not throw error as the default format is parquet when not specified
+    testFormat(Some("parquet"))
+    val e = intercept[UnsupportedOperationException] {
+      testFormat(Some("text"))
+    }
+    Seq("text", "not support", "stream").foreach { s =>
+      assert(e.getMessage.contains(s))
+    }
+  }
+
+  private def checkFilesExist(dir: File, expectedFiles: Seq[String], msg: String): Unit = {
+    import scala.collection.JavaConverters._
+    val files =
+      FileUtils.listFiles(dir, new RegexFileFilter("[^.]+"), DirectoryFileFilter.DIRECTORY)
+        .asScala
+        .map(_.getCanonicalPath)
+        .toSet
+
+    expectedFiles.foreach { f =>
+      assert(files.contains(f),
+        s"\n$msg\nexpected file:\n\t$f\nfound files:\n${files.mkString("\n\t")}")
+    }
+  }
+
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 5b49a0a86a..50703e532f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -41,7 +41,15 @@ import org.apache.spark.util.Utils
 class FileStressSuite extends StreamTest with SharedSQLContext {
   import testImplicits._

-  test("fault tolerance stress test") {
+  testQuietly("fault tolerance stress test - unpartitioned output") {
+    stressTest(partitionWrites = false)
+  }
+
+  testQuietly("fault tolerance stress test - partitioned output") {
+    stressTest(partitionWrites = true)
+  }
+
+  def stressTest(partitionWrites: Boolean): Unit = {
     val numRecords = 10000
     val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
     val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
@@ -93,18 +101,36 @@ class FileStressSuite extends StreamTest with SharedSQLContext {
     writer.start()

     val input = sqlContext.read.format("text").stream(inputDir)
-    def startStream(): ContinuousQuery = input
+
+    def startStream(): ContinuousQuery = {
+      val output = input
         .repartition(5)
         .as[String]
         .mapPartitions { iter =>
           val rand = Random.nextInt(100)
-          if (rand < 5) { sys.error("failure") }
+          if (rand < 10) {
+            sys.error("failure")
+          }
           iter.map(_.toLong)
         }
-      .write
-      .format("parquet")
-      .option("checkpointLocation", checkpoint)
-      .startStream(outputDir)
+        .map(x => (x % 400, x.toString))
+        .toDF("id", "data")
+
+      if (partitionWrites) {
+        output
+          .write
+          .partitionBy("id")
+          .format("parquet")
+          .option("checkpointLocation", checkpoint)
+          .startStream(outputDir)
+      } else {
+        output
+          .write
+          .format("parquet")
+          .option("checkpointLocation", checkpoint)
+          .startStream(outputDir)
+      }
+    }

     var failures = 0
     val streamThread = new Thread("stream runner") {