author    Reynold Xin <rxin@databricks.com>    2016-10-20 12:18:56 -0700
committer Reynold Xin <rxin@databricks.com>    2016-10-20 12:18:56 -0700
commit    7f9ec19eae60abe589ffd22259a9065e7e353a57 (patch)
tree      304e751a63b5ec83ec4e8fa918573020890f2ae5
parent    947f4f25273161dc4719419a35613a71c2e2a150 (diff)
download  spark-7f9ec19eae60abe589ffd22259a9065e7e353a57.tar.gz
spark-7f9ec19eae60abe589ffd22259a9065e7e353a57.tar.bz2
spark-7f9ec19eae60abe589ffd22259a9065e7e353a57.zip
[SPARK-18021][SQL] Refactor file name specification for data sources
## What changes were proposed in this pull request?

Currently each data source OutputWriter is responsible for specifying the entire file name for each file output. This, however, does not make sense, because we rely on file naming schemes for certain behaviors in Spark SQL, e.g. bucket id. The current approach allows individual data sources to break the implementation of bucketing. On the flip side, we also don't want to move file naming entirely out of data sources, because different data sources do want to specify different extensions.

This patch divides file name specification into two parts: the first part is a prefix specified by the caller of OutputWriter (in WriteOutput), and the second part is the suffix that can be specified by the OutputWriter itself. Note that a side effect of this change is that now all file-based data sources also support bucketing automatically.

There are also some other minor cleanups:

- Removed the UUID passed through the generic Configuration string
- Some minor rewrites for better clarity
- Renamed "path" in multiple places to "stagingDir", to more accurately reflect its meaning

## How was this patch tested?

This should be covered by existing data source tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #15562 from rxin/SPARK-18021.
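To make the prefix/suffix split concrete, below is a minimal, illustrative Scala sketch (not part of this patch) of how a CSV-like writer resolves its work file under the new contract. The function name `csvWorkFile` and the example paths are hypothetical; the `new Path(stagingDir, s"$fileNamePrefix.csv$extension")` composition mirrors what the CSVRelation change in this diff does.

```scala
import org.apache.hadoop.fs.Path

// The caller (WriteOutput) now owns the file name prefix; each data source
// only appends its own suffix (format extension plus any codec extension).
def csvWorkFile(stagingDir: String, fileNamePrefix: String, codecExtension: String): Path =
  new Path(stagingDir, s"$fileNamePrefix.csv$codecExtension")

// csvWorkFile("/tmp/stage", "part-r-00000-1234-abcd-5678", ".gz")
//   => /tmp/stage/part-r-00000-1234-abcd-5678.csv.gz
```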
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala  16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala  17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala  56
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala  18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala  17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala  32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala  21
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala  21
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala  5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala  6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala  26
12 files changed, 99 insertions, 143 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 8577803743..fff86686b5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -40,7 +40,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
private[libsvm] class LibSVMOutputWriter(
- path: String,
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter {
@@ -50,11 +51,7 @@ private[libsvm] class LibSVMOutputWriter(
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val split = taskAttemptId.getTaskID.getId
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+ new Path(stagingDir, fileNamePrefix + extension)
}
}.getRecordWriter(context)
}
@@ -132,12 +129,11 @@ private[libsvm] class LibSVMFileFormat extends TextBasedFileFormat with DataSour
dataSchema: StructType): OutputWriterFactory = {
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- if (bucketId.isDefined) { sys.error("LibSVM doesn't support bucketing") }
- new LibSVMOutputWriter(path, dataSchema, context)
+ new LibSVMOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
index d2eec7b141..f4cefdab07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/OutputWriter.scala
@@ -34,18 +34,23 @@ abstract class OutputWriterFactory extends Serializable {
* When writing to a [[HadoopFsRelation]], this method gets called by each task on executor side
* to instantiate new [[OutputWriter]]s.
*
- * @param path Path of the file to which this [[OutputWriter]] is supposed to write. Note that
- * this may not point to the final output file. For example, `FileOutputFormat` writes to
- * temporary directories and then merge written files back to the final destination. In
- * this case, `path` points to a temporary output file under the temporary directory.
+ * @param stagingDir Base path (directory) of the file to which this [[OutputWriter]] is supposed
+ * to write. Note that this may not point to the final output file. For
+ * example, `FileOutputFormat` writes to temporary directories and then merges
+ * written files back to the final destination. In this case, `stagingDir` points
+ * to a temporary staging directory under which the output file is written.
+ * @param fileNamePrefix Prefix of the file name. The returned OutputWriter must make sure this
+ * prefix is used in the actual file name. For example, if the prefix is
+ * "part-1-2-3", then the file name must start with "part_1_2_3" but can
+ * end in arbitrary extension.
* @param dataSchema Schema of the rows to be written. Partition columns are not included in the
* schema if the relation being written is partitioned.
* @param context The Hadoop MapReduce task context.
* @since 1.4.0
*/
def newInstance(
- path: String,
- bucketId: Option[Int], // TODO: This doesn't belong here...
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
index 54d0f3bd62..bd56e511d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteOutput.scala
@@ -46,6 +46,7 @@ object WriteOutput extends Logging {
/** A shared job description for all the write tasks. */
private class WriteJobDescription(
+ val uuid: String, // prevent collision between different (appending) write jobs
val serializableHadoopConf: SerializableConfiguration,
val outputWriterFactory: OutputWriterFactory,
val allColumns: Seq[Attribute],
@@ -102,6 +103,7 @@ object WriteOutput extends Logging {
fileFormat.prepareWrite(sparkSession, job, options, dataColumns.toStructType)
val description = new WriteJobDescription(
+ uuid = UUID.randomUUID().toString,
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = plan.output,
@@ -213,6 +215,11 @@ object WriteOutput extends Logging {
private trait ExecuteWriteTask {
def execute(iterator: Iterator[InternalRow]): Unit
def releaseResources(): Unit
+
+ final def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
+ val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+ f"part-r-$split%05d-$uuid$bucketString"
+ }
}
/** Writes data to a single directory (used for non-dynamic-partition writes). */
@@ -222,9 +229,11 @@ object WriteOutput extends Logging {
stagingPath: String) extends ExecuteWriteTask {
private[this] var outputWriter: OutputWriter = {
+ val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
+
val outputWriter = description.outputWriterFactory.newInstance(
- path = stagingPath,
- bucketId = None,
+ stagingDir = stagingPath,
+ fileNamePrefix = filePrefix(split, description.uuid, None),
dataSchema = description.nonPartitionColumns.toStructType,
context = taskAttemptContext)
outputWriter.initConverter(dataSchema = description.nonPartitionColumns.toStructType)
@@ -287,29 +296,31 @@ object WriteOutput extends Logging {
}
}
- private def getBucketIdFromKey(key: InternalRow): Option[Int] =
- description.bucketSpec.map { _ => key.getInt(description.partitionColumns.length) }
-
/**
* Opens and returns a new OutputWriter given a partition key and an optional bucket id.
* If bucket id is specified, we will append it to the end of the file name, but before the
* file extension, e.g. part-r-00009-ea518ad4-455a-4431-b471-d24e03814677-00002.gz.parquet
*/
- private def newOutputWriter(
- key: InternalRow,
- getPartitionString: UnsafeProjection): OutputWriter = {
+ private def newOutputWriter(key: InternalRow, partString: UnsafeProjection): OutputWriter = {
val path =
if (description.partitionColumns.nonEmpty) {
- val partitionPath = getPartitionString(key).getString(0)
+ val partitionPath = partString(key).getString(0)
new Path(stagingPath, partitionPath).toString
} else {
stagingPath
}
- val bucketId = getBucketIdFromKey(key)
+ // If the bucket spec is defined, the bucket column is right after the partition columns
+ val bucketId = if (description.bucketSpec.isDefined) {
+ Some(key.getInt(description.partitionColumns.length))
+ } else {
+ None
+ }
+
+ val split = taskAttemptContext.getTaskAttemptID.getTaskID.getId
val newWriter = description.outputWriterFactory.newInstance(
- path = path,
- bucketId = bucketId,
+ stagingDir = path,
+ fileNamePrefix = filePrefix(split, description.uuid, bucketId),
dataSchema = description.nonPartitionColumns.toStructType,
context = taskAttemptContext)
newWriter.initConverter(description.nonPartitionColumns.toStructType)
@@ -319,7 +330,7 @@ object WriteOutput extends Logging {
override def execute(iter: Iterator[InternalRow]): Unit = {
// We should first sort by partition columns, then bucket id, and finally sorting columns.
val sortingExpressions: Seq[Expression] =
- description.partitionColumns ++ bucketIdExpression ++ sortColumns
+ description.partitionColumns ++ bucketIdExpression ++ sortColumns
val getSortingKey = UnsafeProjection.create(sortingExpressions, description.allColumns)
val sortingKeySchema = StructType(sortingExpressions.map {
@@ -333,8 +344,8 @@ object WriteOutput extends Logging {
description.nonPartitionColumns, description.allColumns)
// Returns the partition path given a partition key.
- val getPartitionString =
- UnsafeProjection.create(Seq(Concat(partitionStringExpression)), description.partitionColumns)
+ val getPartitionString = UnsafeProjection.create(
+ Seq(Concat(partitionStringExpression)), description.partitionColumns)
// Sorts the data before write, so that we only need one writer at the same time.
val sorter = new UnsafeKVExternalSorter(
@@ -405,17 +416,6 @@ object WriteOutput extends Logging {
job.getConfiguration.setBoolean("mapred.task.is.map", true)
job.getConfiguration.setInt("mapred.task.partition", 0)
- // This UUID is sent to executor side together with the serialized `Configuration` object within
- // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate
- // unique task output files.
- // This UUID is used to avoid output file name collision between different appending write jobs.
- // These jobs may belong to different SparkContext instances. Concrete data source
- // implementations may use this UUID to generate unique file names (e.g.,
- // `part-r-<task-id>-<job-uuid>.parquet`). The reason why this ID is used to identify a job
- // rather than a single task output file is that, speculative tasks must generate the same
- // output file name as the original task.
- job.getConfiguration.set(WriterContainer.DATASOURCE_WRITEJOBUUID, UUID.randomUUID().toString)
-
val taskAttemptContext = new TaskAttemptContextImpl(job.getConfiguration, taskAttemptId)
val outputCommitter = newOutputCommitter(
job.getOutputFormatClass, taskAttemptContext, path, isAppend)
@@ -474,7 +474,3 @@ object WriteOutput extends Logging {
}
}
}
-
-object WriterContainer {
- val DATASOURCE_WRITEJOBUUID = "spark.sql.sources.writeJobUUID"
-}
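For reference, a minimal sketch (not in the patch) of the names produced by the `filePrefix` helper added in WriteOutput.scala above. The bucket suffix format is an assumption: this diff relies on `BucketingUtils.bucketIdToString` but does not show it, so the five-digit `_NNNNN` rendering below is illustrative.

```scala
// Sketch only: reproduces the prefix format from the filePrefix helper above.
// Assumption: BucketingUtils.bucketIdToString pads the bucket id to five digits
// with a leading separator (e.g. "_00002"); the exact format is not shown here.
def filePrefix(split: Int, uuid: String, bucketId: Option[Int]): String = {
  val bucketString = bucketId.map(id => f"_$id%05d").getOrElse("")
  f"part-r-$split%05d-$uuid$bucketString"
}

// filePrefix(9, "ea518ad4-455a-4431-b471-d24e03814677", Some(2))
//   => "part-r-00009-ea518ad4-455a-4431-b471-d24e03814677_00002"
// A Parquet writer would then append its extension, yielding e.g.
//   part-r-00009-ea518ad4-455a-4431-b471-d24e03814677_00002.gz.parquet
```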
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 55cb26d651..eefacbf05b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile, WriterContainer}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.types._
object CSVRelation extends Logging {
@@ -170,17 +170,17 @@ object CSVRelation extends Logging {
private[csv] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- if (bucketId.isDefined) sys.error("csv doesn't support bucketing")
- new CsvOutputWriter(path, dataSchema, context, params)
+ new CsvOutputWriter(stagingDir, fileNamePrefix, dataSchema, context, params)
}
}
private[csv] class CsvOutputWriter(
- path: String,
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext,
params: CSVOptions) extends OutputWriter with Logging {
@@ -199,11 +199,7 @@ private[csv] class CsvOutputWriter(
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val split = taskAttemptId.getTaskID.getId
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
+ new Path(stagingDir, s"$fileNamePrefix.csv$extension")
}
}.getRecordWriter(context)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 9fe38ccc9f..cdbb2f7292 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -82,11 +82,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new JsonOutputWriter(path, parsedOptions, bucketId, dataSchema, context)
+ new JsonOutputWriter(stagingDir, parsedOptions, fileNamePrefix, dataSchema, context)
}
}
}
@@ -153,9 +153,9 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
private[json] class JsonOutputWriter(
- path: String,
+ stagingDir: String,
options: JSONOptions,
- bucketId: Option[Int],
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter with Logging {
@@ -168,12 +168,7 @@ private[json] class JsonOutputWriter(
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val split = taskAttemptId.getTaskID.getId
- val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension")
+ new Path(stagingDir, s"$fileNamePrefix.json$extension")
}
}.getRecordWriter(context)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6faafed1e6..87b944ba52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -27,7 +27,7 @@ import scala.util.{Failure, Try}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.{Log => ApacheParquetLog}
import org.apache.parquet.filter2.compat.FilterCompat
@@ -45,7 +45,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -134,10 +133,10 @@ class ParquetFileFormat
new OutputWriterFactory {
override def newInstance(
path: String,
- bucketId: Option[Int],
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new ParquetOutputWriter(path, bucketId, context)
+ new ParquetOutputWriter(path, fileNamePrefix, context)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
index f89ce05d82..39c199784c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala
@@ -26,7 +26,7 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{BucketingUtils, OutputWriter, OutputWriterFactory, WriterContainer}
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -122,13 +122,12 @@ private[parquet] class ParquetOutputWriterFactory(
}
/** Disable the use of the older API. */
- def newInstance(
+ override def newInstance(
path: String,
- bucketId: Option[Int],
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- throw new UnsupportedOperationException(
- "this version of newInstance not supported for " +
+ throw new UnsupportedOperationException("this version of newInstance not supported for " +
"ParquetOutputWriterFactory")
}
}
@@ -136,33 +135,16 @@ private[parquet] class ParquetOutputWriterFactory(
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[parquet] class ParquetOutputWriter(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
context: TaskAttemptContext)
extends OutputWriter {
private val recordWriter: RecordWriter[Void, InternalRow] = {
val outputFormat = {
new ParquetOutputFormat[InternalRow]() {
- // Here we override `getDefaultWorkFile` for two reasons:
- //
- // 1. To allow appending. We need to generate unique output file names to avoid
- // overwriting existing files (either exist before the write job, or are just written
- // by other tasks within the same write job).
- //
- // 2. To allow dynamic partitioning. Default `getDefaultWorkFile` uses
- // `FileOutputCommitter.getWorkPath()`, which points to the base directory of all
- // partitions in the case of dynamic partitioning.
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val split = taskAttemptId.getTaskID.getId
- val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
- // It has the `.parquet` extension at the end because (de)compression tools
- // such as gunzip would not be able to decompress this as the compression
- // is not applied on this whole file but on each "page" in Parquet format.
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
+ new Path(stagingDir, fileNamePrefix + extension)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 9f96667311..6cd2351c57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -73,14 +73,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- if (bucketId.isDefined) {
- throw new AnalysisException("Text doesn't support bucketing")
- }
- new TextOutputWriter(path, dataSchema, context)
+ new TextOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
}
}
}
@@ -124,7 +121,11 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
}
-class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemptContext)
+class TextOutputWriter(
+ stagingDir: String,
+ fileNamePrefix: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext)
extends OutputWriter {
private[this] val buffer = new Text()
@@ -132,11 +133,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
private val recordWriter: RecordWriter[NullWritable, Text] = {
new TextOutputFormat[NullWritable, Text]() {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val split = taskAttemptId.getTaskID.getId
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
+ new Path(stagingDir, s"$fileNamePrefix.txt$extension")
}
}.getRecordWriter(context)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 1af3280e18..1ceacb458a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -83,11 +83,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new OrcOutputWriter(path, bucketId, dataSchema, context)
+ new OrcOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
}
}
}
@@ -210,8 +210,8 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
}
private[orc] class OrcOutputWriter(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter {
@@ -226,10 +226,7 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val uniqueWriteJobId = conf.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val partition = taskAttemptId.getTaskID.getId
- val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+
val compressionExtension = {
val name = conf.get(OrcRelation.ORC_COMPRESSION)
OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
@@ -237,12 +234,12 @@ private[orc] class OrcOutputWriter(
// It has the `.orc` extension at the end because (de)compression tools
// such as gunzip would not be able to decompress this as the compression
// is not applied on this whole file but on each "stream" in ORC format.
- val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc"
+ val filename = s"$fileNamePrefix$compressionExtension.orc"
new OrcOutputFormat().getRecordWriter(
- new Path(path, filename).getFileSystem(conf),
+ new Path(stagingDir, filename).getFileSystem(conf),
conf.asInstanceOf[JobConf],
- new Path(path, filename).toString,
+ new Path(stagingDir, filename).toString,
Reporter.NULL
).asInstanceOf[RecordWriter[NullWritable, Writable]]
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 997445114b..2eafe18b85 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -54,11 +54,6 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingle
intercept[AnalysisException](df.write.bucketBy(2, "i").sortBy("j").saveAsTable("tt"))
}
- test("write bucketed data to unsupported data source") {
- val df = Seq(Tuple1("a"), Tuple1("b")).toDF("i")
- intercept[SparkException](df.write.bucketBy(3, "i").format("text").saveAsTable("tt"))
- }
-
test("write bucketed data using save()") {
val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
index 5a8a7f0ab5..d504468402 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -39,11 +39,11 @@ class CommitFailureTestSource extends SimpleTextSource {
dataSchema: StructType): OutputWriterFactory =
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(path, context) {
+ new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) {
var failed = false
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
failed = true
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 906de6bbcb..9e13b217ec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.{sources, Row, SparkSession}
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
@@ -51,11 +51,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
new OutputWriterFactory {
override def newInstance(
- path: String,
- bucketId: Option[Int],
+ stagingDir: String,
+ fileNamePrefix: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(path, context)
+ new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context)
}
}
}
@@ -120,9 +120,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
}
}
-class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter {
+class SimpleTextOutputWriter(
+ stagingDir: String, fileNamePrefix: String, context: TaskAttemptContext)
+ extends OutputWriter {
private val recordWriter: RecordWriter[NullWritable, Text] =
- new AppendingTextOutputFormat(new Path(path)).getRecordWriter(context)
+ new AppendingTextOutputFormat(new Path(stagingDir), fileNamePrefix).getRecordWriter(context)
override def write(row: Row): Unit = {
val serialized = row.toSeq.map { v =>
@@ -136,19 +138,15 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
}
}
-class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullWritable, Text] {
- val numberFormat = NumberFormat.getInstance()
+class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String)
+ extends TextOutputFormat[NullWritable, Text] {
+ val numberFormat = NumberFormat.getInstance()
numberFormat.setMinimumIntegerDigits(5)
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = context.getConfiguration
- val uniqueWriteJobId = configuration.get(WriterContainer.DATASOURCE_WRITEJOBUUID)
- val taskAttemptId = context.getTaskAttemptID
- val split = taskAttemptId.getTaskID.getId
- val name = FileOutputFormat.getOutputName(context)
- new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")
+ new Path(stagingDir, fileNamePrefix)
}
}