aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala78
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala20
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala28
4 files changed, 106 insertions, 49 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 139048d5c7..d7e681d921 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -349,25 +349,27 @@ class SparkContext(
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*/
- def hadoopRDD[K, V](
+ def hadoopRDD[K: ClassTag, V: ClassTag](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minSplits: Int = defaultMinSplits,
+ cloneKeyValues: Boolean = true
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V](
+ def hadoopFile[K: ClassTag, V: ClassTag](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits
+ minSplits: Int = defaultMinSplits,
+ cloneKeyValues: Boolean = true
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -379,7 +381,8 @@ class SparkContext(
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minSplits,
+ cloneKeyValues)
}
/**
@@ -390,14 +393,15 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
* }}}
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
- (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
- : RDD[(K, V)] = {
+ def hadoopFile[K, V, F <: InputFormat[K, V]]
+ (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
- fm.runtimeClass.asInstanceOf[Class[F]],
- km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]],
- minSplits)
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ minSplits,
+ cloneKeyValues = cloneKeyValues)
}
/**
@@ -408,61 +412,67 @@ class SparkContext(
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
* }}}
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
- hadoopFile[K, V, F](path, defaultMinSplits)
+ hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
+ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+ (path: String, cloneKeyValues: Boolean = true)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
- path,
- fm.runtimeClass.asInstanceOf[Class[F]],
- km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]])
+ path,
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ cloneKeyValues = cloneKeyValues)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+ def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
- conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+ conf: Configuration = hadoopConfiguration,
+ cloneKeyValues: Boolean = true): RDD[(K, V)] = {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
- new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+ new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
*/
- def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+ def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
- vClass: Class[V]): RDD[(K, V)] = {
- new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+ vClass: Class[V],
+ cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+ new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String,
+ def sequenceFile[K: ClassTag, V: ClassTag](path: String,
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int
+ minSplits: Int,
+ cloneKeyValues: Boolean = true
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+ hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
}
/** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
- sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+ def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
+ cloneKeyValues: Boolean = true): RDD[(K, V)] =
+ sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -480,8 +490,8 @@ class SparkContext(
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
- def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
- (implicit km: ClassTag[K], vm: ClassTag[V],
+ def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
+ cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
@@ -489,7 +499,7 @@ class SparkContext(
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+ vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 53f77a38f5..2da4611b9c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,7 +19,10 @@ package org.apache.spark.rdd
import java.io.EOFException
-import org.apache.hadoop.mapred.FileInputFormat
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -31,7 +34,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.spark.util.Utils.cloneWritables
/**
@@ -62,14 +65,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
*/
-class HadoopRDD[K, V](
+class HadoopRDD[K: ClassTag, V: ClassTag](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int)
+ minSplits: Int,
+ cloneKeyValues: Boolean)
extends RDD[(K, V)](sc, Nil) with Logging {
def this(
@@ -78,7 +82,8 @@ class HadoopRDD[K, V](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int) = {
+ minSplits: Int,
+ cloneKeyValues: Boolean) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@@ -87,7 +92,8 @@ class HadoopRDD[K, V](
inputFormatClass,
keyClass,
valueClass,
- minSplits)
+ minSplits,
+ cloneKeyValues)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -158,10 +164,10 @@ class HadoopRDD[K, V](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
-
val key: K = reader.createKey()
+ val keyCloneFunc = cloneWritables[K](getConf)
val value: V = reader.createValue()
-
+ val valueCloneFunc = cloneWritables[V](getConf)
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -169,7 +175,12 @@ class HadoopRDD[K, V](
case eof: EOFException =>
finished = true
}
- (key, value)
+ if (cloneKeyValues) {
+ (keyCloneFunc(key.asInstanceOf[Writable]),
+ valueCloneFunc(value.asInstanceOf[Writable]))
+ } else {
+ (key, value)
+ }
}
override def close() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 73d15b9082..a34786495b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
+import scala.reflect.ClassTag
+
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
private[spark]
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
override def hashCode(): Int = (41 * (41 + rddId) + index)
}
-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient conf: Configuration)
+ @transient conf: Configuration,
+ cloneKeyValues: Boolean)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
@@ -88,7 +92,8 @@ class NewHadoopRDD[K, V](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
-
+ val keyCloneFunc = cloneWritables[K](conf)
+ val valueCloneFunc = cloneWritables[V](conf)
var havePair = false
var finished = false
@@ -105,7 +110,14 @@ class NewHadoopRDD[K, V](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
- (reader.getCurrentKey, reader.getCurrentValue)
+ val key = reader.getCurrentKey
+ val value = reader.getCurrentValue
+ if (cloneKeyValues) {
+ (keyCloneFunc(key.asInstanceOf[Writable]),
+ valueCloneFunc(value.asInstanceOf[Writable]))
+ } else {
+ (key, value)
+ }
}
private def close() {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5f1253100b..23b72701c2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,23 +26,47 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import org.apache.hadoop.io._
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}
/**
* Various utility methods used by Spark.
*/
private[spark] object Utils extends Logging {
+
+ /**
+ * We try to clone for most common types of writables and we call WritableUtils.clone otherwise
+ * intention is to optimize, for example for NullWritable there is no need and for Long, int and
+ * String creating a new object with value set would be faster.
+ */
+ def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+ val cloneFunc = classTag[T] match {
+ case ClassTag(_: Text) =>
+ (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+ case ClassTag(_: LongWritable) =>
+ (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+ case ClassTag(_: IntWritable) =>
+ (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+ case ClassTag(_: NullWritable) =>
+ (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+ case _ =>
+ (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+ }
+ cloneFunc
+ }
+
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()