author     Matei Zaharia <matei@eecs.berkeley.edu>   2011-07-06 20:37:34 -0400
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2011-07-06 20:37:55 -0400
commit     4b1646a25f7581cecae108553da13833e842e68a (patch)
tree       aa28a73289db776e6e19d722ed04fee999414648 /core
parent     b0ecf1ee41f2280a885a8005363e0d0b197e4d3b (diff)
Support for non-filesystem-based Hadoop data sources
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/HadoopRDD.scala (renamed from core/src/main/scala/spark/HadoopFile.scala) | 28
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                                                        | 28
2 files changed, 32 insertions(+), 24 deletions(-)
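In outline, the commit renames HadoopFile to HadoopRDD, has it accept a caller-supplied JobConf instead of a bare path, and adds a SparkContext.hadoopRDD method so that any InputFormat-backed store can be read, not just filesystem paths. A minimal sketch of the new entry point, using only the classes touched by this diff; the local master, application name, and input path are illustrative, not part of the commit:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import spark.SparkContext

    object NonFsExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "hadoopRDD example")

        // Build the JobConf by hand and pass it to the new hadoopRDD entry point;
        // for a plain file this mirrors what hadoopFile now does internally.
        val conf = new JobConf()
        FileInputFormat.setInputPaths(conf, "hdfs://namenode:9000/data/events.log") // illustrative path
        conf.set("io.file.buffer.size", System.getProperty("spark.buffer.size", "65536"))

        val lines = sc.hadoopRDD(conf, classOf[TextInputFormat],
                                 classOf[LongWritable], classOf[Text])
                      .map(_._2.toString)
        println(lines.count())
      }
    }

For a non-filesystem source, the same call would be made with that source's InputFormat and key/value classes, and whatever configuration keys its JobConf expects.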
diff --git a/core/src/main/scala/spark/HadoopFile.scala b/core/src/main/scala/spark/HadoopRDD.scala
index beb53ce1a5..b71c3aa522 100644
--- a/core/src/main/scala/spark/HadoopFile.scala
+++ b/core/src/main/scala/spark/HadoopRDD.scala
@@ -26,19 +26,19 @@ extends Split {
/**
- * An RDD that reads a Hadoop file (from HDFS, S3, the local filesystem, etc)
- * and represents it as a set of key-value pairs using a given InputFormat.
+ * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in
+ * HDFS, the local file system, or S3, tables in HBase, etc).
*/
-class HadoopFile[K, V](
+class HadoopRDD[K, V](
sc: SparkContext,
- path: String,
+ @transient conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V])
extends RDD[(K, V)](sc) {
+ val serializableConf = new SerializableWritable(conf)
+
@transient val splits_ : Array[Split] = {
- val conf = new JobConf()
- FileInputFormat.setInputPaths(conf, path)
val inputFormat = createInputFormat(conf)
val inputSplits = inputFormat.getSplits(conf, sc.numCores)
val array = new Array[Split] (inputSplits.size)
@@ -67,9 +67,7 @@ extends RDD[(K, V)](sc) {
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
- val conf = new JobConf()
- val bufferSize = System.getProperty("spark.buffer.size", "65536")
- conf.set("io.file.buffer.size", bufferSize)
+ val conf = serializableConf.value
val fmt = createInputFormat(conf)
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
@@ -114,15 +112,3 @@ extends RDD[(K, V)](sc) {
override val dependencies: List[Dependency[_]] = Nil
}
-
-
-/**
- * Convenience class for Hadoop files read using TextInputFormat that
- * represents the file as an RDD of Strings.
- */
-class HadoopTextFile(sc: SparkContext, path: String)
-extends MappedRDD[String, (LongWritable, Text)](
- new HadoopFile(sc, path, classOf[TextInputFormat],
- classOf[LongWritable], classOf[Text]),
- { pair: (LongWritable, Text) => pair._2.toString }
-)
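Two details of the new class are worth noting. First, the JobConf parameter is marked @transient and re-wrapped in a SerializableWritable, so the RDD can be shipped to workers even though JobConf is a Hadoop Writable rather than java.io.Serializable; each task recovers the configuration through serializableConf.value. A rough sketch of that wrapper pattern, written here for illustration only and not copied from the actual spark.SerializableWritable class:

    import java.io.{ObjectInputStream, ObjectOutputStream, Serializable}
    import org.apache.hadoop.io.Writable

    // Illustrative wrapper: persist a Writable through Java serialization by
    // delegating to its own write()/readFields() methods.
    class SerializableWritableSketch[T <: Writable](@transient var t: T) extends Serializable {
      def value: T = t
      private def writeObject(out: ObjectOutputStream) {
        out.defaultWriteObject()
        out.writeObject(t.getClass)
        t.write(out)
      }
      private def readObject(in: ObjectInputStream) {
        in.defaultReadObject()
        val cls = in.readObject().asInstanceOf[Class[T]]
        t = cls.newInstance()
        t.readFields(in)
      }
    }

Second, the HadoopTextFile convenience class is removed; its job moves into SparkContext.textFile in the next file, which builds the (LongWritable, Text) HadoopRDD and maps each pair to pair._2.toString.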
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 5f12b247a7..d5479a85d5 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -17,6 +17,9 @@ import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.ArrayWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.TextInputFormat
import spark.broadcast._
@@ -72,8 +75,23 @@ extends Logging {
def makeRDD[T: ClassManifest](seq: Seq[T]): RDD[T] =
parallelize(seq, numCores)
- def textFile(path: String): RDD[String] =
- new HadoopTextFile(this, path)
+ def textFile(path: String): RDD[String] = {
+ hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
+ .map(pair => pair._2.toString)
+ }
+
+ /**
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving
+ * its InputFormat and any other necessary info (e.g. file name for a
+ * filesystem-based dataset, table name for HyperTable, etc).
+ */
+ def hadoopRDD[K, V](conf: JobConf,
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V])
+ : RDD[(K, V)] = {
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass)
+ }
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K, V](path: String,
@@ -81,7 +99,11 @@ extends Logging {
keyClass: Class[K],
valueClass: Class[V])
: RDD[(K, V)] = {
- new HadoopFile(this, path, inputFormatClass, keyClass, valueClass)
+ val conf = new JobConf()
+ FileInputFormat.setInputPaths(conf, path)
+ val bufferSize = System.getProperty("spark.buffer.size", "65536")
+ conf.set("io.file.buffer.size", bufferSize)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass)
}
/**