diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-10 11:11:35 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-07-11 17:31:33 -0700 |
commit | e8ae77df2458a80e549c4e227b1bc509dc4ea2ce (patch) | |
tree | 990f8511e90137d95e930f508fdc7db9d29cc686 /core/src | |
parent | 0a472840030e4e7e84fe748f7bfa49f1ece599c5 (diff) | |
download | spark-e8ae77df2458a80e549c4e227b1bc509dc4ea2ce.tar.gz spark-e8ae77df2458a80e549c4e227b1bc509dc4ea2ce.tar.bz2 spark-e8ae77df2458a80e549c4e227b1bc509dc4ea2ce.zip |
Added more methods for loading/saving with new Hadoop API
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/PairRDDFunctions.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 20 |
2 files changed, 30 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index ea24c7897d..843badd9d1 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -13,6 +13,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.NullWritable @@ -297,8 +298,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( path: String, keyClass: Class[_], valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { - val job = new NewAPIHadoopJob + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration) { + val job = new NewAPIHadoopJob(conf) job.setOutputKeyClass(keyClass) job.setOutputValueClass(valueClass) val wrappedConf = new SerializableWritable(job.getConfiguration) @@ -339,6 +341,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( jobCommitter.cleanupJob(jobTaskContext) } + def saveAsNewAPIHadoopFile( + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { + saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, new Configuration) + } + def saveAsHadoopFile( path: String, keyClass: Class[_], diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 4a6abf20b0..3d3fda1e47 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -202,8 +202,24 @@ class SparkContext( fClass: Class[F], kClass: Class[K], vClass: Class[V], - conf: Configuration - ): RDD[(K, V)] = new NewHadoopRDD(this, fClass, kClass, vClass, conf) + conf: Configuration): RDD[(K, V)] = { + val job = new NewHadoopJob(conf) + NewFileInputFormat.addInputPath(job, new Path(path)) + val updatedConf = job.getConfiguration + new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf) + } + + /** + * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat + * and extra configuration options to pass to the input format. + */ + def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( + conf: Configuration, + fClass: Class[F], + kClass: Class[K], + vClass: Class[V]): RDD[(K, V)] = { + new NewHadoopRDD(this, fClass, kClass, vClass, conf) + } /** Get an RDD for a Hadoop SequenceFile with given key and value types */ def sequenceFile[K, V](path: String, |