author	Prashant Sharma <prashant.s@imaginea.com>	2014-01-09 11:58:18 +0530
committer	Prashant Sharma <prashant.s@imaginea.com>	2014-01-09 12:26:30 +0530
commit	59b03e015d581bbab74f1fe33a3ec1fd7840c3db (patch)
tree	5ad775b8d56756f15743ef53d81ad8eb85312c39
parent	277b4a36c580e88d9ba60c5efe63753350197874 (diff)
Fixes corresponding to Reynold's feedback comments
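
The substance of the change: Utils.cloneWritables no longer takes a record and clones it per call; it now takes only the Configuration and returns a reusable Writable => T clone function, so the ClassTag match is resolved once per partition instead of once per record. HadoopRDD and NewHadoopRDD build the key and value clone functions up front, and the hadoopFile/newAPIHadoopFile signatures in SparkContext are reflowed into separate parameter lists. Schematically (signatures only, paraphrased from the diff below):

    // Before: one ClassTag match per cloned record.
    def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T

    // After: match once, then reuse the returned function for every record.
    def cloneWritables[T: ClassTag](conf: Configuration): Writable => T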
-rw-r--r--	core/src/main/scala/org/apache/spark/SparkContext.scala	30
-rw-r--r--	core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala	11
-rw-r--r--	core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala	11
-rw-r--r--	core/src/main/scala/org/apache/spark/util/Utils.scala	23
4 files changed, 43 insertions, 32 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 97fec7f737..bceeaa0448 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -385,14 +385,14 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
* }}}
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int,
- cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
- ): RDD[(K, V)] = {
+ def hadoopFile[K, V, F <: InputFormat[K, V]]
+ (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
- fm.runtimeClass.asInstanceOf[Class[F]],
- km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]],
- minSplits,
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ minSplits,
cloneKeyValues = cloneKeyValues)
}
@@ -409,15 +409,15 @@ class SparkContext(
hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,
- cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
- ): RDD[(K, V)] = {
+ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+ (path: String, cloneKeyValues: Boolean = true)
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
- path,
- fm.runtimeClass.asInstanceOf[Class[F]],
- km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]],
- cloneKeyValues = cloneKeyValues)
+ path,
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ cloneKeyValues = cloneKeyValues)
}
/**
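
Aside on the pattern above: both overloads recover Hadoop's Class objects from the implicit ClassTags via runtimeClass. A minimal standalone illustration of that trick (names invented for the example, not from the file):

    import scala.reflect.ClassTag

    // Recover the erased runtime Class from an implicit ClassTag.
    def classFromTag[T](implicit ct: ClassTag[T]): Class[T] =
      ct.runtimeClass.asInstanceOf[Class[T]]

    val stringClass: Class[String] = classFromTag[String]  // same as classOf[String]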
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 13949a1bdb..2da4611b9c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -22,6 +22,7 @@ import java.io.EOFException
import scala.reflect.ClassTag
import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -91,7 +92,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
inputFormatClass,
keyClass,
valueClass,
- minSplits, cloneKeyValues)
+ minSplits,
+ cloneKeyValues)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -162,10 +164,10 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
-
val key: K = reader.createKey()
+ val keyCloneFunc = cloneWritables[K](getConf)
val value: V = reader.createValue()
-
+ val valueCloneFunc = cloneWritables[V](getConf)
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -174,7 +176,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
finished = true
}
if (cloneKeyValues) {
- (cloneWritables(key, getConf), cloneWritables(value, getConf))
+ (keyCloneFunc(key.asInstanceOf[Writable]),
+ valueCloneFunc(value.asInstanceOf[Writable]))
} else {
(key, value)
}
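
Why the cloning exists at all: the old-API RecordReader reuses the same key and value instances across next() calls, so holding references to records without copying them aliases a single object. A self-contained sketch of the hazard, simulating the reuse with one Text instance (not code from this commit):

    import org.apache.hadoop.io.Text

    val reused = new Text()  // stands in for the reader's reused value object
    val records = Seq("a", "b", "c")
    val aliased = records.map { s => reused.set(s); reused }           // three refs to one object
    val cloned  = records.map { s => reused.set(s); new Text(reused) } // three distinct copies

    println(aliased.map(_.toString))  // List(c, c, c) -- every element sees the last value
    println(cloned.map(_.toString))   // List(a, b, c)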
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 5428fc5691..e1f9995a9a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -92,7 +92,8 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
-
+ val keyCloneFunc = cloneWritables[K](conf)
+ val valueCloneFunc = cloneWritables[V](conf)
var havePair = false
var finished = false
@@ -112,9 +113,11 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
val key = reader.getCurrentKey
val value = reader.getCurrentValue
if (cloneKeyValues) {
- (cloneWritables(key, conf), cloneWritables(value, conf))
- } else
- (key, value)
+ (keyCloneFunc(key.asInstanceOf[Writable]),
+ valueCloneFunc(value.asInstanceOf[Writable]))
+ } else {
+ (key, value)
+ }
}
private def close() {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 192806e178..23b72701c2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -51,15 +51,20 @@ private[spark] object Utils extends Logging {
* intention is to optimize, for example for NullWritable there is no need and for Long, int and
* String creating a new object with value set would be faster.
*/
- def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T = {
- val cloned = classTag[T] match {
- case ClassTag(_: Text) => new Text(obj.asInstanceOf[Text].getBytes)
- case ClassTag(_: LongWritable) => new LongWritable(obj.asInstanceOf[LongWritable].get)
- case ClassTag(_: IntWritable) => new IntWritable(obj.asInstanceOf[IntWritable].get)
- case ClassTag(_: NullWritable) => obj // TODO: should we clone this ?
- case _ => WritableUtils.clone(obj.asInstanceOf[Writable], conf) // slower way of cloning.
- }
- cloned.asInstanceOf[T]
+ def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+ val cloneFunc = classTag[T] match {
+ case ClassTag(_: Text) =>
+ (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+ case ClassTag(_: LongWritable) =>
+ (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+ case ClassTag(_: IntWritable) =>
+ (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+ case ClassTag(_: NullWritable) =>
+ (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+ case _ =>
+ (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+ }
+ cloneFunc
}
/** Serialize an object using Java serialization */
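
As a rough usage sketch of the refactored helper (illustrative only: Utils is private[spark], so this would not compile outside that package; assumes Hadoop's Text and Configuration on the classpath):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.Text
    import org.apache.spark.util.Utils

    val conf = new Configuration()
    // Resolve the clone strategy once per partition (one ClassTag match)...
    val cloneText = Utils.cloneWritables[Text](conf)
    // ...then apply the returned function cheaply for every record.
    val original = new Text("record")
    val copy = cloneText(original)
    assert(copy.toString == original.toString && !(copy eq original))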