Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  15 ++++++++++---
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2662d48c84..5428fc5691 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
 
 private[spark]
 class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
   override def hashCode(): Int = (41 * (41 + rddId) + index)
 }
 
-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient conf: Configuration,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -105,7 +109,12 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        (reader.getCurrentKey, reader.getCurrentValue)
+        val key = reader.getCurrentKey
+        val value = reader.getCurrentValue
+        if (cloneKeyValues) {
+          (cloneWritables(key, conf), cloneWritables(value, conf))
+        } else
+          (key, value)
       }
 
       private def close() {
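
Background on the change: Hadoop RecordReaders generally reuse one mutable Writable instance for the key and another for the value across next() calls. Any job that holds on to records after the iterator advances, for example by caching the RDD or collecting records into a buffer, would otherwise see every element alias the most recently read record. The new cloneKeyValues flag makes a deep copy of each key and value before returning them, while keeping the zero-copy fast path for jobs that consume each record immediately; the new ClassTag context bounds presumably exist so cloneWritables can obtain the runtime class information it needs to construct copies.

The sketch below reproduces the aliasing hazard and the fix outside Spark. cloneIfWritable is a hypothetical stand-in built on Hadoop's stock WritableUtils.clone, not the actual Utils.cloneWritables imported by this patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Text, Writable, WritableUtils}

object CloneWritablesSketch {
  // Hypothetical stand-in for Utils.cloneWritables: deep-copy Hadoop
  // Writables via WritableUtils.clone, pass any other type through unchanged.
  def cloneIfWritable[T](value: T, conf: Configuration): T = value match {
    case w: Writable => WritableUtils.clone(w, conf).asInstanceOf[T]
    case other       => other
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val reused = new Text()   // stands in for a RecordReader's reused value object

    // Without cloning: both buffered elements alias the single reused object,
    // so the first record is silently overwritten by the second.
    val aliased = Seq("a", "b").map { s => reused.set(s); reused }
    println(aliased.map(_.toString))   // List(b, b)

    // With cloning: each buffered element is an independent copy.
    val cloned = Seq("a", "b").map { s => reused.set(s); cloneIfWritable(reused, conf) }
    println(cloned.map(_.toString))    // List(a, b)
  }
}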