author     Reynold Xin <rxin@apache.org>    2014-01-11 12:07:55 -0800
committer  Reynold Xin <rxin@apache.org>    2014-01-11 12:07:55 -0800
commit     ee6e7f9b8cc56985787546882fba291cf9ad7667 (patch)
tree       0cf54c5a30c84c974b5fba839a9ec6cd0bc07f68
parent     4216178d5e81fad911b69e75f5a272e63d3d208a (diff)
parent     59b03e015d581bbab74f1fe33a3ec1fd7840c3db (diff)
Merge pull request #359 from ScrapCodes/clone-writables
We clone Hadoop keys and values by default and only reuse objects when explicitly asked to. The most common Writable types are cloned directly, and WritableUtils.clone is used for everything else. The intention is to optimize: for NullWritable no clone is needed at all, and for LongWritable, IntWritable and Text, creating a new object with the value set should be faster than a generic copy. An alternative design would let callers decide separately whether to clone the key and the value, but the only use case we could think of is when one of them is a NullWritable, which is already handled, so that extra flexibility seemed unnecessary.
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala      78
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala     29
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  20
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala        28
4 files changed, 106 insertions, 49 deletions
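
For context on why cloning is now the default: Hadoop RecordReaders reuse a single key object and a single value object across records, so caching or collecting the raw pairs without copying them can leave every row pointing at the same mutable Writable. A minimal usage sketch against the new API (hypothetical path, assumes an existing SparkContext `sc`):

    import org.apache.hadoop.io.Text

    // Old behaviour, now opt-in: reuse the reader's objects (cheapest, but unsafe to cache).
    val reused = sc.sequenceFile("hdfs:///tmp/events.seq", classOf[Text], classOf[Text], 2, false)

    // New default behaviour: each record is cloned, so the pairs are safe to cache or collect.
    val cloned = sc.sequenceFile("hdfs:///tmp/events.seq", classOf[Text], classOf[Text], 2, true)
    cloned.cache().map { case (k, v) => (k.toString, v.toString) }.count()
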
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 139048d5c7..d7e681d921 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -349,25 +349,27 @@ class SparkContext(
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*/
- def hadoopRDD[K, V](
+ def hadoopRDD[K: ClassTag, V: ClassTag](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minSplits: Int = defaultMinSplits,
+ cloneKeyValues: Boolean = true
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V](
+ def hadoopFile[K: ClassTag, V: ClassTag](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minSplits: Int = defaultMinSplits,
+ cloneKeyValues: Boolean = true
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -379,7 +381,8 @@ class SparkContext(
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minSplits,
+ cloneKeyValues)
}
/**
@@ -390,14 +393,15 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
* }}}
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
- (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
- : RDD[(K, V)] = {
+ def hadoopFile[K, V, F <: InputFormat[K, V]]
+ (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
- fm.runtimeClass.asInstanceOf[Class[F]],
- km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]],
- minSplits)
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ minSplits,
+ cloneKeyValues = cloneKeyValues)
}
/**
@@ -408,61 +412,67 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
* }}}
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
- hadoopFile[K, V, F](path, defaultMinSplits)
+ hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
+ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+ (path: String, cloneKeyValues: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
- path,
- fm.runtimeClass.asInstanceOf[Class[F]],
- km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]])
+ path,
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ cloneKeyValues = cloneKeyValues)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+ def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
- conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+ conf: Configuration = hadoopConfiguration,
+ cloneKeyValues: Boolean = true): RDD[(K, V)] = {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
- new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+ new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
- def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+ def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
- vClass: Class[V]): RDD[(K, V)] = {
- new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+ vClass: Class[V],
+ cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+ new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String,
+ def sequenceFile[K: ClassTag, V: ClassTag](path: String,
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minSplits: Int,
+ cloneKeyValues: Boolean = true
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+ hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
- sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+ def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
+ cloneKeyValues: Boolean = true): RDD[(K, V)] =
+ sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -480,8 +490,8 @@ class SparkContext(
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
- def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
- (implicit km: ClassTag[K], vm: ClassTag[V],
+ def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
+ cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
@@ -489,7 +499,7 @@ class SparkContext(
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+ vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
}
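
The new `K: ClassTag` / `V: ClassTag` context bounds above exist because the RDD implementations below pick a clone function from the key and value runtime classes, and that requires a ClassTag at the call site. A context bound is shorthand for an implicit ClassTag parameter; a small illustrative sketch (the helper name is made up):

    import scala.reflect.ClassTag

    // `def f[K: ClassTag](...)` desugars to `def f[K](...)(implicit evidence: ClassTag[K])`,
    // which is what lets cloneWritables[K] inspect K's runtime class.
    def runtimeName[K: ClassTag](keyClass: Class[K]): String =
      implicitly[ClassTag[K]].runtimeClass.getSimpleName

    // e.g. runtimeName(classOf[org.apache.hadoop.io.LongWritable]) returns "LongWritable"
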
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 53f77a38f5..2da4611b9c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,7 +19,10 @@ package org.apache.spark.rdd
import java.io.EOFException
-import org.apache.hadoop.mapred.FileInputFormat
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -31,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.spark.util.Utils.cloneWritables
/**
@@ -62,14 +65,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
*/
-class HadoopRDD[K, V](
+class HadoopRDD[K: ClassTag, V: ClassTag](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int)
+ minSplits: Int,
+ cloneKeyValues: Boolean)
extends RDD[(K, V)](sc, Nil) with Logging {
def this(
@@ -78,7 +82,8 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int) = {
+ minSplits: Int,
+ cloneKeyValues: Boolean) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@@ -87,7 +92,8 @@ class HadoopRDD[K, V](
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minSplits,
+ cloneKeyValues)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -158,10 +164,10 @@ class HadoopRDD[K, V](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
-
val key: K = reader.createKey()
+ val keyCloneFunc = cloneWritables[K](getConf)
val value: V = reader.createValue()
-
+ val valueCloneFunc = cloneWritables[V](getConf)
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -169,7 +175,12 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
- (key, value)
+ if (cloneKeyValues) {
+ (keyCloneFunc(key.asInstanceOf[Writable]),
+ valueCloneFunc(value.asInstanceOf[Writable]))
+ } else {
+ (key, value)
+ }
}
override def close() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 73d15b9082..a34786495b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
+import scala.reflect.ClassTag
+
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
private[spark]
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
override def hashCode(): Int = (41 * (41 + rddId) + index)
}
-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient conf: Configuration)
+ @transient conf: Configuration,
+ cloneKeyValues: Boolean)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
@@ -88,7 +92,8 @@ class NewHadoopRDD[K, V](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
-
+ val keyCloneFunc = cloneWritables[K](conf)
+ val valueCloneFunc = cloneWritables[V](conf)
var havePair = false
var finished = false
@@ -105,7 +110,14 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
- (reader.getCurrentKey, reader.getCurrentValue)
+ val key = reader.getCurrentKey
+ val value = reader.getCurrentValue
+ if (cloneKeyValues) {
+ (keyCloneFunc(key.asInstanceOf[Writable]),
+ valueCloneFunc(value.asInstanceOf[Writable]))
+ } else {
+ (key, value)
+ }
}
private def close() {
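
The Utils change below adds the cloneWritables helper that both RDDs use. As a rough standalone sketch of the same idea (not the committed code; it dispatches on runtimeClass rather than the ClassTag extractor), the point is to pick a cheap copy for the common Writable types and fall back to WritableUtils.clone for everything else:

    import scala.reflect.{classTag, ClassTag}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, NullWritable, Text, Writable, WritableUtils}

    def cloneSketch[T: ClassTag](conf: Configuration): Writable => T = {
      val cls = classTag[T].runtimeClass
      if (cls == classOf[Text]) {
        (w: Writable) => new Text(w.asInstanceOf[Text]).asInstanceOf[T]           // copy the bytes
      } else if (cls == classOf[LongWritable]) {
        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
      } else if (cls == classOf[NullWritable]) {
        (w: Writable) => w.asInstanceOf[T]                                        // singleton, nothing to copy
      } else {
        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T]             // generic (de)serialization copy
      }
    }

    // e.g. cloneSketch[Text](new Configuration()) returns a function producing a fresh Text per record.
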
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5f1253100b..23b72701c2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,23 +26,47 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import org.apache.hadoop.io._
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}
/**
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging {
+
+ /**
+   * Clone the most common Writable types directly and fall back to WritableUtils.clone otherwise.
+   * The intention is to optimize: for NullWritable no clone is needed, and for LongWritable,
+   * IntWritable and Text, creating a new object with the value set is faster than a generic copy.
+ */
+ def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+ val cloneFunc = classTag[T] match {
+ case ClassTag(_: Text) =>
+ (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+ case ClassTag(_: LongWritable) =>
+ (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+ case ClassTag(_: IntWritable) =>
+ (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+ case ClassTag(_: NullWritable) =>
+ (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+ case _ =>
+ (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+ }
+ cloneFunc
+ }
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()