diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2014-01-09 11:58:18 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2014-01-09 12:26:30 +0530 |
commit | 59b03e015d581bbab74f1fe33a3ec1fd7840c3db (patch) | |
tree | 5ad775b8d56756f15743ef53d81ad8eb85312c39 | |
parent | 277b4a36c580e88d9ba60c5efe63753350197874 (diff) | |
download | spark-59b03e015d581bbab74f1fe33a3ec1fd7840c3db.tar.gz spark-59b03e015d581bbab74f1fe33a3ec1fd7840c3db.tar.bz2 spark-59b03e015d581bbab74f1fe33a3ec1fd7840c3db.zip |
Fixes corresponding to Reynold's feedback comments
4 files changed, 43 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 97fec7f737..bceeaa0448 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -385,14 +385,14 @@ class SparkContext( * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits) * }}} */ - def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int, - cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F] - ): RDD[(K, V)] = { + def hadoopFile[K, V, F <: InputFormat[K, V]] + (path: String, minSplits: Int, cloneKeyValues: Boolean = true) + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { hadoopFile(path, - fm.runtimeClass.asInstanceOf[Class[F]], - km.runtimeClass.asInstanceOf[Class[K]], - vm.runtimeClass.asInstanceOf[Class[V]], - minSplits, + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]], + minSplits, cloneKeyValues = cloneKeyValues) } @@ -409,15 +409,15 @@ class SparkContext( hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues) /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. 
*/ - def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String, - cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F] - ): RDD[(K, V)] = { + def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] + (path: String, cloneKeyValues: Boolean = true) + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { newAPIHadoopFile( - path, - fm.runtimeClass.asInstanceOf[Class[F]], - km.runtimeClass.asInstanceOf[Class[K]], - vm.runtimeClass.asInstanceOf[Class[V]], - cloneKeyValues = cloneKeyValues) + path, + fm.runtimeClass.asInstanceOf[Class[F]], + km.runtimeClass.asInstanceOf[Class[K]], + vm.runtimeClass.asInstanceOf[Class[V]], + cloneKeyValues = cloneKeyValues) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 13949a1bdb..2da4611b9c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -22,6 +22,7 @@ import java.io.EOFException import scala.reflect.ClassTag import org.apache.hadoop.conf.{Configuration, Configurable} +import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.InputSplit import org.apache.hadoop.mapred.JobConf @@ -91,7 +92,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag]( inputFormatClass, keyClass, valueClass, - minSplits, cloneKeyValues) + minSplits, + cloneKeyValues) } protected val jobConfCacheKey = "rdd_%d_job_conf".format(id) @@ -162,10 +164,10 @@ class HadoopRDD[K: ClassTag, V: ClassTag]( // Register an on-task-completion callback to close the input stream. 
context.addOnCompleteCallback{ () => closeIfNeeded() } - val key: K = reader.createKey() + val keyCloneFunc = cloneWritables[K](getConf) val value: V = reader.createValue() - + val valueCloneFunc = cloneWritables[V](getConf) override def getNext() = { try { finished = !reader.next(key, value) @@ -174,7 +176,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag]( finished = true } if (cloneKeyValues) { - (cloneWritables(key, getConf), cloneWritables(value, getConf)) + (keyCloneFunc(key.asInstanceOf[Writable]), + valueCloneFunc(value.asInstanceOf[Writable])) } else { (key, value) } diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 5428fc5691..e1f9995a9a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -92,7 +92,8 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag]( // Register an on-task-completion callback to close the input stream. 
context.addOnCompleteCallback(() => close()) - + val keyCloneFunc = cloneWritables[K](conf) + val valueCloneFunc = cloneWritables[V](conf) var havePair = false var finished = false @@ -112,9 +113,11 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag]( val key = reader.getCurrentKey val value = reader.getCurrentValue if (cloneKeyValues) { - (cloneWritables(key, conf), cloneWritables(value, conf)) - } else - (key, value) + (keyCloneFunc(key.asInstanceOf[Writable]), + valueCloneFunc(value.asInstanceOf[Writable])) + } else { + (key, value) + } } private def close() { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 192806e178..23b72701c2 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -51,15 +51,20 @@ private[spark] object Utils extends Logging { * intention is to optimize, for example for NullWritable there is no need and for Long, int and * String creating a new object with value set would be faster. */ - def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T = { - val cloned = classTag[T] match { - case ClassTag(_: Text) => new Text(obj.asInstanceOf[Text].getBytes) - case ClassTag(_: LongWritable) => new LongWritable(obj.asInstanceOf[LongWritable].get) - case ClassTag(_: IntWritable) => new IntWritable(obj.asInstanceOf[IntWritable].get) - case ClassTag(_: NullWritable) => obj // TODO: should we clone this ? - case _ => WritableUtils.clone(obj.asInstanceOf[Writable], conf) // slower way of cloning. 
- } - cloned.asInstanceOf[T] + def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = { + val cloneFunc = classTag[T] match { + case ClassTag(_: Text) => + (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T] + case ClassTag(_: LongWritable) => + (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T] + case ClassTag(_: IntWritable) => + (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T] + case ClassTag(_: NullWritable) => + (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ? + case _ => + (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning. + } + cloneFunc } /** Serialize an object using Java serialization */ |