author     Mridul Muralidharan <mridul@gmail.com>    2013-04-15 19:16:36 +0530
committer  Mridul Muralidharan <mridul@gmail.com>    2013-04-15 19:16:36 +0530
commit     19652a44be81f3b8fbbb9ecc4987dcd933d2eca9 (patch)
tree       4093b69770afd410f3b0bfa8ae502bc8a2ff64d9 /core
parent     54b3d45b816f26a9d3509c1f8bea70c6d99d3de0 (diff)
Fix issue with FileSuite failing
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/HadoopWriter.scala       | 22
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala    |  1
-rw-r--r--  core/src/test/scala/spark/FileSuite.scala           |  1
3 files changed, 7 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index 80421b6328..5e8396edb9 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -2,14 +2,10 @@ package org.apache.hadoop.mapred
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
import java.text.SimpleDateFormat
import java.text.NumberFormat
import java.io.IOException
-import java.net.URI
import java.util.Date
import spark.Logging
@@ -25,8 +21,6 @@ import spark.SerializableWritable
*/
class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
- println("Created HadoopWriter")
-
private val now = new Date()
private val conf = new SerializableWritable(jobConf)
@@ -43,7 +37,6 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
@transient private var taskContext: TaskAttemptContext = null
def preSetup() {
- println("preSetup")
setIDs(0, 0, 0)
setConfParams()
@@ -53,20 +46,17 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
def setup(jobid: Int, splitid: Int, attemptid: Int) {
- println("setup")
setIDs(jobid, splitid, attemptid)
setConfParams()
}
def open() {
- println("open")
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)
val outputName = "part-" + numfmt.format(splitID)
val path = FileOutputFormat.getOutputPath(conf.value)
- println("open outputName = " + outputName + ", fs for " + conf.value)
val fs: FileSystem = {
if (path != null) {
path.getFileSystem(conf.value)
@@ -81,7 +71,6 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
def write(key: AnyRef, value: AnyRef) {
- println("write " + key + " = " + value)
if (writer!=null) {
//println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
writer.write(key, value)
@@ -91,19 +80,16 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
def close() {
- println("close")
writer.close(Reporter.NULL)
}
def commit() {
- println("commit")
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
- println("Committed = " + taID)
} catch {
case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
@@ -112,13 +98,17 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
}
} else {
- println("No need to commit")
logWarning ("No need to commit output of task: " + taID.value)
}
}
+ def commitJob() {
+ // always ? Or if cmtr.needsTaskCommit ?
+ val cmtr = getOutputCommitter()
+ cmtr.commitJob(getJobContext())
+ }
+
def cleanup() {
- println("cleanup")
getOutputCommitter().cleanupJob(getJobContext())
}
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 39469fa3c8..9a6966b3f1 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -636,6 +636,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
self.context.runJob(self, writeToFile _)
+ writer.commitJob()
writer.cleanup()
}
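
For context, the call site above now commits the job on the driver after runJob returns and before cleanup. A simplified sketch of the surrounding driver-side sequence (writeToFile is the per-partition closure defined just above this hunk; the comments are descriptive, not part of the patch):

    val writer = new HadoopWriter(conf)
    writer.preSetup()                          // job-level setup with placeholder IDs
    self.context.runJob(self, writeToFile _)   // each task: setup, open, write, close, commit its output
    writer.commitJob()                         // new: finalize the job's output once all tasks are done
    writer.cleanup()                           // delete the job's temporary files
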
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index a3840905f4..91b48c7456 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -18,7 +18,6 @@ class FileSuite extends FunSuite with LocalSparkContext {
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 4)
nums.saveAsTextFile(outputDir)
- println("outputDir = " + outputDir)
// Read the plain text file and check it's OK
val outputFile = new File(outputDir, "part-00000")
val content = Source.fromFile(outputFile).mkString