author    Patrick Wendell <pwendell@gmail.com>    2014-09-21 13:07:20 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-09-21 13:07:20 -0700
commit    f5bf7dedb1a29a2949caeb7d43a0eb43af873779 (patch)
tree      44e1cbca4e147849e5e8030299b66dfed164cc63 /core/src
parent    7a766577a466377bf504fa2d8c3ca454844a6ea6 (diff)
Revert "[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile"
This reverts commit 7a766577a466377bf504fa2d8c3ca454844a6ea6. [NOTE: After some thought I decided not to merge this into 1.1 quite yet]
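
For context, SPARK-3595 (the change being reverted here) made saveAsHadoopFile honor an OutputCommitter already configured on the JobConf instead of always overwriting it with FileOutputCommitter. A minimal sketch of the usage pattern that patch supported, assuming an existing SparkContext named `sc`; NoOpCommitter and the output path are hypothetical placeholders:

import org.apache.hadoop.mapred.{JobConf, JobContext, OutputCommitter, TaskAttemptContext, TextOutputFormat}
import org.apache.spark.SparkContext._

// Hypothetical committer that skips the usual commit work; any
// org.apache.hadoop.mapred.OutputCommitter subclass would do.
class NoOpCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}

val pairs = sc.parallelize(Seq((new Integer(1), new Integer(1))))
val conf = new JobConf(sc.hadoopConfiguration)
conf.setOutputCommitter(classOf[NoOpCommitter])

// With SPARK-3595 applied, this call used NoOpCommitter; after this revert,
// saveAsHadoopFile installs FileOutputCommitter again before writing.
pairs.saveAsHadoopFile("hdfs://example/output",
  classOf[java.lang.Integer], classOf[java.lang.Integer],
  classOf[TextOutputFormat[Integer, Integer]], conf)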
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala         |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala      |   7
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 107
3 files changed, 25 insertions, 91 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 376e69cd99..f6703986bd 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}
} else {
- logInfo ("No need to commit output of task: " + taID.value)
+ logWarning ("No need to commit output of task: " + taID.value)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2d17..f6d9d12fe9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -872,12 +872,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
-
- // Use configured output committer if already set
- if (conf.getOutputCommitter == null) {
- hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
- }
-
+ hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
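
With the conditional gone, saveAsHadoopFile always installs FileOutputCommitter before delegating to saveAsHadoopDataset, so a committer set on the caller's JobConf is quietly replaced. As far as I can tell the lower-level saveAsHadoopDataset still uses the JobConf unchanged; a minimal sketch of that route, reusing the hypothetical NoOpCommitter, `sc`, and placeholder path from the sketch above:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
import org.apache.spark.SparkContext._

val pairs = sc.parallelize(Seq((new Integer(1), new Integer(1))))
val conf = new JobConf(sc.hadoopConfiguration)
conf.setOutputKeyClass(classOf[java.lang.Integer])
conf.setOutputValueClass(classOf[java.lang.Integer])
conf.setOutputFormat(classOf[TextOutputFormat[Integer, Integer]])
conf.setOutputCommitter(classOf[NoOpCommitter])   // hypothetical committer from the earlier sketch
FileOutputFormat.setOutputPath(conf, new Path("hdfs://example/output"))

// saveAsHadoopDataset takes the JobConf as-is, so the custom committer is honored here,
// whereas saveAsHadoopFile now overwrites it with FileOutputCommitter before delegating.
pairs.saveAsHadoopDataset(conf)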
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index e84cc69592..63d3ddb4af 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -17,21 +17,17 @@
package org.apache.spark.rdd
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.util.Progressable
-
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
import scala.util.Random
+import org.scalatest.FunSuite
import com.google.common.io.Files
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
-OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
-TaskAttemptContext => NewTaskAttempContext}
-import org.apache.spark.{Partitioner, SharedSparkContext}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.conf.{Configuration, Configurable}
+
import org.apache.spark.SparkContext._
-import org.scalatest.FunSuite
+import org.apache.spark.{Partitioner, SharedSparkContext}
class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("aggregateByKey") {
@@ -471,7 +467,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
// No error, non-configurable formats still work
- pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+ pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
/*
Check that configurable formats get configured:
@@ -482,17 +478,6 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
}
- test("saveAsHadoopFile should respect configured output committers") {
- val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
- val conf = new JobConf()
- conf.setOutputCommitter(classOf[FakeOutputCommitter])
-
- FakeOutputCommitter.ran = false
- pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
-
- assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
- }
-
test("lookup") {
val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
@@ -636,86 +621,40 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile
tries to instantiate them with Class.newInstance.
*/
-
-/*
- * Original Hadoop API
- */
class FakeWriter extends RecordWriter[Integer, Integer] {
- override def write(key: Integer, value: Integer): Unit = ()
- override def close(reporter: Reporter): Unit = ()
-}
-
-class FakeOutputCommitter() extends OutputCommitter() {
- override def setupJob(jobContext: JobContext): Unit = ()
-
- override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
-
- override def setupTask(taskContext: TaskAttemptContext): Unit = ()
-
- override def commitTask(taskContext: TaskAttemptContext): Unit = {
- FakeOutputCommitter.ran = true
- ()
- }
-
- override def abortTask(taskContext: TaskAttemptContext): Unit = ()
-}
-
-/*
- * Used to communicate state between the test harness and the OutputCommitter.
- */
-object FakeOutputCommitter {
- var ran = false
-}
-
-class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
- override def getRecordWriter(
- ignored: FileSystem,
- job: JobConf, name: String,
- progress: Progressable): RecordWriter[Integer, Integer] = {
- new FakeWriter()
- }
-
- override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = ()
-}
-
-/*
- * New-style Hadoop API
- */
-class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
-
- def close(p1: NewTaskAttempContext) = ()
+ def close(p1: TaskAttemptContext) = ()
def write(p1: Integer, p2: Integer) = ()
}
-class NewFakeCommitter extends NewOutputCommitter {
- def setupJob(p1: NewJobContext) = ()
+class FakeCommitter extends OutputCommitter {
+ def setupJob(p1: JobContext) = ()
- def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
+ def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
- def setupTask(p1: NewTaskAttempContext) = ()
+ def setupTask(p1: TaskAttemptContext) = ()
- def commitTask(p1: NewTaskAttempContext) = ()
+ def commitTask(p1: TaskAttemptContext) = ()
- def abortTask(p1: NewTaskAttempContext) = ()
+ def abortTask(p1: TaskAttemptContext) = ()
}
-class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+class FakeFormat() extends OutputFormat[Integer, Integer]() {
- def checkOutputSpecs(p1: NewJobContext) = ()
+ def checkOutputSpecs(p1: JobContext) = ()
- def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
- new NewFakeWriter()
+ def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+ new FakeWriter()
}
- def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = {
- new NewFakeCommitter()
+ def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
+ new FakeCommitter()
}
}
-class ConfigTestFormat() extends NewFakeFormat() with Configurable {
+class ConfigTestFormat() extends FakeFormat() with Configurable {
var setConfCalled = false
def setConf(p1: Configuration) = {
@@ -725,7 +664,7 @@ class ConfigTestFormat() extends NewFakeFormat() with Configurable {
def getConf: Configuration = null
- override def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+ override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
assert(setConfCalled, "setConf was never called")
super.getRecordWriter(p1)
}
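
For reference, ConfigTestFormat exists to exercise the path where Spark instantiates a new-API OutputFormat and, if it implements Configurable, calls setConf before requesting a record writer; the overridden getRecordWriter above asserts that ordering. A rough sketch of that pattern (not the literal PairRDDFunctions source, just the behavior the test relies on):

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.OutputFormat

// Instantiate the format and, if it is Configurable, hand it the Hadoop
// configuration before it is used to create writers.
def prepareFormat[K, V](formatClass: Class[_ <: OutputFormat[K, V]],
                        hadoopConf: Configuration): OutputFormat[K, V] = {
  val format = formatClass.newInstance()
  format match {
    case c: Configurable => c.setConf(hadoopConf)
    case _ => ()
  }
  format  // ConfigTestFormat.getRecordWriter then asserts setConf already ran
}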