-rw-r--r--  python/pyspark/sql/tests.py  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala  28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala  116
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala  230
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala  13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala  218
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala  40
9 files changed, 605 insertions, 54 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 4995b263e1..cd5c4a7b3e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -940,7 +940,7 @@ class SQLTests(ReusedPySparkTestCase):
cq.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
- output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+ output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
finally:
@@ -967,7 +967,7 @@ class SQLTests(ReusedPySparkTestCase):
cq.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
- output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+ output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
self.assertFalse(os.path.isdir(fake1)) # should not have been created
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 63dc1fd71e..6114142cef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -203,13 +203,14 @@ case class DataSource(
def createSink(): Sink = {
providingClass.newInstance() match {
case s: StreamSinkProvider => s.createSink(sparkSession.wrapped, options, partitionColumns)
- case format: FileFormat =>
+
+ case parquet: parquet.DefaultSource =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
- new FileStreamSink(sparkSession, path, format)
+ new FileStreamSink(sparkSession, path, parquet, partitionColumns, options)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed writing")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 25f88d9c39..0a3461151c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -64,6 +64,20 @@ abstract class OutputWriterFactory extends Serializable {
bucketId: Option[Int], // TODO: This doesn't belong here...
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter
+
+ /**
+ * Returns a new instance of [[OutputWriter]] that will write data to the given path.
+ * This method gets called by each task on the executors to write [[InternalRow]]s to
+ * format-specific files. Compared to the other `newInstance()`, this is a newer API that
+ * passes only the path that the writer must write to. The writer must write to the exact path
+ * and not modify it (do not add subdirectories, extensions, etc.). All other
+ * file-format-specific information needed to create the writer must be passed
+ * through the [[OutputWriterFactory]] implementation.
+ * @since 2.0.0
+ */
+ private[sql] def newWriter(path: String): OutputWriter = {
+ throw new UnsupportedOperationException("newInstance with just path not supported")
+ }
}
/**
@@ -223,6 +237,20 @@ trait FileFormat {
// Until then we guard in [[FileSourceStrategy]] to only call this method on supported formats.
throw new UnsupportedOperationException(s"buildReader is not supported for $this")
}
+
+ /**
+ * Returns an [[OutputWriterFactory]] for generating output writers that can write data.
+ * This method is currently used only by FileStreamSinkWriter to generate output writers that
+ * do not use output committers to write data. The OutputWriter generated by the returned
+ * [[OutputWriterFactory]] must implement the method `newWriter(path)`.
+ */
+ def buildWriter(
+ sqlContext: SQLContext,
+ dataSchema: StructType,
+ options: Map[String, String]): OutputWriterFactory = {
+ // TODO: Remove this default implementation when the other formats have been ported
+ throw new UnsupportedOperationException(s"buildWriter is not supported for $this")
+ }
}
/**
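
As a rough illustration of the two hooks added above (not part of the patch): a file format that wants to support the streaming sink overrides `buildWriter()` to return a factory whose writers honor the exact-path contract of `newWriter(path)`. A hypothetical sketch, assuming a `TextOutputWriter` helper class and the same imports as fileSourceInterfaces.scala; since `newWriter` is `private[sql]`, such code would live under org.apache.spark.sql:

    // Hypothetical sketch of the new contract; TextOutputWriter is an assumed helper, not a real class.
    def buildTextWriterFactory(dataSchema: StructType): OutputWriterFactory =
      new OutputWriterFactory {
        // New API: must create exactly the file at `path` (no extensions, no subdirectories).
        override private[sql] def newWriter(path: String): OutputWriter =
          new TextOutputWriter(path, dataSchema)

        // Old, committer-based API: not used by FileStreamSinkWriter, left unsupported here.
        override def newInstance(
            path: String,
            bucketId: Option[Int],
            dataSchema: StructType,
            context: TaskAttemptContext): OutputWriter =
          throw new UnsupportedOperationException("use newWriter(path)")
      }
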
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index b1513bbe94..79185df673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -41,13 +41,13 @@ import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
private[sql] class DefaultSource
@@ -372,8 +372,120 @@ private[sql] class DefaultSource
}
}
}
+
+ override def buildWriter(
+ sqlContext: SQLContext,
+ dataSchema: StructType,
+ options: Map[String, String]): OutputWriterFactory = {
+ new ParquetOutputWriterFactory(
+ sqlContext.conf,
+ dataSchema,
+ sqlContext.sparkContext.hadoopConfiguration,
+ options)
+ }
}
+/**
+ * A factory for generating [[OutputWriter]]s for writing Parquet files. This implementation is
+ * different from [[ParquetOutputWriter]] in that it does not use any [[OutputCommitter]]. It
+ * simply writes the data to the path used to generate the output writer. Callers of this
+ * factory have to keep track of which files are to be considered committed.
+ */
+private[sql] class ParquetOutputWriterFactory(
+ sqlConf: SQLConf,
+ dataSchema: StructType,
+ hadoopConf: Configuration,
+ options: Map[String, String]) extends OutputWriterFactory {
+
+ private val serializableConf: SerializableConfiguration = {
+ val job = Job.getInstance(hadoopConf)
+ val conf = ContextUtil.getConfiguration(job)
+ val parquetOptions = new ParquetOptions(options, sqlConf)
+
+ // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
+ // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
+ // we set it here is to set up the output committer class to `ParquetOutputCommitter`, which is
+ // bundled with `ParquetOutputFormat[Row]`.
+ job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
+ ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport])
+
+ // We want to avoid saving this temporary metadata into the Parquet file.
+ // This metadata is only useful for detecting optional columns when pushing down filters.
+ val dataSchemaToWrite = StructType.removeMetadata(
+ StructType.metadataKeyForOptionalField,
+ dataSchema).asInstanceOf[StructType]
+ CatalystWriteSupport.setSchema(dataSchemaToWrite, conf)
+
+ // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
+ // and `CatalystWriteSupport` (writing actual rows to Parquet files).
+ conf.set(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ sqlConf.isParquetBinaryAsString.toString)
+
+ conf.set(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ sqlConf.isParquetINT96AsTimestamp.toString)
+
+ conf.set(
+ SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+ sqlConf.writeLegacyParquetFormat.toString)
+
+ // Sets compression scheme
+ conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
+ new SerializableConfiguration(conf)
+ }
+
+ /**
+ * Returns an [[OutputWriter]] that writes data to the given path without using an
+ * [[OutputCommitter]].
+ */
+ override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter {
+
+ // Create a TaskAttemptContext that is used to pass the Configuration on to the ParquetRecordWriter
+ private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
+ private val hadoopAttemptContext = new TaskAttemptContextImpl(
+ serializableConf.value, hadoopTaskAttemptId)
+
+ // Instance of ParquetRecordWriter that does not use OutputCommitter
+ private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
+
+ override def write(row: Row): Unit = {
+ throw new UnsupportedOperationException("call writeInternal")
+ }
+
+ protected[sql] override def writeInternal(row: InternalRow): Unit = {
+ recordWriter.write(null, row)
+ }
+
+ override def close(): Unit = recordWriter.close(hadoopAttemptContext)
+ }
+
+ /** Creates a [[ParquetRecordWriter]] that writes to the given path without using an OutputCommitter */
+ private def createNoCommitterRecordWriter(
+ path: String,
+ hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = {
+ // Custom ParquetOutputFormat that disables the use of a committer and writes to the given path
+ val outputFormat = new ParquetOutputFormat[InternalRow]() {
+ override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
+ override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
+ }
+ outputFormat.getRecordWriter(hadoopAttemptContext)
+ }
+
+ /** Disable the use of the older API. */
+ def newInstance(
+ path: String,
+ bucketId: Option[Int],
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ throw new UnsupportedOperationException(
+ "this version of newInstance is not supported for " +
+ "ParquetOutputWriterFactory")
+ }
+}
+
+
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter(
path: String,
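
To make the factory's contract concrete, here is a small usage sketch (illustrative only, not part of the patch). It assumes it runs inside a task and under the org.apache.spark.sql packages, since `newWriter` and `writeInternal` are package-private; `factory`, `rows`, and `path` are supplied by the caller, and deciding which of the resulting files count as committed is left to the sink's metadata log:

    // Illustrative sketch: executor-side use of ParquetOutputWriterFactory.
    def writeOneFile(
        factory: ParquetOutputWriterFactory,
        rows: Iterator[InternalRow],
        path: String): Unit = {
      val writer = factory.newWriter(path)     // bound to exactly this file, no OutputCommitter
      try {
        rows.foreach(writer.writeInternal)     // write(Row) intentionally throws; use writeInternal
      } finally {
        writer.close()                         // closes the underlying ParquetRecordWriter
      }
    }
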
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 70aea7fa49..e191010329 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -19,11 +19,20 @@ package org.apache.spark.sql.execution.streaming
import java.util.UUID
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.apache.spark.{SparkEnv, SparkException, TaskContext, TaskContextImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.UnsafeKVExternalSorter
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, PartitioningUtils}
+import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.util.SerializableConfiguration
object FileStreamSink {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -40,28 +49,24 @@ object FileStreamSink {
class FileStreamSink(
sparkSession: SparkSession,
path: String,
- fileFormat: FileFormat) extends Sink with Logging {
+ fileFormat: FileFormat,
+ partitionColumnNames: Seq[String],
+ options: Map[String, String]) extends Sink with Logging {
private val basePath = new Path(path)
private val logPath = new Path(basePath, FileStreamSink.metadataDir)
private val fileLog = new FileStreamSinkLog(sparkSession, logPath.toUri.toString)
- private val fs = basePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+ private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+ private val fs = basePath.getFileSystem(hadoopConf)
override def addBatch(batchId: Long, data: DataFrame): Unit = {
if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
logInfo(s"Skipping already committed batch $batchId")
} else {
- val files = fs.listStatus(writeFiles(data)).map { f =>
- SinkFileStatus(
- path = f.getPath.toUri.toString,
- size = f.getLen,
- isDir = f.isDirectory,
- modificationTime = f.getModificationTime,
- blockReplication = f.getReplication,
- blockSize = f.getBlockSize,
- action = FileStreamSinkLog.ADD_ACTION)
- }
- if (fileLog.add(batchId, files)) {
+ val writer = new FileStreamSinkWriter(
+ data, fileFormat, path, partitionColumnNames, hadoopConf, options)
+ val fileStatuses = writer.write()
+ if (fileLog.add(batchId, fileStatuses)) {
logInfo(s"Committed batch $batchId")
} else {
throw new IllegalStateException(s"Race while writing batch $batchId")
@@ -69,17 +74,192 @@ class FileStreamSink(
}
}
- /** Writes the [[DataFrame]] to a UUID-named dir, returning the list of files paths. */
- private def writeFiles(data: DataFrame): Array[Path] = {
- val file = new Path(basePath, UUID.randomUUID().toString).toUri.toString
- data.write.parquet(file)
- sparkSession.read
- .schema(data.schema)
- .parquet(file)
- .inputFiles
- .map(new Path(_))
- .filterNot(_.getName.startsWith("_"))
+ override def toString: String = s"FileSink[$path]"
+}
+
+
+/**
+ * Writes data given to a [[FileStreamSink]] to the given `basePath` in the given `fileFormat`,
+ * partitioned by the given `partitionColumnNames`. This writer always appends data to the
+ * directory if it already has data.
+ */
+class FileStreamSinkWriter(
+ data: DataFrame,
+ fileFormat: FileFormat,
+ basePath: String,
+ partitionColumnNames: Seq[String],
+ hadoopConf: Configuration,
+ options: Map[String, String]) extends Serializable with Logging {
+
+ PartitioningUtils.validatePartitionColumnDataTypes(
+ data.schema, partitionColumnNames, data.sqlContext.conf.caseSensitiveAnalysis)
+
+ private val serializableConf = new SerializableConfiguration(hadoopConf)
+ private val dataSchema = data.schema
+ private val dataColumns = data.logicalPlan.output
+
+ // Get the actual partition columns as attributes after matching them by name with
+ // the given column names.
+ private val partitionColumns = partitionColumnNames.map { col =>
+ val nameEquality = if (data.sparkSession.sessionState.conf.caseSensitiveAnalysis) {
+ org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+ } else {
+ org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+ }
+ data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
+ throw new RuntimeException(s"Partition column $col not found in schema $dataSchema")
+ }
+ }
+
+ // Columns that are to be written to the files. If there are partitioning columns, then
+ // those will not be written to the files.
+ private val writeColumns = {
+ val partitionSet = AttributeSet(partitionColumns)
+ dataColumns.filterNot(partitionSet.contains)
}
- override def toString: String = s"FileSink[$path]"
+ // An OutputWriterFactory for generating writers in the executors for writing the files.
+ private val outputWriterFactory =
+ fileFormat.buildWriter(data.sqlContext, writeColumns.toStructType, options)
+
+ /** Expressions that, given a partition key, build a string like: col1=val/col2=val/... */
+ private def partitionStringExpression: Seq[Expression] = {
+ partitionColumns.zipWithIndex.flatMap { case (c, i) =>
+ val escaped =
+ ScalaUDF(
+ PartitioningUtils.escapePathName _,
+ StringType,
+ Seq(Cast(c, StringType)),
+ Seq(StringType))
+ val str = If(IsNull(c), Literal(PartitioningUtils.DEFAULT_PARTITION_NAME), escaped)
+ val partitionName = Literal(c.name + "=") :: str :: Nil
+ if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+ }
+ }
+
+ /** Generate a new output writer from the writer factory */
+ private def newOutputWriter(path: Path): OutputWriter = {
+ val newWriter = outputWriterFactory.newWriter(path.toString)
+ newWriter.initConverter(dataSchema)
+ newWriter
+ }
+
+ /** Writes the [[DataFrame]] to files. This gets called in the driver by the [[FileStreamSink]]. */
+ def write(): Array[SinkFileStatus] = {
+ data.sqlContext.sparkContext.runJob(
+ data.queryExecution.toRdd,
+ (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
+ if (partitionColumns.isEmpty) {
+ Seq(writePartitionToSingleFile(iterator))
+ } else {
+ writePartitionToPartitionedFiles(iterator)
+ }
+ }).flatten
+ }
+
+ /**
+ * Writes an RDD partition to a single file without dynamic partitioning.
+ * This gets called in the executor, and it uses an [[OutputWriter]] to write the data.
+ */
+ def writePartitionToSingleFile(iterator: Iterator[InternalRow]): SinkFileStatus = {
+ var writer: OutputWriter = null
+ try {
+ val path = new Path(basePath, UUID.randomUUID.toString)
+ val fs = path.getFileSystem(serializableConf.value)
+ writer = newOutputWriter(path)
+ while (iterator.hasNext) {
+ writer.writeInternal(iterator.next)
+ }
+ writer.close()
+ writer = null
+ SinkFileStatus(fs.getFileStatus(path))
+ } catch {
+ case cause: Throwable =>
+ logError("Aborting task.", cause)
+ // Call failure callbacks first, so we have a chance to clean up the writer.
+ TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+ throw new SparkException("Task failed while writing rows.", cause)
+ } finally {
+ if (writer != null) {
+ writer.close()
+ }
+ }
+ }
+
+ /**
+ * Writes an RDD partition to multiple dynamically partitioned files.
+ * This gets called in the executor. It first sorts the data based on the partitioning columns
+ * and then writes the data of each key to separate files using [[OutputWriter]]s.
+ */
+ def writePartitionToPartitionedFiles(iterator: Iterator[InternalRow]): Seq[SinkFileStatus] = {
+
+ // Returns the partitioning columns for sorting
+ val getSortingKey = UnsafeProjection.create(partitionColumns, dataColumns)
+
+ // Returns the data columns to be written given an input row
+ val getOutputRow = UnsafeProjection.create(writeColumns, dataColumns)
+
+ // Returns the partition path given a partition key
+ val getPartitionString =
+ UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionColumns)
+
+ // Sort the data before writing, so that we only need one writer open at a time.
+ val sorter = new UnsafeKVExternalSorter(
+ partitionColumns.toStructType,
+ StructType.fromAttributes(writeColumns),
+ SparkEnv.get.blockManager,
+ SparkEnv.get.serializerManager,
+ TaskContext.get().taskMemoryManager().pageSizeBytes)
+
+ while (iterator.hasNext) {
+ val currentRow = iterator.next()
+ sorter.insertKV(getSortingKey(currentRow), getOutputRow(currentRow))
+ }
+ logDebug(s"Sorting complete. Writing out partition files one at a time.")
+
+ val sortedIterator = sorter.sortedIterator()
+ val paths = new ArrayBuffer[Path]
+
+ // Write the sorted data to partitioned files, one for each unique key
+ var currentWriter: OutputWriter = null
+ try {
+ var currentKey: UnsafeRow = null
+ while (sortedIterator.next()) {
+ val nextKey = sortedIterator.getKey
+
+ // If key changes, close current writer, and open a new writer to a new partitioned file
+ if (currentKey != nextKey) {
+ if (currentWriter != null) {
+ currentWriter.close()
+ currentWriter = null
+ }
+ currentKey = nextKey.copy()
+ val partitionPath = getPartitionString(currentKey).getString(0)
+ val path = new Path(new Path(basePath, partitionPath), UUID.randomUUID.toString)
+ paths += path
+ currentWriter = newOutputWriter(path)
+ logInfo(s"Writing partition $currentKey to $path")
+ }
+ currentWriter.writeInternal(sortedIterator.getValue)
+ }
+ if (currentWriter != null) {
+ currentWriter.close()
+ currentWriter = null
+ }
+ if (paths.nonEmpty) {
+ val fs = paths.head.getFileSystem(serializableConf.value)
+ paths.map(p => SinkFileStatus(fs.getFileStatus(p)))
+ } else Seq.empty
+ } catch {
+ case cause: Throwable =>
+ logError("Aborting task.", cause)
+ // Call failure callbacks first, so we have a chance to clean up the writer.
+ TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+ throw new SparkException("Task failed while writing rows.", cause)
+ } finally {
+ if (currentWriter != null) {
+ currentWriter.close()
+ }
+ }
+ }
}
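
A minimal driver-side sketch of using the writer above, mirroring the new FileStreamSinkSuite tests further below (the output path is a placeholder and an existing `sqlContext` is assumed):

    // Sketch: write one micro-batch with FileStreamSinkWriter, partitioned by "id".
    import org.apache.spark.sql.execution.datasources.parquet
    import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, SinkFileStatus}
    import org.apache.spark.sql.functions._
    import sqlContext.implicits._

    val df = sqlContext.range(0, 10).select($"id", lit(100).as("data"))
    val writer = new FileStreamSinkWriter(
      df,
      new parquet.DefaultSource(),       // currently the only FileFormat implementing buildWriter()
      basePath = "/tmp/stream-out",      // placeholder output directory
      partitionColumnNames = Seq("id"),
      hadoopConf = sqlContext.sparkContext.hadoopConfiguration,
      options = Map.empty)
    // Files land under <basePath>/id=<value>/<uuid>; the statuses feed the FileStreamSinkLog.
    val committed: Array[SinkFileStatus] = writer.write()
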
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index b694b6155a..4254df44c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -54,6 +54,19 @@ case class SinkFileStatus(
}
}
+object SinkFileStatus {
+ def apply(f: FileStatus): SinkFileStatus = {
+ SinkFileStatus(
+ path = f.getPath.toUri.toString,
+ size = f.getLen,
+ isDir = f.isDirectory,
+ modificationTime = f.getModificationTime,
+ blockReplication = f.getReplication,
+ blockSize = f.getBlockSize,
+ action = FileStreamSinkLog.ADD_ACTION)
+ }
+}
+
/**
* A special log for [[FileStreamSink]]. It will write one log file for each batch. The first line
* of the log file is the version number, and there are multiple JSON lines following. Each JSON
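
The new `apply()` above is what `FileStreamSinkWriter` uses after closing each file. Roughly, as a sketch (placeholder path; in the sink the Configuration comes from the session state):

    // Sketch: turning a freshly written file into a sink log entry via SinkFileStatus.apply.
    import java.util.UUID
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.SinkFileStatus

    val hadoopConf = new Configuration()
    val path = new Path("/tmp/stream-out/id=1", UUID.randomUUID.toString)   // placeholder location
    val fs = path.getFileSystem(hadoopConf)
    val entry: SinkFileStatus = SinkFileStatus(fs.getFileStatus(path))      // action = ADD_ACTION
    // fileLog.add(batchId, Array(entry)) then commits it atomically for batch readers.
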
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 9fe06a6c36..fca3d51535 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -216,8 +216,9 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
new FileContextManager(metadataPath, hadoopConf)
} catch {
case e: UnsupportedFileSystemException =>
- logWarning("Could not use FileContext API for managing metadata log file. The log may be" +
- "inconsistent under failures.", e)
+ logWarning("Could not use FileContext API for managing metadata log files at path " +
+ s"$metadataPath. Using FileSystem API instead for managing log files. The log may be " +
+ s"inconsistent under failures.")
new FileSystemManager(metadataPath, hadoopConf)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 8cf5dedabc..609ca976a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,33 +17,223 @@
package org.apache.spark.sql.streaming
-import org.apache.spark.sql.StreamTest
-import org.apache.spark.sql.execution.streaming.MemoryStream
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.{DirectoryFileFilter, RegexFileFilter}
+
+import org.apache.spark.sql.{ContinuousQuery, Row, StreamTest}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.datasources.parquet
+import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream}
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
import testImplicits._
- test("unpartitioned writing") {
+
+ test("FileStreamSinkWriter - unpartitioned data") {
+ val path = Utils.createTempDir()
+ path.delete()
+
+ val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+ val fileFormat = new parquet.DefaultSource()
+
+ def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
+ val df = sqlContext
+ .range(start, end, 1, numPartitions)
+ .select($"id", lit(100).as("data"))
+ val writer = new FileStreamSinkWriter(
+ df, fileFormat, path.toString, partitionColumnNames = Nil, hadoopConf, Map.empty)
+ writer.write().map(_.path.stripPrefix("file://"))
+ }
+
+ // Write and check whether new files are written correctly
+ val files1 = writeRange(0, 10, 2)
+ assert(files1.size === 2, s"unexpected number of files: $files1")
+ checkFilesExist(path, files1, "file not written")
+ checkAnswer(sqlContext.read.load(path.getCanonicalPath), (0 until 10).map(Row(_, 100)))
+
+ // Append and check whether new files are written correctly and old files still exist
+ val files2 = writeRange(10, 20, 3)
+ assert(files2.size === 3, s"unexpected number of files: $files2")
+ assert(files2.intersect(files1).isEmpty, "old files returned")
+ checkFilesExist(path, files2, s"New file not written")
+ checkFilesExist(path, files1, s"Old file not found")
+ checkAnswer(sqlContext.read.load(path.getCanonicalPath), (0 until 20).map(Row(_, 100)))
+ }
+
+ test("FileStreamSinkWriter - partitioned data") {
+ implicit val e = ExpressionEncoder[java.lang.Long]
+ val path = Utils.createTempDir()
+ path.delete()
+
+ val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+ val fileFormat = new parquet.DefaultSource()
+
+ def writeRange(start: Int, end: Int, numPartitions: Int): Seq[String] = {
+ val df = sqlContext
+ .range(start, end, 1, numPartitions)
+ .flatMap(x => Iterator(x, x, x)).toDF("id")
+ .select($"id", lit(100).as("data1"), lit(1000).as("data2"))
+
+ require(df.rdd.partitions.size === numPartitions)
+ val writer = new FileStreamSinkWriter(
+ df, fileFormat, path.toString, partitionColumnNames = Seq("id"), hadoopConf, Map.empty)
+ writer.write().map(_.path.stripPrefix("file://"))
+ }
+
+ def checkOneFileWrittenPerKey(keys: Seq[Int], filesWritten: Seq[String]): Unit = {
+ keys.foreach { id =>
+ assert(
+ filesWritten.count(_.contains(s"/id=$id/")) == 1,
+ s"no file for id=$id. all files: \n\t${filesWritten.mkString("\n\t")}"
+ )
+ }
+ }
+
+ // Write and check whether new files are written correctly
+ val files1 = writeRange(0, 10, 2)
+ assert(files1.size === 10, s"unexpected number of files:\n${files1.mkString("\n")}")
+ checkFilesExist(path, files1, "file not written")
+ checkOneFileWrittenPerKey(0 until 10, files1)
+
+ val answer1 = (0 until 10).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _))
+ checkAnswer(sqlContext.read.load(path.getCanonicalPath), answer1)
+
+ // Append and check whether new files are written correctly and old files still exist
+ val files2 = writeRange(0, 20, 3)
+ assert(files2.size === 20, s"unexpected number of files:\n${files2.mkString("\n")}")
+ assert(files2.intersect(files1).isEmpty, "old files returned")
+ checkFilesExist(path, files2, s"New file not written")
+ checkFilesExist(path, files1, s"Old file not found")
+ checkOneFileWrittenPerKey(0 until 20, files2)
+
+ val answer2 = (0 until 20).flatMap(x => Iterator(x, x, x)).map(Row(100, 1000, _))
+ checkAnswer(sqlContext.read.load(path.getCanonicalPath), answer1 ++ answer2)
+ }
+
+ test("FileStreamSink - unpartitioned writing and batch reading") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
- val query =
- df.write
- .format("parquet")
- .option("checkpointLocation", checkpointDir)
- .startStream(outputDir)
+ var query: ContinuousQuery = null
+
+ try {
+ query =
+ df.write
+ .format("parquet")
+ .option("checkpointLocation", checkpointDir)
+ .startStream(outputDir)
+
+ inputData.addData(1, 2, 3)
+
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
- inputData.addData(1, 2, 3)
- failAfter(streamingTimeout) { query.processAllAvailable() }
+ val outputDf = sqlContext.read.parquet(outputDir).as[Int]
+ checkDataset(outputDf, 1, 2, 3)
- val outputDf = sqlContext.read.parquet(outputDir).as[Int]
- checkDataset(
- outputDf,
- 1, 2, 3)
+ } finally {
+ if (query != null) {
+ query.stop()
+ }
+ }
}
+
+ test("FileStreamSink - partitioned writing and batch reading [IGNORES PARTITION COLUMN]") {
+ val inputData = MemoryStream[Int]
+ val ds = inputData.toDS()
+
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+ var query: ContinuousQuery = null
+
+ try {
+ query =
+ ds.map(i => (i, i * 1000))
+ .toDF("id", "value")
+ .write
+ .format("parquet")
+ .partitionBy("id")
+ .option("checkpointLocation", checkpointDir)
+ .startStream(outputDir)
+
+ inputData.addData(1, 2, 3)
+ failAfter(streamingTimeout) {
+ query.processAllAvailable()
+ }
+
+ // TODO (tdas): Test whether the partition column can be read back or not
+ val outputDf = sqlContext.read.parquet(outputDir)
+ checkDataset(
+ outputDf.as[Int],
+ 1000, 2000, 3000)
+
+ } finally {
+ if (query != null) {
+ query.stop()
+ }
+ }
+ }
+
+ test("FileStreamSink - supported formats") {
+ def testFormat(format: Option[String]): Unit = {
+ val inputData = MemoryStream[Int]
+ val ds = inputData.toDS()
+
+ val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+ val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+ var query: ContinuousQuery = null
+
+ try {
+ val writer =
+ ds.map(i => (i, i * 1000))
+ .toDF("id", "value")
+ .write
+ if (format.nonEmpty) {
+ writer.format(format.get)
+ }
+ query = writer
+ .option("checkpointLocation", checkpointDir)
+ .startStream(outputDir)
+ } finally {
+ if (query != null) {
+ query.stop()
+ }
+ }
+ }
+
+ testFormat(None) // should not throw an error; parquet is the default format when not specified
+ testFormat(Some("parquet"))
+ val e = intercept[UnsupportedOperationException] {
+ testFormat(Some("text"))
+ }
+ Seq("text", "not support", "stream").foreach { s =>
+ assert(e.getMessage.contains(s))
+ }
+ }
+
+ private def checkFilesExist(dir: File, expectedFiles: Seq[String], msg: String): Unit = {
+ import scala.collection.JavaConverters._
+ val files =
+ FileUtils.listFiles(dir, new RegexFileFilter("[^.]+"), DirectoryFileFilter.DIRECTORY)
+ .asScala
+ .map(_.getCanonicalPath)
+ .toSet
+
+ expectedFiles.foreach { f =>
+ assert(files.contains(f),
+ s"\n$msg\nexpected file:\n\t$f\nfound files:\n${files.mkString("\n\t")}")
+ }
+ }
+
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 5b49a0a86a..50703e532f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -41,7 +41,15 @@ import org.apache.spark.util.Utils
class FileStressSuite extends StreamTest with SharedSQLContext {
import testImplicits._
- test("fault tolerance stress test") {
+ testQuietly("fault tolerance stress test - unpartitioned output") {
+ stressTest(partitionWrites = false)
+ }
+
+ testQuietly("fault tolerance stress test - partitioned output") {
+ stressTest(partitionWrites = true)
+ }
+
+ def stressTest(partitionWrites: Boolean): Unit = {
val numRecords = 10000
val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
@@ -93,18 +101,36 @@ class FileStressSuite extends StreamTest with SharedSQLContext {
writer.start()
val input = sqlContext.read.format("text").stream(inputDir)
- def startStream(): ContinuousQuery = input
+
+ def startStream(): ContinuousQuery = {
+ val output = input
.repartition(5)
.as[String]
.mapPartitions { iter =>
val rand = Random.nextInt(100)
- if (rand < 5) { sys.error("failure") }
+ if (rand < 10) {
+ sys.error("failure")
+ }
iter.map(_.toLong)
}
- .write
- .format("parquet")
- .option("checkpointLocation", checkpoint)
- .startStream(outputDir)
+ .map(x => (x % 400, x.toString))
+ .toDF("id", "data")
+
+ if (partitionWrites) {
+ output
+ .write
+ .partitionBy("id")
+ .format("parquet")
+ .option("checkpointLocation", checkpoint)
+ .startStream(outputDir)
+ } else {
+ output
+ .write
+ .format("parquet")
+ .option("checkpointLocation", checkpoint)
+ .startStream(outputDir)
+ }
+ }
var failures = 0
val streamThread = new Thread("stream runner") {