author    Cheng Lian <lian@databricks.com>    2015-05-13 07:35:55 -0700
committer Yin Huai <yhuai@databricks.com>     2015-05-13 07:35:55 -0700
commit 10c546e9d42a0f3fbf45c919e74f62c548ca8347 (patch)
tree   6eac4749ccfa562c6ffddd2514b0c75b38376fa1 /sql
parent 50c72708015fba15d0e78946f1f4ec262776bc38 (diff)
[SPARK-7599] [SQL] Don't restrict customized output committers to be subclasses of FileOutputCommitter
Author: Cheng Lian <lian@databricks.com>

Closes #6118 from liancheng/spark-7599 and squashes the following commits:

31e1bd6 [Cheng Lian] Don't restrict customized output committers to be subclasses of FileOutputCommitter
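As a rough illustration (not part of the patch), a direct-style committer such as the hypothetical one sketched below extends OutputCommitter but not FileOutputCommitter. Before this change, BaseWriterContainer.newOutputCommitter rejected such a committer with sys.error; after it, the committer is accepted and getWorkPath falls back to the job's outputPath.

import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

// Hypothetical committer, for illustration only: tasks write straight to the
// final output location, so there is no FileOutputCommitter-style temporary
// work path to return.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}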
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 20
1 file changed, 12 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 8372d2c34a..fe8be5b7fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -244,7 +244,7 @@ private[sql] abstract class BaseWriterContainer(
@transient private val jobContext: JobContext = job
// The following fields are initialized and used on both driver and executor side.
- @transient protected var outputCommitter: FileOutputCommitter = _
+ @transient protected var outputCommitter: OutputCommitter = _
@transient private var jobId: JobID = _
@transient private var taskId: TaskID = _
@transient private var taskAttemptId: TaskAttemptID = _
@@ -282,14 +282,18 @@ private[sql] abstract class BaseWriterContainer(
initWriters()
}
- private def newOutputCommitter(context: TaskAttemptContext): FileOutputCommitter = {
- outputFormatClass.newInstance().getOutputCommitter(context) match {
- case f: FileOutputCommitter => f
- case f => sys.error(
- s"FileOutputCommitter or its subclass is expected, but got a ${f.getClass.getName}.")
+ protected def getWorkPath: String = {
+ outputCommitter match {
+ // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
+ case f: FileOutputCommitter => f.getWorkPath.toString
+ case _ => outputPath
}
}
+ private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
+ outputFormatClass.newInstance().getOutputCommitter(context)
+ }
+
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
@@ -339,7 +343,7 @@ private[sql] class DefaultWriterContainer(
override protected def initWriters(): Unit = {
writer = outputWriterClass.newInstance()
- writer.init(outputCommitter.getWorkPath.toString, dataSchema, taskAttemptContext)
+ writer.init(getWorkPath, dataSchema, taskAttemptContext)
}
override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -381,7 +385,7 @@ private[sql] class DynamicPartitionWriterContainer(
}.mkString
outputWriters.getOrElseUpdate(partitionPath, {
- val path = new Path(outputCommitter.getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
+ val path = new Path(getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
val writer = outputWriterClass.newInstance()
writer.init(path.toString, dataSchema, taskAttemptContext)
writer