author     Matei Zaharia <matei@eecs.berkeley.edu>   2012-07-10 11:11:35 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2012-07-11 17:31:33 -0700
commit     e8ae77df2458a80e549c4e227b1bc509dc4ea2ce (patch)
tree       990f8511e90137d95e930f508fdc7db9d29cc686 /core/src
parent     0a472840030e4e7e84fe748f7bfa49f1ece599c5 (diff)
Added more methods for loading/saving with new Hadoop API
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala | 14
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala     | 20
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index ea24c7897d..843badd9d1 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -13,6 +13,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
@@ -297,8 +298,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
path: String,
keyClass: Class[_],
valueClass: Class[_],
- outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
- val job = new NewAPIHadoopJob
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ conf: Configuration) {
+ val job = new NewAPIHadoopJob(conf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
val wrappedConf = new SerializableWritable(job.getConfiguration)
@@ -339,6 +341,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
jobCommitter.cleanupJob(jobTaskContext)
}
+ def saveAsNewAPIHadoopFile(
+ path: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+ saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, new Configuration)
+ }
+
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
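
For context (not part of the patch): a minimal save-side sketch, assuming an existing SparkContext sc, the implicit RDD-to-PairRDDFunctions conversion from import spark.SparkContext._, and Hadoop's new-API SequenceFileOutputFormat; the output paths and the compression key are illustrative only.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
    import spark.SparkContext._

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
                  .map { case (k, v) => (new Text(k), new IntWritable(v)) }

    // New five-argument overload: extra Hadoop settings travel with the job.
    val outConf = new Configuration()
    outConf.setBoolean("mapred.output.compress", true)  // illustrative setting
    pairs.saveAsNewAPIHadoopFile(
      "hdfs:///tmp/pairs-out",
      classOf[Text],
      classOf[IntWritable],
      classOf[SequenceFileOutputFormat[Text, IntWritable]],
      outConf)

    // The existing four-argument form still works; it now delegates with new Configuration().
    pairs.saveAsNewAPIHadoopFile(
      "hdfs:///tmp/pairs-out-default",
      classOf[Text],
      classOf[IntWritable],
      classOf[SequenceFileOutputFormat[Text, IntWritable]])
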
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 4a6abf20b0..3d3fda1e47 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -202,8 +202,24 @@ class SparkContext(
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
- conf: Configuration
- ): RDD[(K, V)] = new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+ conf: Configuration): RDD[(K, V)] = {
+ val job = new NewHadoopJob(conf)
+ NewFileInputFormat.addInputPath(job, new Path(path))
+ val updatedConf = job.getConfiguration
+ new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+ }
+
+ /**
+ * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+ * and extra configuration options to pass to the input format.
+ */
+ def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+ conf: Configuration,
+ fClass: Class[F],
+ kClass: Class[K],
+ vClass: Class[V]): RDD[(K, V)] = {
+ new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+ }
/** Get an RDD for a Hadoop SequenceFile with given key and value types */
def sequenceFile[K, V](path: String,
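
For context (not part of the patch): a matching load-side sketch. newAPIHadoopFile now folds the given path into a NewHadoopJob's configuration before building the RDD, while the new newAPIHadoopRDD takes a fully prepared Configuration and hands it straight to NewHadoopRDD. sc, the input path, and the configuration key below are assumed/illustrative.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Path-based form: the path argument is added to the job configuration internally.
    val lines = sc.newAPIHadoopFile(
      "hdfs:///tmp/input",
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      new Configuration())

    // Configuration-based form added by this commit: every input setting,
    // including the input directory, comes from the Configuration itself.
    val inConf = new Configuration()
    inConf.set("mapred.input.dir", "hdfs:///tmp/input")  // illustrative key
    val lines2 = sc.newAPIHadoopRDD(
      inConf,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])
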