author    | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-07-13 23:04:06 -0400
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-07-13 23:04:06 -0400
commit    | 9c0069188b47ee0d962ad03f8fc00733834ede8e (patch)
tree      | 3ee4aa2903218536d892a3e4ca691e57d6164297 /core
parent    | da8a3b89268c08402c4b1b6f00383063e8856d00 (diff)
download  | spark-9c0069188b47ee0d962ad03f8fc00733834ede8e.tar.gz
          | spark-9c0069188b47ee0d962ad03f8fc00733834ede8e.tar.bz2
          | spark-9c0069188b47ee0d962ad03f8fc00733834ede8e.zip
Updated save code to allow non-file-based OutputFormats and added a test
for file-related stuff
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/HadoopFileWriter.scala         |  64
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala         |  66
-rw-r--r-- | core/src/main/scala/spark/RDD.scala                      |   4
-rw-r--r-- | core/src/main/scala/spark/SequenceFileRDDFunctions.scala |  10
-rw-r--r-- | core/src/test/scala/spark/FileSuite.scala                | 100
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala                 |   7
6 files changed, 160 insertions, 91 deletions
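
For orientation before the diff itself, here is a minimal sketch of how the reworked save API is meant to be used after this change. The object name, output paths, and sample data are made up for illustration; the calls themselves (`saveAsHadoopFile` with explicit key/value/format classes and the new `saveAsHadoopDataset(conf)`) are the signatures introduced in the diff below.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

import spark.SparkContext
import spark.SparkContext._

object SaveApiExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "save-api-example")
    val pairs = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))

    // Path-based save: key class, value class, and OutputFormat are passed explicitly;
    // a default JobConf with a FileOutputCommitter is filled in by PairRDDFunctions.
    pairs.saveAsHadoopFile("/tmp/save-api-example/text",
                           classOf[IntWritable],
                           classOf[Text],
                           classOf[TextOutputFormat[IntWritable, Text]])

    // JobConf-based save: the conf alone describes the output (classes, format, and,
    // for file-based formats, a path), so an OutputFormat that does not write to the
    // filesystem can be configured in exactly the same way.
    val conf = new JobConf()
    conf.setOutputKeyClass(classOf[IntWritable])
    conf.setOutputValueClass(classOf[Text])
    conf.setOutputFormat(classOf[TextOutputFormat[IntWritable, Text]])
    FileOutputFormat.setOutputPath(conf, new Path("/tmp/save-api-example/dataset"))
    pairs.saveAsHadoopDataset(conf)

    sc.stop()
  }
}
```

Because `saveAsHadoopDataset` only requires a `JobConf` rather than a file path, non-file-based OutputFormats become possible, which is the point of this commit.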
diff --git a/core/src/main/scala/spark/HadoopFileWriter.scala b/core/src/main/scala/spark/HadoopFileWriter.scala
index 3a4cd8464c..596b309d34 100644
--- a/core/src/main/scala/spark/HadoopFileWriter.scala
+++ b/core/src/main/scala/spark/HadoopFileWriter.scala
@@ -15,17 +15,15 @@ import java.util.Date
 import spark.SerializableWritable
 import spark.Logging
 
-@serializable
-class HadoopFileWriter (path: String,
-                        keyClass: Class[_],
-                        valueClass: Class[_],
-                        outputFormatClass: Class[_ <: OutputFormat[AnyRef,AnyRef]],
-                        outputCommitterClass: Class[_ <: OutputCommitter],
-                        @transient jobConf: JobConf = null) extends Logging {
-
+/**
+ * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should
+ * also contain an output key class, an output value class, a filename to write to, etc
+ * exactly like in a Hadoop job.
+ */
+@serializable
+class HadoopFileWriter (@transient jobConf: JobConf) extends Logging {
   private val now = new Date()
-  private val conf = new SerializableWritable[JobConf](if (jobConf == null) new JobConf() else jobConf)
-  private val confProvided = (jobConf != null)
+  private val conf = new SerializableWritable(jobConf)
 
   private var jobID = 0
   private var splitID = 0
@@ -38,27 +36,6 @@ class HadoopFileWriter (path: String,
   @transient private var committer: OutputCommitter = null
   @transient private var jobContext: JobContext = null
   @transient private var taskContext: TaskAttemptContext = null
-
-  def this (path: String, @transient jobConf: JobConf)
-    = this (path,
-            jobConf.getOutputKeyClass,
-            jobConf.getOutputValueClass,
-            jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
-            jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]],
-            jobConf)
-
-  def this (path: String,
-            keyClass: Class[_],
-            valueClass: Class[_],
-            outputFormatClass: Class[_ <: OutputFormat[AnyRef,AnyRef]],
-            outputCommitterClass: Class[_ <: OutputCommitter])
-
-    = this (path,
-            keyClass,
-            valueClass,
-            outputFormatClass,
-            outputCommitterClass,
-            null)
 
   def preSetup() {
     setIDs(0, 0, 0)
@@ -80,8 +57,13 @@ class HadoopFileWriter (path: String,
     numfmt.setGroupingUsed(false)
 
     val outputName = "part-" + numfmt.format(splitID)
-    val fs = HadoopFileWriter.createPathFromString(path, conf.value)
-      .getFileSystem(conf.value)
+    val path = FileOutputFormat.getOutputPath(conf.value)
+    val fs: FileSystem = {
+      if (path != null)
+        path.getFileSystem(conf.value)
+      else
+        FileSystem.get(conf.value)
+    }
 
     getOutputCommitter().setupTask(getTaskContext())
     writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
@@ -161,17 +143,6 @@ class HadoopFileWriter (path: String,
   }
 
   private def setConfParams() {
-    if (!confProvided) {
-      conf.value.setOutputFormat(outputFormatClass)
-      conf.value.setOutputCommitter(outputCommitterClass)
-      conf.value.setOutputKeyClass(keyClass)
-      conf.value.setOutputValueClass(valueClass)
-    } else {
-
-    }
-
-    FileOutputFormat.setOutputPath(conf.value, HadoopFileWriter.createPathFromString(path, conf.value))
-
     conf.value.set("mapred.job.id", jID.value.toString);
     conf.value.set("mapred.tip.id", taID.value.getTaskID.toString);
     conf.value.set("mapred.task.id", taID.value.toString);
@@ -197,9 +168,4 @@ object HadoopFileWriter {
     outputPath = outputPath.makeQualified(fs)
     return outputPath
   }
-
-  def getInstance[K, V, F <: OutputFormat[K,V], C <: OutputCommitter](path: String)
-      (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F], cm: ClassManifest[C]): HadoopFileWriter = {
-    new HadoopFileWriter(path, km.erasure, vm.erasure, fm.erasure.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], cm.erasure.asInstanceOf[Class[OutputCommitter]])
-  }
 }
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index d179328ccf..f24b999e2a 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -12,17 +12,18 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.Map
 import scala.collection.mutable.HashMap
 
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.mapred.FileOutputFormat
 import org.apache.hadoop.mapred.HadoopFileWriter
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.OutputCommitter
 import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.OutputCommitter
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.TextOutputFormat
 
 import SparkContext._
 
@@ -189,34 +190,41 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) ex
       (k, (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]))
     }
   }
-
-  def saveAsHadoopFile (path: String, jobConf: JobConf) {
-    saveAsHadoopFile(path, jobConf.getOutputKeyClass, jobConf.getOutputValueClass, jobConf.getOutputFormat().getClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]], jobConf.getOutputCommitter().getClass.asInstanceOf[Class[OutputCommitter]], jobConf)
-  }
 
-  def saveAsHadoopFile [F <: OutputFormat[K,V], C <: OutputCommitter] (path: String) (implicit fm: ClassManifest[F], cm: ClassManifest[C]) {
-    saveAsHadoopFile(path, fm.erasure.asInstanceOf[Class[F]], cm.erasure.asInstanceOf[Class[C]])
-  }
-
-  def saveAsHadoopFile(path: String, outputFormatClass: Class[_ <: OutputFormat[K,V]], outputCommitterClass: Class[_ <: OutputCommitter]) {
-    saveAsHadoopFile(path, implicitly[ClassManifest[K]].erasure, implicitly[ClassManifest[V]].erasure, outputFormatClass, outputCommitterClass)
+  def saveAsHadoopFile [F <: OutputFormat[K, V]] (path: String) (implicit fm: ClassManifest[F]) {
+    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
   }
 
-  def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter]) {
-    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, outputCommitterClass, null)
+  def saveAsHadoopFile(path: String,
+                       keyClass: Class[_],
+                       valueClass: Class[_],
+                       outputFormatClass: Class[_ <: OutputFormat[_, _]],
+                       conf: JobConf = new JobConf) {
+    conf.setOutputKeyClass(keyClass)
+    conf.setOutputValueClass(valueClass)
+    conf.setOutputFormat(outputFormatClass)
+    conf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(conf, HadoopFileWriter.createPathFromString(path, conf))
+    saveAsHadoopDataset(conf)
   }
 
-  private def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_,_]], outputCommitterClass: Class[_ <: OutputCommitter], jobConf: JobConf) {
-    logInfo ("Saving as hadoop file of type (" + keyClass.getSimpleName+ "," +valueClass.getSimpleName+ ")" )
-    val writer = new HadoopFileWriter(path,
-                                      keyClass,
-                                      valueClass,
-                                      outputFormatClass.asInstanceOf[Class[OutputFormat[AnyRef,AnyRef]]],
-                                      outputCommitterClass.asInstanceOf[Class[OutputCommitter]],
-                                      null)
+  def saveAsHadoopDataset(conf: JobConf) {
+    val outputFormatClass = conf.getOutputFormat
+    val keyClass = conf.getOutputKeyClass
+    val valueClass = conf.getOutputValueClass
+    if (outputFormatClass == null)
+      throw new SparkException("Output format class not set")
+    if (keyClass == null)
+      throw new SparkException("Output key class not set")
+    if (valueClass == null)
+      throw new SparkException("Output value class not set")
+
+    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+
+    val writer = new HadoopFileWriter(conf)
     writer.preSetup()
 
-    def writeToFile (context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
+    def writeToFile(context: TaskContext, iter: Iterator[(K,V)]): HadoopFileWriter = {
       writer.setup(context.stageId, context.splitId, context.attemptId)
       writer.open()
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 624e56582d..2cddb5db9e 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -190,11 +190,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
   }
 
   def saveAsTextFile(path: String) {
-    this.map( x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text], FileOutputCommitter](path)
+    this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
   }
 
   def saveAsObjectFile(path: String) {
-    this.glom.map( x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))) ).saveAsSequenceFile(path)
+    this.glom.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)
   }
 }
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 8eb19c5436..7f591137c6 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -34,7 +34,6 @@ import SparkContext._
  */
 @serializable
 class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
-
   def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
     val c = {
       if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))
@@ -55,14 +54,15 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
     val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
 
     logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+    val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
    if (!convertKey && !convertValue) {
-      self.saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+      self.saveAsHadoopFile(path, keyClass, valueClass, format)
    } else if (!convertKey && convertValue) {
-      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
    } else if (convertKey && !convertValue) {
-      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
    } else if (convertKey && convertValue) {
-      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, classOf[SequenceFileOutputFormat[Writable,Writable]], classOf[FileOutputCommitter])
+      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
    }
   }
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
new file mode 100644
index 0000000000..17111f7753
--- /dev/null
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -0,0 +1,100 @@
+package spark
+
+import java.io.File
+
+import scala.io.Source
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
+import org.apache.hadoop.io._
+
+import SparkContext._
+
+class FileSuite extends FunSuite {
+  test("text files") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 4)
+    nums.saveAsTextFile(outputDir)
+    // Read the plain text file and check it's OK
+    val outputFile = new File(outputDir, "part-00000")
+    val content = Source.fromFile(outputFile).mkString
+    assert(content === "1\n2\n3\n4\n")
+    // Also try reading it in as a text file RDD
+    assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+    sc.stop()
+  }
+
+  test("SequenceFiles") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+    nums.saveAsSequenceFile(outputDir)
+    // Try reading the output back as a SequenceFile
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    sc.stop()
+  }
+
+  test("SequenceFile with writable key") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
+    nums.saveAsSequenceFile(outputDir)
+    // Try reading the output back as a SequenceFile
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    sc.stop()
+  }
+
+  test("SequenceFile with writable value") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
+    nums.saveAsSequenceFile(outputDir)
+    // Try reading the output back as a SequenceFile
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    sc.stop()
+  }
+
+  test("SequenceFile with writable key and value") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
+    nums.saveAsSequenceFile(outputDir)
+    // Try reading the output back as a SequenceFile
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    sc.stop()
+  }
+
+  test("object files of ints") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 4)
+    nums.saveAsObjectFile(outputDir)
+    // Try reading the output back as an object file
+    val output = sc.objectFile[Int](outputDir)
+    assert(output.collect().toList === List(1, 2, 3, 4))
+    sc.stop()
+  }
+
+  test("object files of complex types") {
+    val sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
+    nums.saveAsObjectFile(outputDir)
+    // Try reading the output back as an object file
+    val output = sc.objectFile[(Int, String)](outputDir)
+    assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+    sc.stop()
+  }
+}
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index d31fdb7f8a..06d438d9e2 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -1,17 +1,12 @@
 package spark
 
 import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
 import SparkContext._
-import scala.collection.mutable.ArrayBuffer
 
 class RDDSuite extends FunSuite {
   test("basic operations") {
     val sc = new SparkContext("local", "test")
-    val nums = sc.parallelize(Array(1, 2, 3, 4), 2)
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
    assert(nums.reduce(_ + _) === 10)
    assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4"))
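
To make the round-trip behavior covered by the new FileSuite concrete outside of ScalaTest, here is a small driver sketch using the same calls. The object name, temporary-directory handling, and the commented output are illustrative assumptions; the API calls mirror the tests above.

```scala
import java.io.File

import com.google.common.io.Files
import org.apache.hadoop.io.{IntWritable, Text}

import spark.SparkContext
import spark.SparkContext._

object RoundTripExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "round-trip-example")
    val tempDir = Files.createTempDir()

    // SequenceFile round trip: plain Int/String pairs are converted to Writables on save,
    // exactly as exercised by the "SequenceFiles" test above.
    val seqDir = new File(tempDir, "seq").getAbsolutePath
    sc.makeRDD(1 to 3).map(x => (x, "a" * x)).saveAsSequenceFile(seqDir)
    val seqBack = sc.sequenceFile[IntWritable, Text](seqDir)
    println(seqBack.map(_.toString).collect().toList)  // List((1,a), (2,aa), (3,aaa))

    // Object file round trip: arbitrary serializable values, written under the hood as a
    // SequenceFile of serialized partitions (see saveAsObjectFile in RDD.scala).
    val objDir = new File(tempDir, "obj").getAbsolutePath
    sc.makeRDD(1 to 4).saveAsObjectFile(objDir)
    println(sc.objectFile[Int](objDir).collect().toList)  // List(1, 2, 3, 4)

    sc.stop()
  }
}
```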