author    | Prashant Sharma <prashant.s@imaginea.com> | 2014-01-08 16:32:55 +0530
committer | Prashant Sharma <prashant.s@imaginea.com> | 2014-01-08 16:32:55 +0530
commit    | 277b4a36c580e88d9ba60c5efe63753350197874
tree      | 9f049d4cfc50302aaeab06cde5ddbe0c6f7d1ec3
parent    | c0f0155eca6405d0768a476f0be00594e478fce0
We clone Hadoop keys and values by default, and reuse the same objects only if explicitly requested (cloneKeyValues = false).
4 files changed, 87 insertions, 41 deletions
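In practice the new flag works like this. The sketch below is illustrative and not part of the patch: it assumes an already-constructed SparkContext named sc and a made-up input path.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Default behaviour after this change (cloneKeyValues = true): every
// (key, value) pair is a fresh copy, so the records can be cached,
// collected, or shuffled safely.
val safe = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input")

// Opt-in reuse (cloneKeyValues = false): skips the per-record copy for speed,
// but the RecordReader hands back the same mutable Writable instances for
// every record, so convert each record to an immutable value (here a String)
// before the next one is read.
val fast = sc
  .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input", cloneKeyValues = false)
  .map { case (_, v) => v.toString }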
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0e47f4e442..97fec7f737 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -341,25 +341,27 @@ class SparkContext(
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
    */
-  def hadoopRDD[K, V](
+  def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V](
+  def hadoopFile[K: ClassTag, V: ClassTag](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -371,7 +373,8 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits,
+      cloneKeyValues)
   }
 
   /**
@@ -382,14 +385,15 @@ class SparkContext(
    *   val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
-      : RDD[(K, V)] = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
+      ): RDD[(K, V)] = {
     hadoopFile(path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
         vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits)
+        minSplits,
+        cloneKeyValues = cloneKeyValues)
   }
 
   /**
@@ -400,61 +404,67 @@ class SparkContext(
    *   val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
+      ): RDD[(K, V)] = {
     newAPIHadoopFile(
         path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]])
+        vm.runtimeClass.asInstanceOf[Class[V]],
+        cloneKeyValues = cloneKeyValues)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
   */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration,
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
   */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
      conf: Configuration = hadoopConfiguration,
      fClass: Class[F],
      kClass: Class[K],
-      vClass: Class[V]): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+      vClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minSplits: Int,
+      cloneKeyValues: Boolean = true
      ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -472,8 +482,8 @@ class SparkContext(
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
    */
-  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
-      (implicit km: ClassTag[K], vm: ClassTag[V],
+  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
           kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
     val kc = kcf()
@@ -481,7 +491,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
     writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 53f77a38f5..13949a1bdb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,7 +19,9 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
-import org.apache.hadoop.mapred.FileInputFormat
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -31,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.spark.util.Utils.cloneWritables
 
 
 /**
@@ -62,14 +64,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
  */
-class HadoopRDD[K, V](
+class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minSplits: Int)
+    minSplits: Int,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -78,7 +81,8 @@ class HadoopRDD[K, V](
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int) = {
+      minSplits: Int,
+      cloneKeyValues: Boolean) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -87,7 +91,7 @@ class HadoopRDD[K, V](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits, cloneKeyValues)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -169,7 +173,11 @@ class HadoopRDD[K, V](
         case eof: EOFException =>
           finished = true
       }
-      (key, value)
+      if (cloneKeyValues) {
+        (cloneWritables(key, getConf), cloneWritables(value, getConf))
+      } else {
+        (key, value)
+      }
     }
 
     override def close() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 2662d48c84..5428fc5691 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
 
 private[spark]
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
   override def hashCode(): Int = (41 * (41 + rddId) + index)
 }
 
-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient conf: Configuration,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -105,7 +109,12 @@ class NewHadoopRDD[K, V](
         throw new java.util.NoSuchElementException("End of stream")
       }
       havePair = false
-      (reader.getCurrentKey, reader.getCurrentValue)
+      val key = reader.getCurrentKey
+      val value = reader.getCurrentValue
+      if (cloneKeyValues) {
+        (cloneWritables(key, conf), cloneWritables(value, conf))
+      } else
+        (key, value)
     }
 
     private def close() {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5f1253100b..192806e178 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,23 +26,42 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import org.apache.hadoop.io._
+
 import org.apache.spark.serializer.{DeserializationStream,
   SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}
 
 
 /**
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging {
+
+  /**
+   * We try to clone for most common types of writables and we call WritableUtils.clone otherwise
+   * intention is to optimize, for example for NullWritable there is no need and for Long, int and
+   * String creating a new object with value set would be faster.
+   */
+  def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T = {
+    val cloned = classTag[T] match {
+      case ClassTag(_: Text) => new Text(obj.asInstanceOf[Text].getBytes)
+      case ClassTag(_: LongWritable) => new LongWritable(obj.asInstanceOf[LongWritable].get)
+      case ClassTag(_: IntWritable) => new IntWritable(obj.asInstanceOf[IntWritable].get)
+      case ClassTag(_: NullWritable) => obj // TODO: should we clone this ?
+      case _ => WritableUtils.clone(obj.asInstanceOf[Writable], conf) // slower way of cloning.
+    }
+    cloned.asInstanceOf[T]
+  }
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
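For context on why cloning needs to be the default: both the old and the new Hadoop APIs hand out one mutable key/value pair that the RecordReader overwrites on every call. The sketch below is illustrative only (plain Hadoop io classes, no Spark) and mirrors the fast-path copies cloneWritables performs above:

import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.io.{IntWritable, Text}

// The hazard cloneKeyValues guards against: one Text instance reused across
// "records", the way a RecordReader reuses its key and value objects.
val reused = new Text()
val seen = ArrayBuffer[Text]()
for (s <- Seq("a", "b", "c")) {
  reused.set(s)
  seen += reused                        // stores three references to the SAME object
}
// seen now reads ("c", "c", "c"): stale references, not three records.

// Per-type copies in the spirit of cloneWritables:
val t = new Text("record")
val tCopy = new Text()
tCopy.set(t.getBytes, 0, t.getLength)   // Text.getBytes returns the whole backing
                                        // buffer; getLength bounds the valid bytes
val i = new IntWritable(42)
val iCopy = new IntWritable(i.get)      // numeric writables copy by value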