author    Reynold Xin <rxin@databricks.com>    2016-10-31 22:23:38 -0700
committer Reynold Xin <rxin@databricks.com>    2016-10-31 22:23:38 -0700
commit    d9d1465009fb40550467089ede315496552374c5 (patch)
tree      47b2d98da776c7bb414676ffde825b34ee1a7ff2 /sql/hive
parent    7d6c87155c740cf622c2c600a8ca64154d24c422 (diff)
[SPARK-18024][SQL] Introduce an internal commit protocol API
## What changes were proposed in this pull request?

This patch introduces an internal commit protocol API that is used by the batch data source to do write commits. It currently has only one implementation that uses Hadoop MapReduce's OutputCommitter API. In the future, this commit API can be used to unify streaming and batch commits.

## How was this patch tested?

Should be covered by existing write tests.

Author: Reynold Xin <rxin@databricks.com>
Author: Eric Liang <ekl@databricks.com>

Closes #15707 from rxin/SPARK-18024-2.
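For orientation, the following is a minimal sketch of the kind of commit protocol described above. The trait name, method names, and signatures are illustrative assumptions drawn from the description, not the exact API this patch adds; Hadoop's JobContext and TaskAttemptContext stand in for the driver-side and task-side contexts.

import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

// Illustrative sketch only; the real API introduced by this patch may differ.
abstract class FileCommitProtocolSketch {
  /** Driver side: called once before any task writes. */
  def setupJob(jobContext: JobContext): Unit

  /** Driver side: called once after all tasks commit; makes output visible. */
  def commitJob(jobContext: JobContext): Unit

  /** Driver side: called on failure; must clean up any partial output. */
  def abortJob(jobContext: JobContext): Unit

  /** Executor side: called before a task starts writing. */
  def setupTask(taskContext: TaskAttemptContext): Unit

  /**
   * Executor side: allocates the temp path a writer should write to.
   * `dir` is an optional partition subdirectory; `ext` is the file
   * extension, e.g. ".snappy.orc" (see getFileExtension in the diff below).
   */
  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

  /** Executor side: commits everything this task wrote. */
  def commitTask(taskContext: TaskAttemptContext): Unit

  /** Executor side: aborts the task, discarding anything it wrote. */
  def abortTask(taskContext: TaskAttemptContext): Unit
}

The single implementation mentioned above would then delegate the job- and task-level methods to the corresponding methods of a Hadoop OutputCommitter.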
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala          | 28
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala      | 19
3 files changed, 24 insertions, 33 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index eba7aa386a..7c519a0743 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -83,11 +83,19 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
new OutputWriterFactory {
override def newInstance(
- stagingDir: String,
- fileNamePrefix: String,
+ path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new OrcOutputWriter(stagingDir, fileNamePrefix, dataSchema, context)
+ new OrcOutputWriter(path, dataSchema, context)
+ }
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ val compressionExtension: String = {
+ val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION)
+ OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
+ }
+
+ compressionExtension + ".orc"
}
}
}
@@ -210,23 +218,11 @@ private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
}
private[orc] class OrcOutputWriter(
- stagingDir: String,
- fileNamePrefix: String,
+ path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends OutputWriter {
- override val path: String = {
- val compressionExtension: String = {
- val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION)
- OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
- }
- // It has the `.orc` extension at the end because (de)compression tools
- // such as gunzip would not be able to decompress this as the compression
- // is not applied on this whole file but on each "stream" in ORC format.
- new Path(stagingDir, fileNamePrefix + compressionExtension + ".orc").toString
- }
-
private[this] val serializer = new OrcSerializer(dataSchema, context.getConfiguration)
// `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
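With this contract, OrcOutputWriter no longer assembles its own output path: the format reports just its extension through getFileExtension, and the caller passes newInstance a complete path. Below is a hedged sketch of how a write task could combine the two factory methods with the protocol sketched earlier; openWriter, committer, and partitionDir are assumed names, not identifiers from this patch.

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType

// Hypothetical helper, not part of this diff: the protocol chooses where the
// task may write, the format chooses the extension, the writer just writes.
def openWriter(
    factory: OutputWriterFactory,
    committer: FileCommitProtocolSketch, // from the sketch above
    partitionDir: Option[String],
    dataSchema: StructType,
    context: TaskAttemptContext): OutputWriter = {
  val ext = factory.getFileExtension(context) // e.g. ".snappy.orc" for ORC
  val path = committer.newTaskTempFile(context, partitionDir, ext)
  factory.newInstance(path, dataSchema, context)
}

Note that the extension keeps the trailing ".orc" after the compression suffix because, as the removed comment explains, tools such as gunzip cannot decompress these files anyway: compression is applied to each ORC stream, not to the file as a whole.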
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
index 731540db17..abc7c8cc4d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.sources
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.TaskContext
@@ -40,19 +39,16 @@ class CommitFailureTestSource extends SimpleTextSource {
dataSchema: StructType): OutputWriterFactory =
new OutputWriterFactory {
override def newInstance(
- stagingDir: String,
- fileNamePrefix: String,
+ path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context) {
+ new SimpleTextOutputWriter(path, context) {
var failed = false
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
failed = true
SimpleTextRelation.callbackCalled = true
}
- override val path: String = new Path(stagingDir, fileNamePrefix).toString
-
override def write(row: Row): Unit = {
if (SimpleTextRelation.failWriter) {
sys.error("Intentional task writer failure for testing purpose.")
@@ -67,6 +63,8 @@ class CommitFailureTestSource extends SimpleTextSource {
}
}
}
+
+ override def getFileExtension(context: TaskAttemptContext): String = ""
}
override def shortName(): String = "commit-failure-test"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 9896b9bde9..64d0ecbeef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -51,12 +51,13 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
SimpleTextRelation.lastHadoopConf = Option(job.getConfiguration)
new OutputWriterFactory {
override def newInstance(
- stagingDir: String,
- fileNamePrefix: String,
+ path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- new SimpleTextOutputWriter(stagingDir, fileNamePrefix, context)
+ new SimpleTextOutputWriter(path, context)
}
+
+ override def getFileExtension(context: TaskAttemptContext): String = ""
}
}
@@ -120,14 +121,11 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
}
}
-class SimpleTextOutputWriter(
- stagingDir: String, fileNamePrefix: String, context: TaskAttemptContext)
+class SimpleTextOutputWriter(path: String, context: TaskAttemptContext)
extends OutputWriter {
- override val path: String = new Path(stagingDir, fileNamePrefix).toString
-
private val recordWriter: RecordWriter[NullWritable, Text] =
- new AppendingTextOutputFormat(new Path(stagingDir), fileNamePrefix).getRecordWriter(context)
+ new AppendingTextOutputFormat(path).getRecordWriter(context)
override def write(row: Row): Unit = {
val serialized = row.toSeq.map { v =>
@@ -141,15 +139,14 @@ class SimpleTextOutputWriter(
}
}
-class AppendingTextOutputFormat(stagingDir: Path, fileNamePrefix: String)
- extends TextOutputFormat[NullWritable, Text] {
+class AppendingTextOutputFormat(path: String) extends TextOutputFormat[NullWritable, Text] {
val numberFormat = NumberFormat.getInstance()
numberFormat.setMinimumIntegerDigits(5)
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- new Path(stagingDir, fileNamePrefix)
+ new Path(path)
}
}
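Putting the pieces together, here is a hedged end-to-end sketch of the lifecycle the new API enables. The single-threaded block stands in for distributed execution; jobContext, taskContext, factory, dataSchema, and rows are assumed to be in scope, and all other names come from the earlier sketches, not from this patch.

// Driver side: initialize the job before any task runs.
committer.setupJob(jobContext)
try {
  // Executor side, once per task:
  committer.setupTask(taskContext)
  val writer = openWriter(factory, committer, None, dataSchema, taskContext)
  rows.foreach(writer.write)
  writer.close()
  committer.commitTask(taskContext) // task output becomes eligible for job commit
  // Driver side, after every task has committed:
  committer.commitJob(jobContext)
} catch {
  case e: Throwable =>
    committer.abortTask(taskContext) // best-effort cleanup of partial task output
    committer.abortJob(jobContext)
    throw e
}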