diff options
author | Xiangrui Meng <meng@databricks.com> | 2014-07-23 00:58:55 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-07-23 00:58:55 -0700 |
commit | 4c7243e109c713bdfb87891748800109ffbaae07 (patch) | |
tree | 4e280534354b05c0a313336e51b9555e25d8c6d1 /mllib | |
parent | 6c2be93f081f33e9e97e1231b0084a6a0eb4fa22 (diff) | |
download | spark-4c7243e109c713bdfb87891748800109ffbaae07.tar.gz spark-4c7243e109c713bdfb87891748800109ffbaae07.tar.bz2 spark-4c7243e109c713bdfb87891748800109ffbaae07.zip |
[SPARK-2617] Correct doc and usages of preservesPartitioning
The name `preservesPartitioning` is ambiguous: 1) preserves the indices of partitions, 2) preserves the partitioner. The latter is correct and `preservesPartitioning` should really be called `preservesPartitioner` to avoid confusion. Unfortunately, this is already part of the API and we cannot change. We should be clear in the doc and fix wrong usages.
This PR
1. adds notes in `maPartitions*`,
2. makes `RDD.sample` preserve partitioner,
3. changes `preservesPartitioning` to false in `RDD.zip` because the keys of the first RDD are no longer the keys of the zipped RDD,
4. fixes some wrong usages in MLlib.
Author: Xiangrui Meng <meng@databricks.com>
Closes #1526 from mengxr/preserve-partitioner and squashes the following commits:
b361e65 [Xiangrui Meng] update doc based on pwendell's comments
3b1ba19 [Xiangrui Meng] update doc
357575c [Xiangrui Meng] fix unit test
20b4816 [Xiangrui Meng] Merge branch 'master' into preserve-partitioner
d1caa65 [Xiangrui Meng] add doc to explain preservesPartitioning fix wrong usage of preservesPartitioning make sample preserse partitioning
Diffstat (limited to 'mllib')
4 files changed, 9 insertions, 9 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 079743742d..1af40de2c7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -103,11 +103,11 @@ class BinaryClassificationMetrics(scoreAndLabels: RDD[(Double, Double)]) extends mergeValue = (c: BinaryLabelCounter, label: Double) => c += label, mergeCombiners = (c1: BinaryLabelCounter, c2: BinaryLabelCounter) => c1 += c2 ).sortByKey(ascending = false) - val agg = counts.values.mapPartitions({ iter => + val agg = counts.values.mapPartitions { iter => val agg = new BinaryLabelCounter() iter.foreach(agg += _) Iterator(agg) - }, preservesPartitioning = true).collect() + }.collect() val partitionwiseCumulativeCounts = agg.scanLeft(new BinaryLabelCounter())( (agg: BinaryLabelCounter, c: BinaryLabelCounter) => agg.clone() += c) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index f4c403bc78..8c2b044ea7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -377,9 +377,9 @@ class RowMatrix( s"Only support dense matrix at this time but found ${B.getClass.getName}.") val Bb = rows.context.broadcast(B.toBreeze.asInstanceOf[BDM[Double]].toDenseVector.toArray) - val AB = rows.mapPartitions({ iter => + val AB = rows.mapPartitions { iter => val Bi = Bb.value - iter.map(row => { + iter.map { row => val v = BDV.zeros[Double](k) var i = 0 while (i < k) { @@ -387,8 +387,8 @@ class RowMatrix( i += 1 } Vectors.fromBreeze(v) - }) - }, preservesPartitioning = true) + } + } new RowMatrix(AB, nRows, B.numCols) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 15e8855db6..5356790cb5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -430,7 +430,7 @@ class ALS private ( val inLinkBlock = makeInLinkBlock(numProductBlocks, ratings, productPartitioner) val outLinkBlock = makeOutLinkBlock(numProductBlocks, ratings, productPartitioner) Iterator.single((blockId, (inLinkBlock, outLinkBlock))) - }, true) + }, preservesPartitioning = true) val inLinks = links.mapValues(_._1) val outLinks = links.mapValues(_._2) inLinks.persist(StorageLevel.MEMORY_AND_DISK) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala index aaf92a1a88..30de24ad89 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala @@ -264,8 +264,8 @@ object MLUtils { (1 to numFolds).map { fold => val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF, complement = false) - val validation = new PartitionwiseSampledRDD(rdd, sampler, seed) - val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), seed) + val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed) + val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), true, seed) (training, validation) }.toArray } |