aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-05-18 12:17:10 -0700
committerMichael Armbrust <michael@databricks.com>2015-05-18 12:17:10 -0700
commit530397ba2f5c0fcabb86ba73048c95177ed0b9fc (patch)
treef3673314e0578a888ab53a7595aa23a000c1461f /sql/core
parent103c863c2ef3d9e6186cfc7d95251a9515e9f180 (diff)
downloadspark-530397ba2f5c0fcabb86ba73048c95177ed0b9fc.tar.gz
spark-530397ba2f5c0fcabb86ba73048c95177ed0b9fc.tar.bz2
spark-530397ba2f5c0fcabb86ba73048c95177ed0b9fc.zip
[SPARK-7567] [SQL] [follow-up] Use a new flag to set output committer based on mapreduce apis
cc liancheng marmbrus Author: Yin Huai <yhuai@databricks.com> Closes #6130 from yhuai/directOutput and squashes the following commits: 312b07d [Yin Huai] A data source can use spark.sql.sources.outputCommitterClass to override the output committer.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala29
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala3
4 files changed, 29 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 6da910e332..77c6af27d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -71,6 +71,10 @@ private[spark] object SQLConf {
// Whether to perform partition discovery when loading external data sources. Default to true.
val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
+ // The output committer class used by FSBasedRelation. The specified class needs to be a
+ // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+ val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
+
// Whether to perform eager analysis when constructing a dataframe.
// Set to false when debugging requires the ability to look at invalid query plans.
val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bcbdb1ebd2..fea54a2514 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -197,7 +197,7 @@ private[sql] class ParquetRelation2(
classOf[ParquetOutputCommitter])
conf.setClass(
- "mapred.output.committer.class",
+ SQLConf.OUTPUT_COMMITTER_CLASS,
committerClass,
classOf[ParquetOutputCommitter])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a09bb08de7..d54dbb0831 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.hadoop.util.Shell
import parquet.hadoop.util.ContextUtil
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
private[sql] case class InsertIntoDataSource(
logicalRelation: LogicalRelation,
@@ -287,24 +287,39 @@ private[sql] abstract class BaseWriterContainer(
protected def getWorkPath: String = {
outputCommitter match {
// FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
- case f: FileOutputCommitter => f.getWorkPath.toString
+ case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
case _ => outputPath
}
}
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
val committerClass = context.getConfiguration.getClass(
- "mapred.output.committer.class", null, classOf[OutputCommitter])
+ SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
Option(committerClass).map { clazz =>
- val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- ctor.newInstance(new Path(outputPath), context)
+ // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+ // has an associated output committer. To override this output committer,
+ // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+ // If a data source needs to override the output committer, it needs to set the
+ // output committer in prepareForWrite method.
+ if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+ // The specified output committer is a FileOutputCommitter.
+ // So, we will use the FileOutputCommitter-specified constructor.
+ val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+ ctor.newInstance(new Path(outputPath), context)
+ } else {
+ // The specified output committer is just a OutputCommitter.
+ // So, we will use the no-argument constructor.
+ val ctor = clazz.getDeclaredConstructor()
+ ctor.newInstance()
+ }
}.getOrElse {
+ // If output committer class is not set, we will use the one associated with the
+ // file output format.
outputFormatClass.newInstance().getOutputCommitter(context)
}
}
-
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
this.taskId = new TaskID(this.jobId, true, splitId)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 274ab44852..a82a6758d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -527,7 +527,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
/**
* Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
- * be put here. For example, user defined output committer can be configured here.
+ * be put here. For example, user defined output committer can be configured here
+ * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
*
* Note that the only side effect expected here is mutating `job` via its setters. Especially,
* Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states