author     Josh Rosen <joshrosen@databricks.com>  2015-04-14 14:07:25 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2015-04-14 14:10:15 -0700
commit     a76b921a923ac37d3c73ee18d24df4bb611daba3 (patch)
tree       0d6e2a17e3d49ace3a9c590dc23899588c03df76 /sql
parent     4d4b24927417b2c17810e94d6d46c37491c68869 (diff)
Revert "[SPARK-6352] [SQL] Add DirectParquetOutputCommitter"
This reverts commit b29663eeea440b1d1a288d41b5ddf67e77c5bd54. I'm reverting this because it broke test compilation for the Hadoop 1.x profiles.
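For background, the reverted class let Parquet writes bypass the usual _temporary staging directory by turning the commit steps into no-ops. Below is a minimal, illustrative sketch of that idea against the plain Hadoop FileOutputCommitter API; the class name DirectFileOutputCommitter is made up here and this is not the reverted implementation, which extended ParquetOutputCommitter and additionally rewrote the Parquet summary files on job commit (see the diff that follows).

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Illustrative sketch only: a "direct" committer writes task output straight to
// the final destination, so there is nothing to promote out of _temporary.
class DirectFileOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  // Tasks write into the final output directory rather than a staging path.
  override def getWorkPath(): Path = outputPath

  // Setup and commit/abort become no-ops because files are already in place;
  // there is no rename step to perform or roll back.
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
  override def commitJob(jobContext: JobContext): Unit = {}
}

The trade-off is that a failed or speculative task can leave partial files behind, which is why the removed test below checks only that _temporary is absent after a deliberately failing write.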
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala | 66
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala       | 22
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala               | 21
3 files changed, 0 insertions, 109 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
deleted file mode 100644
index 25a66cb488..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
-import parquet.Log
-import parquet.hadoop.util.ContextUtil
-import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter}
-
-private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
- extends ParquetOutputCommitter(outputPath, context) {
- val LOG = Log.getLog(classOf[ParquetOutputCommitter])
-
- override def getWorkPath(): Path = outputPath
- override def abortTask(taskContext: TaskAttemptContext): Unit = {}
- override def commitTask(taskContext: TaskAttemptContext): Unit = {}
- override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
- override def setupJob(jobContext: JobContext): Unit = {}
- override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
- override def commitJob(jobContext: JobContext) {
- try {
- val configuration = ContextUtil.getConfiguration(jobContext)
- val fileSystem = outputPath.getFileSystem(configuration)
- val outputStatus = fileSystem.getFileStatus(outputPath)
- val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
- try {
- ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
- if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
- val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
- fileSystem.create(successPath).close()
- }
- } catch {
- case e: Exception => {
- LOG.warn("could not write summary file for " + outputPath, e)
- val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
- if (fileSystem.exists(metadataPath)) {
- fileSystem.delete(metadataPath, true)
- }
- }
- }
- } catch {
- case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
- }
- }
-
-}
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 3724bda829..1c868da23e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -379,8 +379,6 @@ private[sql] case class InsertIntoParquetTable(
*/
private[parquet] class AppendingParquetOutputFormat(offset: Int)
extends parquet.hadoop.ParquetOutputFormat[Row] {
- var committer: OutputCommitter = null
-
// override to accept existing directories as valid output directory
override def checkOutputSpecs(job: JobContext): Unit = {}
@@ -405,26 +403,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
}
-
- // override to create output committer from configuration
- override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
- if (committer == null) {
- val output = getOutputPath(context)
- val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
- classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
- val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
- committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
- }
- committer
- }
-
- // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
- private def getOutputPath(context: TaskAttemptContext): Path = {
- context.getConfiguration().get("mapred.output.dir") match {
- case null => null
- case name => new Path(name)
- }
- }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 4d0bf7cf99..97c0f439ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -381,27 +381,6 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
}
}
}
-
- test("SPARK-6352 DirectParquetOutputCommitter") {
- // Write to a parquet file and let it fail.
- // _temporary should be missing if direct output committer works.
- try {
- configuration.set("spark.sql.parquet.output.committer.class",
- "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
- sqlContext.udf.register("div0", (x: Int) => x / 0)
- withTempPath { dir =>
- intercept[org.apache.spark.SparkException] {
- sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
- }
- val path = new Path(dir.getCanonicalPath, "_temporary")
- val fs = path.getFileSystem(configuration)
- assert(!fs.exists(path))
- }
- }
- finally {
- configuration.unset("spark.sql.parquet.output.committer.class")
- }
- }
}
class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {