author    Matei Zaharia <matei@eecs.berkeley.edu>  2011-07-13 23:09:33 -0400
committer Matei Zaharia <matei@eecs.berkeley.edu>  2011-07-13 23:09:33 -0400
commit    d0c795836476ff0188fd655f18e568bd6ae00d49 (patch)
tree      d665ee9351b0fb49ec7e1246b1027b1d4c0221b0 /core
parent    080869c6ef9e01e192b1a7a59818a0c8bf29967d (diff)
parent    9c0069188b47ee0d962ad03f8fc00733834ede8e (diff)
Merge branch 'master' into scala-2.9

Conflicts:
	core/src/main/scala/spark/HadoopFileWriter.scala
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/HadoopFileWriter.scala         |  65
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala         |  67
-rw-r--r--  core/src/main/scala/spark/RDD.scala                      |   4
-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala |  10
-rw-r--r--  core/src/main/scala/spark/SimpleJob.scala                |   2
-rw-r--r--  core/src/test/scala/spark/FileSuite.scala                | 100
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                 |   7
7 files changed, 162 insertions, 93 deletions
diff --git a/core/src/main/scala/spark/HadoopFileWriter.scala b/core/src/main/scala/spark/HadoopFileWriter.scala
index c9bc88f480..596b309d34 100644
--- a/core/src/main/scala/spark/HadoopFileWriter.scala
+++ b/core/src/main/scala/spark/HadoopFileWriter.scala
@@ -15,17 +15,15 @@ import java.util.Date
import spark.SerializableWritable
import spark.Logging
-@serializable
-class HadoopFileWriter (path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[AnyRef,AnyRef]],
- outputCommitterClass: Class[_ <: OutputCommitter],
- @transient jobConf: JobConf = null) extends Logging {
-
+/**
+ * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should
+ * also contain an output key class, an output value class, a filename to write to, etc
+ * exactly like in a Hadoop job.
+ */
+@serializable
+class HadoopFileWriter (@transient jobConf: JobConf) extends Logging {
private val now = new Date()
- private val conf = new SerializableWritable[JobConf](if (jobConf == null) new JobConf() else jobConf)
- private val confProvided = (jobConf != null)
+ private val conf = new SerializableWritable(jobConf)
private var jobID = 0
private var splitID = 0
@@ -38,27 +36,6 @@ class HadoopFileWriter (path: String,
@transient private var committer: OutputCommitter = null
@transient private var jobContext: JobContext = null
@transient private var taskContext: TaskAttemptContext = null
-
- def this (path: String, @transient jobConf: JobConf)
- = this (path,
- jobConf.getOutputKeyClass,
- jobConf.getOutputValueClass,
- jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
- jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]],
- jobConf)
-
- def this (path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[AnyRef,AnyRef]],
- outputCommitterClass: Class[_ <: OutputCommitter])
-
- = this (path,
- keyClass,
- valueClass,
- outputFormatClass,
- outputCommitterClass,
- null)
def preSetup() {
setIDs(0, 0, 0)
@@ -80,8 +57,13 @@ class HadoopFileWriter (path: String,
numfmt.setGroupingUsed(false)
val outputName = "part-" + numfmt.format(splitID)
- val fs = HadoopFileWriter.createPathFromString(path, conf.value)
- .getFileSystem(conf.value)
+ val path = FileOutputFormat.getOutputPath(conf.value)
+ val fs: FileSystem = {
+ if (path != null)
+ path.getFileSystem(conf.value)
+ else
+ FileSystem.get(conf.value)
+ }
getOutputCommitter().setupTask(getTaskContext())
writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
@@ -161,18 +143,6 @@ class HadoopFileWriter (path: String,
}
private def setConfParams() {
- if (!confProvided) {
- // conf.value.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
- conf.value.set("mapred.output.format.class", outputFormatClass.getName)
- conf.value.setOutputCommitter(outputCommitterClass)
- conf.value.setOutputKeyClass(keyClass)
- conf.value.setOutputValueClass(valueClass)
- } else {
-
- }
-
- FileOutputFormat.setOutputPath(conf.value, HadoopFileWriter.createPathFromString(path, conf.value))
-
conf.value.set("mapred.job.id", jID.value.toString);
conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
conf.value.set("mapred.task.id", taID.value.toString);
@@ -198,9 +168,4 @@ object HadoopFileWriter {
outputPath = outputPath.makeQualified(fs)
return outputPath
}
-
- def getInstance[K, V, F <: OutputFormat[K,V], C <: OutputCommitter](path: String)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F], cm: ClassManifest[C]): HadoopFileWriter = {
- new HadoopFileWriter(path, km.erasure, vm.erasure, fm.erasure.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], cm.erasure.asInstanceOf[Class[OutputCommitter]])
- }
}
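
With this change HadoopFileWriter is configured entirely through a JobConf, so a caller has to populate the conf the way a plain Hadoop job would before handing it over. Below is a minimal sketch of such a conf, assuming an IntWritable/Text text-file output; the path and the class choices are placeholders, not part of this patch.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.{FileOutputFormat, HadoopFileWriter, JobConf, TextOutputFormat}

    val conf = new JobConf()
    conf.setOutputKeyClass(classOf[IntWritable])
    conf.setOutputValueClass(classOf[Text])
    // setOutputFormat trips over the same Scala 2.9 generics issue noted in the
    // PairRDDFunctions hunk below, so the class name is set directly.
    conf.set("mapred.output.format.class",
             classOf[TextOutputFormat[IntWritable, Text]].getName)
    FileOutputFormat.setOutputPath(conf, new Path("/tmp/hadoop-writer-out"))

    // The single-argument constructor now pulls the key/value classes, the
    // output format and the output path from this conf.
    val writer = new HadoopFileWriter(conf)
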
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index d179328ccf..c5ecb4c393 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -12,17 +12,18 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.mapred.FileOutputFormat
import org.apache.hadoop.mapred.HadoopFileWriter
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.OutputCommitter
import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.OutputCommitter
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.TextOutputFormat
import SparkContext._
@@ -189,34 +190,42 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
(k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
}
}
-
- def saveAsHadoopFile (path: String, jobConf: JobConf) {
- saveAsHadoopFile(path, jobConf.getOutputKeyClass, jobConf.getOutputValueClass, jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]], jobConf)
- }
- def saveAsHadoopFile [F <: OutputFormat[K,V], C <: OutputCommitter] (path: String) (implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
- saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]])
- }
-
- def saveAsHadoopFile(path: String, outputFormatClass: Class[_ <: OutputFormat[K,V]], outputCommitterClass: Class[_ <: OutputCommitter]) {
- saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure, outputFormatClass, outputCommitterClass)
+ def saveAsHadoopFile [F <: OutputFormat[K, V]] (path: String) (implicit fm: ClassManifest[F]) {
+ saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
- def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter]) {
- saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null)
+ def saveAsHadoopFile(path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ conf: JobConf = new JobConf) {
+ conf.setOutputKeyClass(keyClass)
+ conf.setOutputValueClass(valueClass)
+ // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
+ conf.set("mapred.output.format.class", outputFormatClass.getName)
+ conf.setOutputCommitter(classOf[FileOutputCommitter])
+ FileOutputFormat.setOutputPath(conf, HadoopFileWriter.createPathFromString(path, conf))
+ saveAsHadoopDataset(conf)
}
- private def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter], jobConf: JobConf) {
- logInfo ("Saving as hadoop file of type (" + keyClass.getSimpleName+ "," +valueClass.getSimpleName+ ")" )
- val writer = new HadoopFileWriter(path,
- keyClass,
- valueClass,
- outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
- outputCommitterClass.asInstanceOf[Class[OutputCommitter]],
- null)
+ def saveAsHadoopDataset(conf: JobConf) {
+ val outputFormatClass = conf.getOutputFormat
+ val keyClass = conf.getOutputKeyClass
+ val valueClass = conf.getOutputValueClass
+ if (outputFormatClass == null)
+ throw new SparkException("Output format class not set")
+ if (keyClass == null)
+ throw new SparkException("Output key class not set")
+ if (valueClass == null)
+ throw new SparkException("Output value class not set")
+
+ logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+
+ val writer = new HadoopFileWriter(conf)
writer.preSetup()
- def writeToFile (context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
+ def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
writer.setup(context.stageId, context.splitId, context.attemptId)
writer.open()
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 3610bb079f..fb91ffb198 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -190,11 +190,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
}
def saveAsTextFile(path: String) {
- this.map( x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text], FileOutputCommitter](path)
+ this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
def saveAsObjectFile(path: String) {
- this.glom.map( x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))) ).saveAsSequenceFile(path)
+ this.glom.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)
}
}
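
The caller-facing behaviour of these two methods is unchanged; only the explicit committer type parameter disappears. For reference, a minimal sketch of the two entry points with placeholder paths (the test suite added below exercises the same calls):

    import spark.SparkContext

    val sc = new SparkContext("local", "rdd-save-example")
    sc.makeRDD(1 to 4).saveAsTextFile("/tmp/text-out")      // one line per element, one part- file per split
    sc.makeRDD(1 to 4).saveAsObjectFile("/tmp/object-out")  // serialized partitions wrapped in a SequenceFile
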
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 8eb19c5436..7f591137c6 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -34,7 +34,6 @@ import SparkContext._
*/
@serializable
class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
-
def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
@@ -55,14 +54,15 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+ val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
if (!convertKey && !convertValue) {
- self.saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ self.saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (!convertKey && convertValue) {
- self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (convertKey && !convertValue) {
- self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (convertKey && convertValue) {
- self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+ self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
}
}
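
saveAsSequenceFile now routes all four conversion branches through the same four-argument saveAsHadoopFile, with the committer defaulted to FileOutputCommitter inside that method. A short sketch of the entry point itself, with a placeholder path:

    import org.apache.hadoop.io.{IntWritable, Text}
    import spark.SparkContext
    import spark.SparkContext._

    val sc = new SparkContext("local", "seqfile-example")
    // Plain Int/String pairs: both key and value go through anyToWritable
    // (the convertKey && convertValue branch above) before being written.
    sc.makeRDD(1 to 3).map(x => (x, "a" * x)).saveAsSequenceFile("/tmp/seq-out")
    // Reading it back goes through sc.sequenceFile, as in the new FileSuite below.
    val back = sc.sequenceFile[IntWritable, Text]("/tmp/seq-out")
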
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index 00ff21369b..aa1610fb89 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -16,7 +16,7 @@ class SimpleJob(
extends Job(jobId) with Logging
{
// Maximum time to wait to run a task in a preferred location (in ms)
- val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+ val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "5000").toLong
// CPUs and memory to request per task
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
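
The LOCALITY_WAIT default goes from 3 to 5 seconds; like CPUS_PER_TASK it is read from an ordinary system property, so it can still be tuned before the scheduler starts. The values below are purely illustrative.

    // Must be set before the SparkContext (and hence the scheduler) is constructed.
    System.setProperty("spark.locality.wait", "10000")  // wait up to 10 s for a preferred location
    System.setProperty("spark.task.cpus", "2")          // CPUs requested per task
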
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
new file mode 100644
index 0000000000..17111f7753
--- /dev/null
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -0,0 +1,100 @@
+package spark
+
+import java.io.File
+
+import scala.io.Source
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
+import org.apache.hadoop.io._
+
+import SparkContext._
+
+class FileSuite extends FunSuite {
+ test("text files") {
+ val sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 4)
+ nums.saveAsTextFile(outputDir)
+ // Read the plain text file and check it's OK
+ val outputFile = new File(outputDir, "part-00000")
+ val content = Source.fromFile(outputFile).mkString
+ assert(content === "1\n2\n3\n4\n")
+ // Also try reading it in as a text file RDD
+ assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+ sc.stop()
+ }
+
+ test("SequenceFiles") {
+ val sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+ nums.saveAsSequenceFile(outputDir)
+ // Try reading the output back as a SequenceFile
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ sc.stop()
+ }
+
+ test("SequenceFile with writable key") {
+ val sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
+ nums.saveAsSequenceFile(outputDir)
+ // Try reading the output back as a SequenceFile
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ sc.stop()
+ }
+
+ test("SequenceFile with writable value") {
+ val sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
+ nums.saveAsSequenceFile(outputDir)
+ // Try reading the output back as a SequenceFile
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ sc.stop()
+ }
+
+ test("SequenceFile with writable key and value") {
+ val sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
+ nums.saveAsSequenceFile(outputDir)
+ // Try reading the output back as a SequenceFile
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ sc.stop()
+ }
+
+ test("object files of ints") {
+ val sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 4)
+ nums.saveAsObjectFile(outputDir)
+ // Try reading the output back as an object file
+ val output = sc.objectFile[Int](outputDir)
+ assert(output.collect().toList === List(1, 2, 3, 4))
+ sc.stop()
+ }
+
+ test("object files of complex types") {
+ val sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
+ nums.saveAsObjectFile(outputDir)
+ // Try reading the output back as an object file
+ val output = sc.objectFile[(Int, String)](outputDir)
+ assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+ sc.stop()
+ }
+}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index d31fdb7f8a..06d438d9e2 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -1,17 +1,12 @@
package spark
import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
import SparkContext._
-import scala.collection.mutable.ArrayBuffer
class RDDSuite extends FunSuite {
test("basic operations") {
val sc = new SparkContext("local", "test")
- val nums = sc.parallelize(Array(1, 2, 3, 4), 2)
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
assert(nums.reduce(_ + _) === 10)
assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4"))