-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala   46
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                12
-rw-r--r--  project/MimaExcludes.scala                                        159
3 files changed, 115 insertions, 102 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 9d62d53fcb..29038b0359 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -125,7 +125,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
zeroBuffer.get(zeroArray)
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
- def createZero() = cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
+ val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
combineByKey[U]((v: V) => seqOp(createZero(), v), seqOp, combOp, partitioner)
}
@@ -171,7 +171,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// When deserializing, use a lazy val to create just one instance of the serializer per task
lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
- def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+ val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
}
@@ -214,22 +214,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("reduceByKeyLocally() does not support array keys")
}
- def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+ val reducePartition = (iter: Iterator[(K, V)]) => {
val map = new JHashMap[K, V]
iter.foreach { case (k, v) =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
}
Iterator(map)
- }
+ } : Iterator[JHashMap[K, V]]
- def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+ val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
m2.foreach { case (k, v) =>
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
}
m1
- }
+ } : JHashMap[K, V]
self.mapPartitions(reducePartition).reduce(mergeMaps)
}
@@ -361,11 +361,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
+ val createCombiner = (v: V) => ArrayBuffer(v)
+ val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
+ val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
+ createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
bufs.mapValues(_.toIterable)
}
@@ -710,14 +710,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
- def process(it: Iterator[(K, V)]): Seq[V] = {
+ val process = (it: Iterator[(K, V)]) => {
val buf = new ArrayBuffer[V]
for ((k, v) <- it if k == key) {
buf += v
}
buf
- }
- val res = self.context.runJob(self, process _, Array(index), false)
+ } : Seq[V]
+ val res = self.context.runJob(self, process, Array(index), false)
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
@@ -840,7 +840,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
jobFormat.checkOutputSpecs(job)
}
- def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+ val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -861,19 +861,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val (k, v) = iter.next()
writer.write(k, v)
}
- }
- finally {
+ } finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
- return 1
- }
+ 1
+ } : Int
val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
jobCommitter.setupJob(jobTaskContext)
- self.context.runJob(self, writeShard _)
+ self.context.runJob(self, writeShard)
jobCommitter.commitJob(jobTaskContext)
}
@@ -912,7 +911,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = new SparkHadoopWriter(hadoopConf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+ val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -921,19 +920,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.open()
try {
var count = 0
- while(iter.hasNext) {
+ while (iter.hasNext) {
val record = iter.next()
count += 1
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
}
- }
- finally {
+ } finally {
writer.close()
}
writer.commit()
}
- self.context.runJob(self, writeToFile _)
+ self.context.runJob(self, writeToFile)
writer.commitJob()
}
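
The changes above replace locally defined `def`s, previously passed with an explicit `name _` eta-expansion, with `val`s bound to function literals, ascribing the result type where the body is a block. A minimal standalone sketch of the pattern follows; the class and method names are illustrative, not Spark's own.

// Sketch only: illustrates def-vs-val closures, not Spark code.
object ClosureSketch {

  class Pairs[K, V](data: Seq[(K, V)]) {

    // Stand-in for RDD.mapPartitions / SparkContext.runJob: run `f` on one "partition".
    private def runOnPartition[R](f: Iterator[(K, V)] => R): R = f(data.iterator)

    // Before: a nested `def`. Scalac (2.10-era) lifts it to a synthetic method on the
    // enclosing class (a name like `reducePartition$1`), and passing it where a function
    // is expected eta-expands into a fresh anonymous function each time.
    def reduceWithDef(func: (V, V) => V): Map[K, V] = {
      def reducePartition(iter: Iterator[(K, V)]): Map[K, V] =
        iter.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
          acc.updated(k, acc.get(k).map(func(_, v)).getOrElse(v))
        }
      runOnPartition(reducePartition _)
    }

    // After: a `val` bound to a function literal, with the result type ascribed on the
    // block. The closure object is built once and passed directly, and no lifted method
    // is emitted, which is why MimaExcludes.scala below filters the old `...$1` names.
    def reduceWithVal(func: (V, V) => V): Map[K, V] = {
      val reducePartition = (iter: Iterator[(K, V)]) => {
        iter.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
          acc.updated(k, acc.get(k).map(func(_, v)).getOrElse(v))
        }
      } : Map[K, V]
      runOnPartition(reducePartition)
    }
  }

  def main(args: Array[String]): Unit = {
    val pairs = new Pairs(Seq("a" -> 1, "b" -> 2, "a" -> 3))
    println(pairs.reduceWithDef(_ + _)) // Map(a -> 4, b -> 2)
    println(pairs.reduceWithVal(_ + _)) // Map(a -> 4, b -> 2)
  }
}

The two forms behave the same at runtime; the difference is in the bytecode scalac emits, which is what the MimaExcludes.scala changes further down account for.
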
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a25f263bea..88a918aebf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -328,7 +328,7 @@ abstract class RDD[T: ClassTag](
: RDD[T] = {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
- def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+ val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
@@ -336,7 +336,7 @@ abstract class RDD[T: ClassTag](
position = position + 1
(position, t)
}
- }
+ } : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
@@ -919,19 +919,19 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
- def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
+ val countPartition = (iter: Iterator[T]) => {
val map = new OpenHashMap[T,Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
- }
- def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
+ }: Iterator[OpenHashMap[T,Long]]
+ val mergeMaps = (m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]) => {
m2.foreach { case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
- }
+ }: OpenHashMap[T,Long]
val myResult = mapPartitions(countPartition).reduce(mergeMaps)
// Convert to a Scala mutable map
val mutableResult = scala.collection.mutable.Map[T,Long]()
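
The `distributePartition` closure changed above keys each element with a monotonically increasing Int, starting from a position seeded by the source partition index; since the hash of an Int key is the key itself, a HashPartitioner then spreads the elements round-robin across the output partitions. A small standalone sketch of that keying, with an illustrative driver and an explicit `numPartitions` parameter that is not part of the original signature:

// Sketch only: mirrors the keying logic of distributePartition, not Spark code.
import scala.util.Random

object DistributeSketch {
  def distributePartition[T](numPartitions: Int)(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
    // Start from a pseudo-random position derived from the source partition index,
    // then hand out consecutive keys; key % numPartitions walks the outputs round-robin.
    var position = new Random(index).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t)
    }
  }

  def main(args: Array[String]): Unit = {
    val keyed = distributePartition(numPartitions = 4)(index = 0, items = Iterator("a", "b", "c", "d", "e"))
    keyed.foreach { case (key, elem) => println(s"$elem -> output partition ${key % 4}") }
  }
}
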
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index d67c6571a0..3487f7c5c1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -31,76 +31,91 @@ import com.typesafe.tools.mima.core._
* MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
*/
object MimaExcludes {
- def excludes(version: String) =
- version match {
- case v if v.startsWith("1.1") =>
- Seq(
- MimaBuild.excludeSparkPackage("deploy"),
- MimaBuild.excludeSparkPackage("graphx")
- ) ++
- Seq(
- // Adding new method to JavaRDDLike trait - we should probably mark this as a developer API.
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
- // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
- // for countApproxDistinct* functions, which does not work in Java. We later removed
- // them, and use the following to tell Mima to not care about them.
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.storage.MemoryStore.Entry"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
- + "createZero$1")
- ) ++
- Seq(
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
- ) ++
- Seq( // Ignore some private methods in ALS.
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
- ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
- "org.apache.spark.mllib.recommendation.ALS.this"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7")
- ) ++
- MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
- MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
- MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
- MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
- MimaBuild.excludeSparkClass("storage.Values") ++
- MimaBuild.excludeSparkClass("storage.Entry") ++
- MimaBuild.excludeSparkClass("storage.MemoryStore$Entry")
- case v if v.startsWith("1.0") =>
- Seq(
- MimaBuild.excludeSparkPackage("api.java"),
- MimaBuild.excludeSparkPackage("mllib"),
- MimaBuild.excludeSparkPackage("streaming")
- ) ++
- MimaBuild.excludeSparkClass("rdd.ClassTags") ++
- MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
- MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
- MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
- MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
- MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
- MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
- MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
- MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
- MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
- MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
- MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
- MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
- case _ => Seq()
- }
+
+ def excludes(version: String) = version match {
+ case v if v.startsWith("1.1") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("deploy"),
+ MimaBuild.excludeSparkPackage("graphx")
+ ) ++
+ closures.map(method => ProblemFilters.exclude[MissingMethodProblem](method)) ++
+ Seq(
+ // Adding new method to JavaRDDLike trait - we should probably mark this as a developer API.
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
+ // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
+ // for countApproxDistinct* functions, which does not work in Java. We later removed
+ // them, and use the following to tell Mima to not care about them.
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.MemoryStore.Entry"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
+ + "createZero$1")
+ ) ++
+ Seq(
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
+ ) ++
+ Seq( // Ignore some private methods in ALS.
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
+ ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
+ "org.apache.spark.mllib.recommendation.ALS.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7")
+ ) ++
+ MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
+ MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
+ MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
+ MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
+ MimaBuild.excludeSparkClass("storage.Values") ++
+ MimaBuild.excludeSparkClass("storage.Entry") ++
+ MimaBuild.excludeSparkClass("storage.MemoryStore$Entry")
+ case v if v.startsWith("1.0") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("api.java"),
+ MimaBuild.excludeSparkPackage("mllib"),
+ MimaBuild.excludeSparkPackage("streaming")
+ ) ++
+ MimaBuild.excludeSparkClass("rdd.ClassTags") ++
+ MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
+ MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
+ MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
+ MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
+ MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
+ MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
+ MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
+ MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
+ MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
+ MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
+ MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
+ MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
+ case _ => Seq()
+ }
+
+ private val closures = Seq(
+ "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$mergeMaps$1",
+ "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1",
+ "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$distributePartition$1",
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeValue$1",
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$writeToFile$1",
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$reducePartition$1",
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$writeShard$1",
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeCombiners$1",
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$process$1",
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$createCombiner$1",
+ "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$mergeMaps$1"
+ )
}
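
The MimaExcludes.scala change above also deduplicates the per-closure filters: the mangled names of the lifted methods that disappeared with the `def`-to-`val` conversion are collected in one `closures` Seq and mapped into `MissingMethodProblem` filters. A stripped-down sketch of the same pattern, assuming mima-core on the classpath as the file's own import does, and reusing two names from the list above:

// Sketch only: shows the data-driven exclusion pattern used in MimaExcludes.scala.
import com.typesafe.tools.mima.core._

object ClosureExcludesSketch {
  // Mangled names of lifted closure methods that no longer exist (subset of the list above).
  private val closures = Seq(
    "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1",
    "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$reducePartition$1"
  )

  // One MissingMethodProblem filter per vanished method, instead of spelling each out by hand.
  val filters = closures.map(name => ProblemFilters.exclude[MissingMethodProblem](name))
}
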