Diffstat (limited to 'core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 15 |
1 file changed, 12 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2662d48c84..5428fc5691 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
 
 
 private[spark]
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
   override def hashCode(): Int = (41 * (41 + rddId) + index)
 }
 
-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient conf: Configuration,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -105,7 +109,12 @@ class NewHadoopRDD[K, V](
         throw new java.util.NoSuchElementException("End of stream")
       }
      havePair = false
-      (reader.getCurrentKey, reader.getCurrentValue)
+      val key = reader.getCurrentKey
+      val value = reader.getCurrentValue
+      if (cloneKeyValues) {
+        (cloneWritables(key, conf), cloneWritables(value, conf))
+      } else
+        (key, value)
     }
 
     private def close() {
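Background on the change: Hadoop record readers typically reuse the same Writable instances for every record, so the pairs returned by this iterator can be silently overwritten by later records if the caller holds on to them (for example when the RDD is cached). The new cloneKeyValues flag lets the RDD return deep copies instead. The diff only imports org.apache.spark.util.Utils.cloneWritables and does not show its body; the sketch below is a hypothetical illustration of such a helper (the object name CloneSketch and method cloneWritable are made up for the example), built on Hadoop's WritableUtils.clone, which copies a Writable by round-tripping it through serialization.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{Writable, WritableUtils}

object CloneSketch {
  // Deep-copies `obj` when it is a Writable, so the record reader's object
  // reuse cannot overwrite pairs already handed to the caller.
  // Non-Writable values are returned unchanged.
  // This is an assumed sketch, not the actual Utils.cloneWritables body.
  def cloneWritable[T](obj: T, conf: Configuration): T = obj match {
    case w: Writable => WritableUtils.clone(w, conf).asInstanceOf[T]
    case other       => other
  }
}

With a helper like this, the iterator's next() can return (cloneWritable(key, conf), cloneWritable(value, conf)) when cloneKeyValues is true, which mirrors the branch added in the last hunk of the diff.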