diff options
author | Reynold Xin <rxin@databricks.com> | 2016-10-31 22:23:38 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-10-31 22:23:38 -0700 |
commit | d9d1465009fb40550467089ede315496552374c5 (patch) | |
tree | 47b2d98da776c7bb414676ffde825b34ee1a7ff2 /sql/hive | |
parent | 7d6c87155c740cf622c2c600a8ca64154d24c422 (diff) | |
download | spark-d9d1465009fb40550467089ede315496552374c5.tar.gz spark-d9d1465009fb40550467089ede315496552374c5.tar.bz2 spark-d9d1465009fb40550467089ede315496552374c5.zip |
[SPARK-18024][SQL] Introduce an internal commit protocol API
## What changes were proposed in this pull request?
This patch introduces an internal commit protocol API that is used by the batch data source to do write commits. It currently has only one implementation that uses Hadoop MapReduce's OutputCommitter API. In the future, this commit API can be used to unify streaming and batch commits.
## How was this patch tested?
Should be covered by existing write tests.
Author: Reynold Xin <rxin@databricks.com>
Author: Eric Liang <ekl@databricks.com>
Closes #15707 from rxin/SPARK-18024-2.
Diffstat (limited to 'sql/hive')
3 files changed, 24 insertions, 33 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala index eba7aa386a..7c519a0743 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -83,11 +83,19 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new OrcOutputWriter(stagingDir, fileNamePrefix, dataSchema, context) + new OrcOutputWriter(path, dataSchema, context) + } + + override def getFileExtension(context: TaskAttemptContext): String = { + val compressionExtension: String = { + val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION) + OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") + } + + compressionExtension + ".orc" } } } @@ -210,23 +218,11 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration) } private[orc] class OrcOutputWriter( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext) extends OutputWriter { - override val path: String = { - val compressionExtension: String = { - val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION) - OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") - } - // It has the `.orc` extension at the end because (de)compression tools - // such as gunzip would not be able to decompress this as the compression - // is not applied on this whole file but on each "stream" in ORC format. 
- new Path(stagingDir, fileNamePrefix + compressionExtension + ".orc").toString - } - private[this] val serializer = new OrcSerializer(dataSchema, context.getConfiguration) // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala index 731540db17..abc7c8cc4d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.sources -import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.TaskContext @@ -40,19 +39,16 @@ class CommitFailureTestSource extends SimpleTextSource { dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) { + new SimpleTextOutputWriter(path, context) { var failed = false TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => failed = true SimpleTextRelation.callbackCalled = true } - override val path: String = new Path(stagingDir, fileNamePrefix).toString - override def write(row: Row): Unit = { if (SimpleTextRelation.failWriter) { sys.error("Intentional task writer failure for testing purpose.") @@ -67,6 +63,8 @@ class CommitFailureTestSource extends SimpleTextSource { } } } + + override def getFileExtension(context: TaskAttemptContext): String = "" } override def shortName(): String = "commit-failure-test" diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index 9896b9bde9..64d0ecbeef 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -51,12 +51,13 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister { SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration) new OutputWriterFactory { override def newInstance( - stagingDir: String, - fileNamePrefix: String, + path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) + new SimpleTextOutputWriter(path, context) } + + override def getFileExtension(context: TaskAttemptContext): String = "" } } @@ -120,14 +121,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister { } } -class SimpleTextOutputWriter( - stagingDir: String, fileNamePrefix: String, context: TaskAttemptContext) +class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { - override val path: String = new Path(stagingDir, fileNamePrefix).toString - private val recordWriter: RecordWriter[NullWritable, Text] = - new AppendingTextOutputFormat(new Path(stagingDir), fileNamePrefix).getRecordWriter(context) + new AppendingTextOutputFormat(path).getRecordWriter(context) override def write(row: Row): Unit = { val serialized = row.toSeq.map { v => @@ -141,15 +139,14 @@ class SimpleTextOutputWriter( } } -class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String) - extends TextOutputFormat[NullWritable, Text] { +class AppendingTextOutputFormat(path: String) extends TextOutputFormat[NullWritable, Text] { val numberFormat = NumberFormat.getInstance() numberFormat.setMinimumIntegerDigits(5) numberFormat.setGroupingUsed(false) override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { - new 
Path(stagingDir, fileNamePrefix) + new Path(path) } }