author     CodingCat <zhunansjtu@gmail.com>            2014-03-18 11:06:18 -0700
committer  Matei Zaharia <matei@databricks.com>        2014-03-18 11:06:18 -0700
commit     2fa26ec02fc2251102f89bb67523419fd7dd3757 (patch)
tree       8963dc4db1a27e23279236abf0d971defa2e94d0 /core
parent     e7423d4040ebd1ec4105d8d4b9a4a6600b18c2ac (diff)
download   spark-2fa26ec02fc2251102f89bb67523419fd7dd3757.tar.gz
           spark-2fa26ec02fc2251102f89bb67523419fd7dd3757.tar.bz2
           spark-2fa26ec02fc2251102f89bb67523419fd7dd3757.zip
SPARK-1102: Create a saveAsNewAPIHadoopDataset method
https://spark-project.atlassian.net/browse/SPARK-1102 Create a saveAsNewAPIHadoopDataset method

By @mateiz: "Right now RDDs can only be saved as files using the new Hadoop API, not as "datasets" with no filename and just a JobConf. See http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ for an example of how you have to give a bogus filename. For the old Hadoop API, we have saveAsHadoopDataset."

Author: CodingCat <zhunansjtu@gmail.com>

Closes #12 from CodingCat/SPARK-1102 and squashes the following commits:

6ba0c83 [CodingCat] add test cases for saveAsHadoopDataSet (new&old API)
a8d11ba [CodingCat] style fix.........
95a6929 [CodingCat] code clean
7643c88 [CodingCat] change the parameter type back to Configuration
a8583ee [CodingCat] Create a saveAsNewAPIHadoopDataset method
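For orientation, a minimal Scala sketch (not part of this commit) of calling the new method once the patch is applied. It mirrors the "save Hadoop Dataset through new Hadoop API" test added below; the SparkContext setup and the output directory are illustrative assumptions.

// Usage sketch: save a pair RDD as a "dataset" with the new Hadoop API.
// The output path here is a hypothetical example; a real storage system
// (e.g. the MongoDB connector mentioned above) would set its own keys instead.
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // implicit conversion to PairRDDFunctions

val sc = new SparkContext("local", "example")
val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")))

// Everything is configured on the Hadoop Configuration; no filename is passed to Spark.
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
job.getConfiguration.set("mapred.output.dir", "/tmp/newapi-dataset-demo")

pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)

The point of the change is that only the Configuration is needed; there is no bogus filename argument as with saveAsNewAPIHadoopFile.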
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala     10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala    104
-rw-r--r--  core/src/test/scala/org/apache/spark/FileSuite.scala                 39
3 files changed, 100 insertions, 53 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 0ff428c120..9596dbaf75 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -26,7 +26,7 @@ import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job}
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.Partitioner._
@@ -558,6 +558,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
}
+ /**
+ * Output the RDD to any Hadoop-supported storage system, using
+ * a Configuration object for that storage system.
+ */
+ def saveAsNewAPIHadoopDataset(conf: Configuration) {
+ rdd.saveAsNewAPIHadoopDataset(conf)
+ }
+
/** Output the RDD to any Hadoop-supported file system. */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
path: String,
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index b0d322fe27..447deafff5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -30,11 +30,11 @@ import scala.reflect.ClassTag
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, JobContext, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob, RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
@@ -603,50 +603,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
val job = new NewAPIHadoopJob(conf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
-
- val wrappedConf = new SerializableWritable(job.getConfiguration)
- val outpath = new Path(path)
- NewFileOutputFormat.setOutputPath(job, outpath)
- val jobFormat = outputFormatClass.newInstance
- jobFormat.checkOutputSpecs(job)
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- val jobtrackerID = formatter.format(new Date())
- val stageId = self.id
- def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
- // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
- // around by taking a mod. We expect that no task will be attempted 2 billion times.
- val attemptNumber = (context.attemptId % Int.MaxValue).toInt
- /* "reduce task" <split #> <attempt # = spark task #> */
- val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
- attemptNumber)
- val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
- val format = outputFormatClass.newInstance
- format match {
- case c: Configurable => c.setConf(wrappedConf.value)
- case _ => ()
- }
- val committer = format.getOutputCommitter(hadoopContext)
- committer.setupTask(hadoopContext)
- val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
- while (iter.hasNext) {
- val (k, v) = iter.next()
- writer.write(k, v)
- }
- writer.close(hadoopContext)
- committer.commitTask(hadoopContext)
- return 1
- }
-
- /* apparently we need a TaskAttemptID to construct an OutputCommitter;
- * however we're only going to use this local OutputCommitter for
- * setupJob/commitJob, so we just use a dummy "map" task.
- */
- val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
- val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
- val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
- jobCommitter.setupJob(jobTaskContext)
- self.context.runJob(self, writeShard _)
- jobCommitter.commitJob(jobTaskContext)
+ job.setOutputFormatClass(outputFormatClass)
+ job.getConfiguration.set("mapred.output.dir", path)
+ saveAsNewAPIHadoopDataset(job.getConfiguration)
}
/**
@@ -693,6 +652,59 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
/**
+ * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
+ * Configuration object for that storage system. The Conf should set an OutputFormat and any
+ * output paths required (e.g. a table name to write to) in the same way as it would be
+ * configured for a Hadoop MapReduce job.
+ */
+ def saveAsNewAPIHadoopDataset(conf: Configuration) {
+ val job = new NewAPIHadoopJob(conf)
+ val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+ val jobtrackerID = formatter.format(new Date())
+ val stageId = self.id
+ val wrappedConf = new SerializableWritable(job.getConfiguration)
+ val outfmt = job.getOutputFormatClass
+ val jobFormat = outfmt.newInstance
+
+ if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+ // FileOutputFormat ignores the filesystem parameter
+ jobFormat.checkOutputSpecs(job)
+ }
+
+ def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+ // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+ // around by taking a mod. We expect that no task will be attempted 2 billion times.
+ val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+ /* "reduce task" <split #> <attempt # = spark task #> */
+ val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
+ attemptNumber)
+ val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+ val format = outfmt.newInstance
+ format match {
+ case c: Configurable => c.setConf(wrappedConf.value)
+ case _ => ()
+ }
+ val committer = format.getOutputCommitter(hadoopContext)
+ committer.setupTask(hadoopContext)
+ val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+ while (iter.hasNext) {
+ val (k, v) = iter.next()
+ writer.write(k, v)
+ }
+ writer.close(hadoopContext)
+ committer.commitTask(hadoopContext)
+ return 1
+ }
+
+ val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
+ val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+ val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+ jobCommitter.setupJob(jobTaskContext)
+ self.context.runJob(self, writeShard _)
+ jobCommitter.commitJob(jobTaskContext)
+ }
+
+ /**
* Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
* that storage system. The JobConf should set an OutputFormat and any output paths required
* (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
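For comparison, a sketch (not part of this commit) of the pre-existing old-API counterpart described in the docstring above: the same configuration goes on a JobConf. It mirrors the "save Hadoop Dataset through old Hadoop API" test added below; the output directory is an illustrative assumption.

import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val sc = new SparkContext("local", "example")
val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")))

// Old Hadoop API: output format, key/value classes and output path all live on the JobConf.
val jobConf = new JobConf()
jobConf.setOutputKeyClass(classOf[String])
jobConf.setOutputValueClass(classOf[String])
jobConf.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
jobConf.set("mapred.output.dir", "/tmp/oldapi-dataset-demo")

pairs.saveAsHadoopDataset(jobConf)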
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 76173608e9..01af940771 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -24,11 +24,12 @@ import scala.io.Source
import com.google.common.io.Files
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
-import org.apache.hadoop.mapred.FileAlreadyExistsException
+import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, TextOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+import org.apache.hadoop.mapreduce.Job
import org.scalatest.FunSuite
import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
class FileSuite extends FunSuite with LocalSparkContext {
@@ -236,7 +237,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
- randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}
@@ -244,10 +245,36 @@ class FileSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val tempdir = Files.createTempDir()
val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
- randomRDD.saveAsTextFile(tempdir.getPath + "/output")
- assert(new File(tempdir.getPath + "/output/part-00000").exists() === true)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath + "/output")
+ assert(new File(tempdir.getPath + "/output/part-r-00000").exists() === true)
intercept[FileAlreadyExistsException] {
- randomRDD.saveAsNewAPIHadoopFile[TextOutputFormat[String, String]](tempdir.getPath)
+ randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempdir.getPath)
}
}
+
+ test ("save Hadoop Dataset through old Hadoop API") {
+ sc = new SparkContext("local", "test")
+ val tempdir = Files.createTempDir()
+ val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ val job = new JobConf()
+ job.setOutputKeyClass(classOf[String])
+ job.setOutputValueClass(classOf[String])
+ job.set("mapred.output.format.class", classOf[TextOutputFormat[String, String]].getName)
+ job.set("mapred.output.dir", tempdir.getPath + "/outputDataset_old")
+ randomRDD.saveAsHadoopDataset(job)
+ assert(new File(tempdir.getPath + "/outputDataset_old/part-00000").exists() === true)
+ }
+
+ test ("save Hadoop Dataset through new Hadoop API") {
+ sc = new SparkContext("local", "test")
+ val tempdir = Files.createTempDir()
+ val randomRDD = sc.parallelize(Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+ val job = new Job(sc.hadoopConfiguration)
+ job.setOutputKeyClass(classOf[String])
+ job.setOutputValueClass(classOf[String])
+ job.setOutputFormatClass(classOf[NewTextOutputFormat[String, String]])
+ job.getConfiguration.set("mapred.output.dir", tempdir.getPath + "/outputDataset_new")
+ randomRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
+ assert(new File(tempdir.getPath + "/outputDataset_new/part-r-00000").exists() === true)
+ }
}