path: root/sql
author     Cheng Lian <lian@databricks.com>  2015-03-31 07:48:37 +0800
committer  Cheng Lian <lian@databricks.com>  2015-03-31 07:48:37 +0800
commit     fde6945417355ae57500b67d034c9cad4f20d240 (patch)
tree       94b8c986ac5174412e5055b60dd6f0dea760d871 /sql
parent     f76d2e55b1a67bf5576e1aa001a0b872b9b3895a (diff)
[SPARK-6369] [SQL] Uses commit coordinator to help committing Hive and Parquet tables
This PR leverages the output commit coordinator introduced in #4066 to help commit Hive and Parquet tables. It extracts the output commit code in `SparkHadoopWriter.commit` into `SparkHadoopMapRedUtil.commitTask`, and reuses it for committing Parquet and Hive tables on the executor side.

TODO

- [ ] Add tests

Author: Cheng Lian <lian@databricks.com>

Closes #5139 from liancheng/spark-6369 and squashes the following commits:

72eb628 [Cheng Lian] Fixes typo in javadoc
9a4b82b [Cheng Lian] Adds javadoc and addresses @aarondav's comments
dfdf3ef [Cheng Lian] Uses commit coordinator to help committing Hive and Parquet tables
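For context, a minimal Scala sketch of the executor-side pattern this change converges on in the Parquet write paths below: write the partition's records, close the writer, then delegate the commit decision to `SparkHadoopMapRedUtil.commitTask`, which consults the output commit coordinator. The helper name `writeAndCommit` and its exact parameter list are illustrative only; `committer`, `hadoopContext`, and `context` correspond to the task's `OutputCommitter`, Hadoop `TaskAttemptContext`, and Spark `TaskContext` as used in the hunks below.

```scala
import org.apache.hadoop.mapreduce.{OutputCommitter, RecordWriter, TaskAttemptContext}
import org.apache.spark.TaskContext
import org.apache.spark.mapred.SparkHadoopMapRedUtil

// Illustrative helper (not part of this commit): write all records for one
// task attempt, then commit through the shared SparkHadoopMapRedUtil helper
// instead of calling committer.commitTask(...) directly.
def writeAndCommit[K, V](
    writer: RecordWriter[K, V],
    committer: OutputCommitter,
    hadoopContext: TaskAttemptContext,
    context: TaskContext,
    records: Iterator[(K, V)]): Unit = {
  try {
    while (records.hasNext) {
      val (key, value) = records.next()
      writer.write(key, value)
    }
  } finally {
    // Always close the writer, even if writing fails.
    writer.close(hadoopContext)
  }
  // Replaces the previous direct committer.commitTask(hadoopContext) call;
  // the helper checks needsTaskCommit and asks the driver-side coordinator
  // for permission before actually committing the task's output.
  SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
}
```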
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala    | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala                |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala         | 17
4 files changed, 11 insertions(+), 22 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 5130d8ad5e..1c868da23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -19,10 +19,9 @@ package org.apache.spark.sql.parquet
import java.io.IOException
import java.lang.{Long => JLong}
-import java.text.SimpleDateFormat
-import java.text.NumberFormat
+import java.text.{NumberFormat, SimpleDateFormat}
import java.util.concurrent.{Callable, TimeUnit}
-import java.util.{ArrayList, Collections, Date, List => JList}
+import java.util.{Date, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -43,12 +42,13 @@ import parquet.io.ParquetDecodingException
import parquet.schema.MessageType
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, SerializableWritable, TaskContext}
/**
@@ -356,7 +356,7 @@ private[sql] case class InsertIntoParquetTable(
} finally {
writer.close(hadoopContext)
}
- committer.commitTask(hadoopContext)
+ SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
1
}
val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
@@ -512,6 +512,7 @@ private[parquet] class FilteringParquetRowInputFormat
import parquet.filter2.compat.FilterCompat.Filter
import parquet.filter2.compat.RowGroupFilter
+
import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache
val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 53f765ee26..19800ad88c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -42,6 +42,7 @@ import parquet.hadoop.{ParquetInputFormat, _}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
import org.apache.spark.sql.catalyst.expressions
@@ -669,7 +670,8 @@ private[sql] case class ParquetRelation2(
} finally {
writer.close(hadoopContext)
}
- committer.commitTask(hadoopContext)
+
+ SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
}
val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
/* apparently we need a TaskAttemptID to construct an OutputCommitter;
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index da53d30354..cdf012b511 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -72,7 +72,6 @@ case class InsertIntoHiveTable(
val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
assert(outputFileFormatClassName != null, "Output format class not set")
conf.value.set("mapred.output.format.class", outputFileFormatClassName)
- conf.value.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(
conf.value,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index ba2bf67aed..8398da2681 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.hive
-import java.io.IOException
import java.text.NumberFormat
import java.util.Date
@@ -118,19 +117,7 @@ private[hive] class SparkHiveWriterContainer(
}
protected def commit() {
- if (committer.needsTaskCommit(taskContext)) {
- try {
- committer.commitTask(taskContext)
- logInfo (taID + ": Committed")
- } catch {
- case e: IOException =>
- logError("Error committing the output of task: " + taID.value, e)
- committer.abortTask(taskContext)
- throw e
- }
- } else {
- logInfo("No need to commit output of task: " + taID.value)
- }
+ SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
}
private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
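The hunk above drops the hand-rolled needsTaskCommit/commitTask/abortTask sequence in favor of the shared helper. Roughly, the centralized logic follows the pattern sketched below; the `CommitCoordinator` trait and `commitTaskSketch` are hypothetical stand-ins for illustration, not Spark's actual classes, and the real helper's error handling differs from the plain exception used here.

```scala
import org.apache.hadoop.mapred.{OutputCommitter, TaskAttemptContext}

// Hypothetical stand-in for the driver-side output commit coordinator
// introduced in #4066; only the shape of the call matters here.
trait CommitCoordinator {
  def canCommit(jobId: Int, splitId: Int, attemptId: Int): Boolean
}

// Illustrative sketch of the commit protocol the shared helper centralizes:
// commit only if the committer requests it and the coordinator authorizes
// this particular attempt, otherwise abort so a rival attempt can win.
def commitTaskSketch(
    committer: OutputCommitter,
    taskContext: TaskAttemptContext,
    coordinator: CommitCoordinator,
    jobId: Int,
    splitId: Int,
    attemptId: Int): Unit = {
  if (committer.needsTaskCommit(taskContext)) {
    if (coordinator.canCommit(jobId, splitId, attemptId)) {
      committer.commitTask(taskContext)
    } else {
      // Another (e.g. speculative) attempt was authorized instead.
      committer.abortTask(taskContext)
      throw new IllegalStateException(
        s"Commit denied for split $splitId (attempt $attemptId) of job $jobId")
    }
  }
  // If needsTaskCommit is false there is nothing to commit for this task.
}
```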
@@ -213,7 +200,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
.zip(row.toSeq.takeRight(dynamicPartColNames.length))
.map { case (col, rawVal) =>
val string = if (rawVal == null) null else String.valueOf(rawVal)
- val colString =
+ val colString =
if (string == null || string.isEmpty) {
defaultPartName
} else {