author    Reynold Xin <rxin@apache.org>  2014-01-11 12:35:10 -0800
committer Reynold Xin <rxin@apache.org>  2014-01-11 12:35:10 -0800
commit    b0fbfccadc2bb308c3cbcd5a55157e63cc8916f6 (patch)
tree      8ced57d9513a02d1c7b5ba4f31eeecb8ebe3abb2
parent    ee6e7f9b8cc56985787546882fba291cf9ad7667 (diff)
Minor update for clone writables and more documentation.
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala     | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala    | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 20
3 files changed, 41 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d7e681d921..16564615a9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -345,9 +345,20 @@ class SparkContext(
}
/**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
- * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
- * etc).
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+ * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+ * using the older MapReduce API (`org.apache.hadoop.mapred`).
+ *
+ * @param conf JobConf for setting up the dataset
+ * @param inputFormatClass Class of the [[InputFormat]]
+ * @param keyClass Class of the keys
+ * @param valueClass Class of the values
+ * @param minSplits Minimum number of Hadoop Splits to generate.
+ * @param cloneKeyValues If true, explicitly clone the records produced by the Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, which can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
def hadoopRDD[K: ClassTag, V: ClassTag](
conf: JobConf,
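
The cloneKeyValues flag is only described in the doc comment above, so the call below is a minimal usage sketch assuming the parameter names and the default of true documented there; the TextInputFormat, key/value classes, input path, and minSplits value are illustrative and not taken from this patch.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext

val sc = new SparkContext("local", "clone-writables-sketch")

val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "/tmp/input")  // hypothetical input path

// Leaving cloneKeyValues at the documented default (true) makes it safe to
// collect or cache these records even though TextInputFormat's RecordReader
// reuses its LongWritable/Text instances across rows.
val records = sc.hadoopRDD(
  jobConf,
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  minSplits = 2,
  cloneKeyValues = true)

records.map { case (offset, line) => s"$offset: $line" }.collect().foreach(println)
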
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2da4611b9c..9a220d1147 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -45,14 +45,14 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
val inputSplit = new SerializableWritable[InputSplit](s)
- override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+ override def hashCode(): Int = 41 * (41 + rddId) + idx
override val index: Int = idx
}
/**
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3).
+ * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param sc The SparkContext to associate the RDD with.
* @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
@@ -64,6 +64,11 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param cloneKeyValues If true, explicitly clone the records produced by the Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, which can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
*/
class HadoopRDD[K: ClassTag, V: ClassTag](
sc: SparkContext,
@@ -165,9 +170,9 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
- val keyCloneFunc = cloneWritables[K](getConf)
+ val keyCloneFunc = cloneWritables[K](jobConf)
val value: V = reader.createValue()
- val valueCloneFunc = cloneWritables[V](getConf)
+ val valueCloneFunc = cloneWritables[V](jobConf)
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -176,8 +181,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
finished = true
}
if (cloneKeyValues) {
- (keyCloneFunc(key.asInstanceOf[Writable]),
- valueCloneFunc(value.asInstanceOf[Writable]))
+ (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
}
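
The cloneWritables helper referenced in the hunk above is not shown in this diff, so the following is only a rough sketch of what such a factory could look like, built on Hadoop's WritableUtils.clone, which round-trips a Writable through serialization using the supplied configuration to produce an independent copy. The name cloneWritableSketch is hypothetical.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Writable, WritableUtils}

// Given a Hadoop configuration, return a function that deep-copies a record,
// so the RecordReader can keep reusing its own key/value instances without
// corrupting rows that were already handed to Spark.
def cloneWritableSketch[T <: Writable](conf: Configuration): T => T =
  (record: T) => WritableUtils.clone(record, conf)
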
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a34786495b..2f2d01115c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -36,9 +36,24 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
val serializableHadoopSplit = new SerializableWritable(rawSplit)
- override def hashCode(): Int = (41 * (41 + rddId) + index)
+ override def hashCode(): Int = 41 * (41 + rddId) + index
}
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param conf The Hadoop configuration.
+ * @param cloneKeyValues If true, explicitly clone the records produced by the Hadoop RecordReader.
+ * Most RecordReader implementations reuse wrapper objects across multiple
+ * records, which can cause problems in RDD collect or aggregation operations.
+ * By default the records are cloned in Spark. However, application
+ * programmers can explicitly disable the cloning for better performance.
+ */
class NewHadoopRDD[K: ClassTag, V: ClassTag](
sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -113,8 +128,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
val key = reader.getCurrentKey
val value = reader.getCurrentValue
if (cloneKeyValues) {
- (keyCloneFunc(key.asInstanceOf[Writable]),
- valueCloneFunc(value.asInstanceOf[Writable]))
+ (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
}
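
If an application does disable the cloning for performance, the usual alternative is to copy the data out of the reused Writable wrappers before buffering anything. The sketch below is not part of this patch and uses a hypothetical helper name; it applies equally to HadoopRDD and NewHadoopRDD.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.rdd.RDD

// copyOut (hypothetical) materializes plain JVM values before collecting,
// because with cloning disabled the LongWritable/Text instances may all point
// at objects the RecordReader is still reusing.
def copyOut(lines: RDD[(LongWritable, Text)]): Array[(Long, String)] =
  lines.map { case (offset, line) => (offset.get, line.toString) }.collect()
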