author     Cheng Lian <lian@databricks.com>  2015-06-23 17:24:26 -0700
committer  Cheng Lian <lian@databricks.com>  2015-06-23 17:24:26 -0700
commit     111d6b9b8a584b962b6ae80c7aa8c45845ce0099 (patch)
tree       bc5955310ec43cb175ea77a147fc3bd99340e27b /sql/core
parent     7fb5ae5024284593204779ff463bfbdb4d1c6da5 (diff)
download   spark-111d6b9b8a584b962b6ae80c7aa8c45845ce0099.tar.gz
           spark-111d6b9b8a584b962b6ae80c7aa8c45845ce0099.tar.bz2
           spark-111d6b9b8a584b962b6ae80c7aa8c45845ce0099.zip
[SPARK-8139] [SQL] Updates docs and comments of data sources and Parquet output committer options
This PR only applies to the master branch (1.5.0-SNAPSHOT), since it references `org.apache.parquet` classes, which only appear in Parquet 1.7.0.

Author: Cheng Lian <lian@databricks.com>

Closes #6683 from liancheng/output-committer-docs and squashes the following commits:

b4648b8 [Cheng Lian] Removes spark.sql.sources.outputCommitterClass as it's not a public option
ee63923 [Cheng Lian] Updates docs and comments of data sources and Parquet output committer options
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                               | 30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala  | 34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala                    |  4
3 files changed, 49 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 16493c3d7c..265352647f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,6 +22,8 @@ import java.util.Properties
import scala.collection.immutable
import scala.collection.JavaConversions._
+import org.apache.parquet.hadoop.ParquetOutputCommitter
+
import org.apache.spark.sql.catalyst.CatalystConf
private[spark] object SQLConf {
@@ -252,9 +254,9 @@ private[spark] object SQLConf {
val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
defaultValue = Some(false),
- doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" +
- " because of a known bug in Paruet 1.6.0rc3 " +
- "(<a href=\"https://issues.apache.org/jira/browse/PARQUET-136\">PARQUET-136</a>). However, " +
+ doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default " +
+ "because of a known bug in Parquet 1.6.0rc3 " +
+ "(PARQUET-136, https://issues.apache.org/jira/browse/PARQUET-136). However, " +
"if your table doesn't contain any nullable string or binary columns, it's still safe to " +
"turn this feature on.")
@@ -262,11 +264,21 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")
+ val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
+ key = "spark.sql.parquet.output.committer.class",
+ defaultValue = Some(classOf[ParquetOutputCommitter].getName),
+ doc = "The output committer class used by Parquet. The specified class needs to be a " +
+ "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
+ "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
+ "option must be set in Hadoop Configuration. 2. This option overrides " +
+ "\"spark.sql.sources.outputCommitterClass\"."
+ )
+
val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
defaultValue = Some(false),
doc = "<TODO>")
- val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath",
+ val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
defaultValue = Some(true),
doc = "<TODO>")
@@ -325,9 +337,13 @@ private[spark] object SQLConf {
defaultValue = Some(true),
doc = "<TODO>")
- // The output committer class used by FSBasedRelation. The specified class needs to be a
+ // The output committer class used by HadoopFsRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
- // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
+ //
+ // NOTE:
+ //
+ // 1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
// 2. This option can be overridden by "spark.sql.parquet.output.committer.class".
val OUTPUT_COMMITTER_CLASS =
stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
@@ -415,7 +431,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
/** When true uses verifyPartitionPath to prune the path which is not exists. */
- private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH)
+ private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
/** When true the planner will use the external sort, which may spill to disk. */
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
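The new PARQUET_OUTPUT_COMMITTER_CLASS entry and the reworded OUTPUT_COMMITTER_CLASS comment in the hunks above both make the same two points: these committer options belong in the Hadoop Configuration rather than in SQLConf, and the Parquet-specific key takes precedence for Parquet output. A minimal sketch of setting both from user code (not part of this commit; the local-mode SparkContext setup is only illustrative):

import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("committer-config").setMaster("local[*]"))

// Both options must be set in the Hadoop Configuration, not through SQLConf.
sc.hadoopConfiguration.set(
  "spark.sql.sources.outputCommitterClass", classOf[FileOutputCommitter].getName)

// For Parquet output, this option overrides the generic one above.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class", classOf[ParquetOutputCommitter].getName)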
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
index 62c4e92ebe..1551afd7b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -17,19 +17,35 @@
package org.apache.spark.sql.parquet
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.Log
import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
+/**
+ * An output committer for writing Parquet files. Instead of writing to the `_temporary` folder
+ * as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
+ * destination folder. This can be useful for data stored in S3, where directory operations are
+ * relatively expensive.
+ *
+ * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
+ * property via Hadoop [[Configuration]]. Note that this property overrides
+ * "spark.sql.sources.outputCommitterClass".
+ *
+ * *NOTE*
+ *
+ * NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
+ * no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
+ * left empty).
+ */
private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
val LOG = Log.getLog(classOf[ParquetOutputCommitter])
- override def getWorkPath(): Path = outputPath
+ override def getWorkPath: Path = outputPath
override def abortTask(taskContext: TaskAttemptContext): Unit = {}
override def commitTask(taskContext: TaskAttemptContext): Unit = {}
override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
@@ -46,13 +62,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
try {
ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
- } catch {
- case e: Exception => {
- LOG.warn("could not write summary file for " + outputPath, e)
- val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
- if (fileSystem.exists(metadataPath)) {
- fileSystem.delete(metadataPath, true)
- }
+ } catch { case e: Exception =>
+ LOG.warn("could not write summary file for " + outputPath, e)
+ val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+ if (fileSystem.exists(metadataPath)) {
+ fileSystem.delete(metadataPath, true)
}
}
} catch {
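The Scaladoc added above describes when the direct committer helps (stores such as S3, where directory operations are relatively expensive) and when it must not be used (appends, since `abortTask()` and `abortJob()` are no-ops). A hedged sketch of enabling it for an overwrite-only write; the committer is referenced by name because it is private[parquet], and the output path is illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

val sc = new SparkContext(new SparkConf().setAppName("direct-committer").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Route Parquet writes through the direct committer (set via Hadoop Configuration, not SQLConf).
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

// Overwrite is safe here; never combine this committer with SaveMode.Append,
// because a failed append cannot be rolled back.
sqlContext.range(0, 1000)
  .write
  .mode(SaveMode.Overwrite)
  .parquet("/tmp/direct-committer-example.parquet") // illustrative path; typically an S3 destination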
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e049d54bf5..1d353bd8e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -178,11 +178,11 @@ private[sql] class ParquetRelation2(
val committerClass =
conf.getClass(
- "spark.sql.parquet.output.committer.class",
+ SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
classOf[ParquetOutputCommitter],
classOf[ParquetOutputCommitter])
- if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+ if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
logInfo("Using default output committer for Parquet: " +
classOf[ParquetOutputCommitter].getCanonicalName)
} else {
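The change above swaps the hard-coded key string for SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key, but the lookup itself is unchanged: Configuration.getClass falls back to ParquetOutputCommitter when the key is unset, and the log message reports which committer was resolved. A standalone sketch of that fallback pattern; the key name comes from the diff, and the commented-out override class is hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetOutputCommitter

val conf = new Configuration()
// conf.set("spark.sql.parquet.output.committer.class", "com.example.MyCommitter") // hypothetical override

// Fall back to ParquetOutputCommitter when the key is unset, mirroring ParquetRelation2.
val committerClass: Class[_ <: ParquetOutputCommitter] =
  conf.getClass(
    "spark.sql.parquet.output.committer.class",
    classOf[ParquetOutputCommitter],
    classOf[ParquetOutputCommitter])

if (conf.get("spark.sql.parquet.output.committer.class") == null) {
  println("Using default output committer for Parquet: " +
    classOf[ParquetOutputCommitter].getCanonicalName)
} else {
  println("Using user-defined output committer for Parquet: " +
    committerClass.getCanonicalName)
}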