Diffstat (limited to 'core/src/main/scala/spark/PairRDDFunctions.scala')
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala | 60
1 file changed, 53 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index e7408e4352..2b0e697337 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -52,7 +52,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
- mapSideCombine: Boolean = true): RDD[(K, C)] = {
+ mapSideCombine: Boolean = true,
+ serializerClass: String = null): RDD[(K, C)] = {
if (getKeyClass().isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
@@ -67,13 +68,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self.mapPartitions(aggregator.combineValuesByKey(_), true)
} else if (mapSideCombine) {
val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
- val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
+ val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass)
partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
- val values = new ShuffledRDD[K, V](self, partitioner)
+ val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
values.mapPartitions(aggregator.combineValuesByKey(_), true)
}
}
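
As a usage sketch (not part of this change): the sample data, the local SparkContext, and the serializer name "spark.KryoSerializer" below are assumptions, shown only to illustrate how the new serializerClass parameter might be passed from application code.

import spark.{SparkContext, HashPartitioner}
import spark.SparkContext._   // implicit conversion to PairRDDFunctions

object CombineByKeySketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "combineByKey serializer sketch")
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // Per-key sum (equivalent to reduceByKey(_ + _)), but naming the
    // serializer class to use for the shuffled data.
    val sums = pairs.combineByKey[Int](
      (v: Int) => v,                   // createCombiner
      (c: Int, v: Int) => c + v,       // mergeValue
      (c1: Int, c2: Int) => c1 + c2,   // mergeCombiners
      new HashPartitioner(2),
      mapSideCombine = true,
      serializerClass = "spark.KryoSerializer")  // assumed serializer class name

    sums.collect().foreach(println)    // (a,4), (b,2)
  }
}
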
@@ -89,6 +90,33 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication).
+ */
+ def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
+ combineByKey[V]({v: V => func(zeroValue, v)}, func, func, partitioner)
+ }
+
+ /**
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication).
+ */
+ def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
+ foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
+ }
+
+ /**
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication).
+ */
+ def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
+ foldByKey(zeroValue, defaultPartitioner(self))(func)
+ }
+
+ /**
* Merge the values for each key using an associative reduce function. This will also perform
* the merging locally on each mapper before sending results to a reducer, similarly to a
* "combiner" in MapReduce.
@@ -441,6 +469,23 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
+ * Return an RDD with the pairs from `this` whose keys are not in `other`.
+ *
+ * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be no larger than `this` one.
+ */
+ def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
+ subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
+
+ /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+ def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
+ subtractByKey(other, new HashPartitioner(numPartitions))
+
+ /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+ def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
+ new SubtractedRDD[K, V, W](self, other, p)
+
+ /**
* Return the list of values in the RDD for key `key`. This operation is done efficiently if the
* RDD has a known partitioner by only searching the partition that the key maps to.
*/
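
Continuing the spark-shell sketch above (same sc and imports), subtractByKey can be exercised as follows; the sample RDDs are assumptions for illustration:

val left  = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
val right = sc.parallelize(Seq(("b", 99), ("d", 0)))

// Keeps only the pairs from `left` whose keys do not appear in `right`;
// by default the result reuses `left`'s partitioner/partition count.
val remaining = left.subtractByKey(right)
remaining.collect().foreach(println)   // (a,1), (c,3)
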
@@ -501,8 +546,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
- val attemptId = new TaskAttemptID(jobtrackerID,
- stageId, false, context.splitId, attemptNumber)
+ val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outputFormatClass.newInstance
val committer = format.getOutputCommitter(hadoopContext)
@@ -521,11 +565,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* however we're only going to use this local OutputCommitter for
* setupJob/commitJob, so we just use a dummy "map" task.
*/
- val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0)
+ val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
val count = self.context.runJob(self, writeShard _).sum
+ jobCommitter.commitJob(jobTaskContext)
jobCommitter.cleanupJob(jobTaskContext)
}
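
For reference, a minimal sketch of the Hadoop new-API OutputCommitter lifecycle this method drives, with commitJob now called before cleanupJob. The helper name and the way the committer and contexts are obtained are assumptions for illustration; in the method above they come from the configured OutputFormat and the wrapped job configuration.

import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

// Hypothetical helper showing the order of committer calls:
//   driver side: setupJob -> (tasks run) -> commitJob -> cleanupJob
//   task side:   setupTask -> write records -> commitTask
def driveCommitProtocol(committer: OutputCommitter,
                        jobContext: JobContext,
                        taskContext: TaskAttemptContext) {
  committer.setupJob(jobContext)
  committer.setupTask(taskContext)
  // ... the task writes its records here ...
  if (committer.needsTaskCommit(taskContext)) {
    committer.commitTask(taskContext)
  }
  committer.commitJob(jobContext)    // finalizes the job's output (the call added above)
  committer.cleanupJob(jobContext)
}
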
@@ -593,6 +638,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
self.context.runJob(self, writeToFile _)
+ writer.commitJob()
writer.cleanup()
}
@@ -600,7 +646,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* Return an RDD with the keys of each tuple.
*/
def keys: RDD[K] = self.map(_._1)
-
+
/**
* Return an RDD with the values of each tuple.
*/