author     Xiangrui Meng <meng@databricks.com>    2015-01-28 17:26:03 -0800
committer  Xiangrui Meng <meng@databricks.com>    2015-01-28 17:26:03 -0800
commit     4ee79c71afc5175ba42b5e3d4088fe23db3e45d1 (patch)
tree       af05f349a568617cbd75a5db34c4ae6fd90a00de /mllib
parent     e80dc1c5a80cddba8b367cf5cdf9f71df5d87250 (diff)
[SPARK-5430] move treeReduce and treeAggregate from mllib to core
We have seen many use cases of `treeAggregate`/`treeReduce` outside the ML domain. Maybe it is time to move them to Core.

pwendell

Author: Xiangrui Meng <meng@databricks.com>

Closes #4228 from mengxr/SPARK-5430 and squashes the following commits:

20ad40d [Xiangrui Meng] exclude tree* from mima
e89a43e [Xiangrui Meng] fix compile and update java doc
3ae1a4b [Xiangrui Meng] add treeReduce/treeAggregate to Python
6f948c5 [Xiangrui Meng] add treeReduce/treeAggregate to JavaRDDLike
d600b6c [Xiangrui Meng] move treeReduce and treeAggregate to core
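With this change the tree-pattern operations are callable directly on any RDD in core, rather than through mllib's RDDFunctions implicit. A minimal usage sketch under that assumption (the application name, master setting, and variable names here are illustrative, not part of the patch):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("tree-ops-example").setMaster("local[4]"))
val rdd = sc.parallelize(-1000 until 1000, 10)

// Multi-level tree reduce: partial results are combined over roughly `depth` rounds
// instead of pulling every partition's result back to the driver in one step.
val sum = rdd.treeReduce(_ + _, depth = 2)

// Tree aggregate with a different result type (Long accumulator over Int elements).
val total = rdd.treeAggregate(0L)(
  seqOp = (acc, x) => acc + x,
  combOp = (a, b) => a + b,
  depth = 2)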
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala                   |  1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala        |  1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala  |  1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala  |  1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala            |  1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala              | 59
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala   |  1
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala         | 18
8 files changed, 9 insertions, 74 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
index 3260f27513..a89eea0e21 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -22,7 +22,6 @@ import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
index 3c2091732f..2f2c6f94e9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
@@ -20,7 +20,6 @@ package org.apache.spark.mllib.feature
import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 02075edbab..ddca30c3c0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -30,7 +30,6 @@ import org.apache.spark.Logging
import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._
-import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 0857877951..4b7d0589c9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -25,7 +25,6 @@ import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vectors, Vector}
-import org.apache.spark.mllib.rdd.RDDFunctions._
/**
* Class used to solve an optimization problem using Gradient Descent.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index d16d0daf08..d5e4f4ccbf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -26,7 +26,6 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.axpy
-import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.rdd.RDD
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
index 57c0768084..78172843be 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -21,10 +21,7 @@ import scala.language.implicitConversions
import scala.reflect.ClassTag
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.HashPartitioner
-import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
/**
* Machine learning specific RDD functions.
@@ -53,63 +50,25 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable {
* Reduces the elements of this RDD in a multi-level tree pattern.
*
* @param depth suggested depth of the tree (default: 2)
- * @see [[org.apache.spark.rdd.RDD#reduce]]
+ * @see [[org.apache.spark.rdd.RDD#treeReduce]]
+ * @deprecated Use [[org.apache.spark.rdd.RDD#treeReduce]] instead.
*/
- def treeReduce(f: (T, T) => T, depth: Int = 2): T = {
- require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
- val cleanF = self.context.clean(f)
- val reducePartition: Iterator[T] => Option[T] = iter => {
- if (iter.hasNext) {
- Some(iter.reduceLeft(cleanF))
- } else {
- None
- }
- }
- val partiallyReduced = self.mapPartitions(it => Iterator(reducePartition(it)))
- val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
- if (c.isDefined && x.isDefined) {
- Some(cleanF(c.get, x.get))
- } else if (c.isDefined) {
- c
- } else if (x.isDefined) {
- x
- } else {
- None
- }
- }
- RDDFunctions.fromRDD(partiallyReduced).treeAggregate(Option.empty[T])(op, op, depth)
- .getOrElse(throw new UnsupportedOperationException("empty collection"))
- }
+ @deprecated("Use RDD.treeReduce instead.", "1.3.0")
+ def treeReduce(f: (T, T) => T, depth: Int = 2): T = self.treeReduce(f, depth)
/**
* Aggregates the elements of this RDD in a multi-level tree pattern.
*
* @param depth suggested depth of the tree (default: 2)
- * @see [[org.apache.spark.rdd.RDD#aggregate]]
+ * @see [[org.apache.spark.rdd.RDD#treeAggregate]]
+ * @deprecated Use [[org.apache.spark.rdd.RDD#treeAggregate]] instead.
*/
+ @deprecated("Use RDD.treeAggregate instead.", "1.3.0")
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int = 2): U = {
- require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
- if (self.partitions.size == 0) {
- return Utils.clone(zeroValue, self.context.env.closureSerializer.newInstance())
- }
- val cleanSeqOp = self.context.clean(seqOp)
- val cleanCombOp = self.context.clean(combOp)
- val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
- var partiallyAggregated = self.mapPartitions(it => Iterator(aggregatePartition(it)))
- var numPartitions = partiallyAggregated.partitions.size
- val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
- // If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation.
- while (numPartitions > scale + numPartitions / scale) {
- numPartitions /= scale
- val curNumPartitions = numPartitions
- partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) =>
- iter.map((i % curNumPartitions, _))
- }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
- }
- partiallyAggregated.reduce(cleanCombOp)
+ self.treeAggregate(zeroValue)(seqOp, combOp, depth)
}
}
@@ -117,5 +76,5 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) extends Serializable {
object RDDFunctions {
/** Implicit conversion from an RDD to RDDFunctions. */
- implicit def fromRDD[T: ClassTag](rdd: RDD[T]) = new RDDFunctions[T](rdd)
+ implicit def fromRDD[T: ClassTag](rdd: RDD[T]): RDDFunctions[T] = new RDDFunctions[T](rdd)
}
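For reference, the implementation removed above decides how many tree levels to build from the suggested depth: it merges partitions by a factor of scale = max(ceil(numPartitions^(1/depth)), 2) while an extra level still pays off. A standalone sketch of that schedule, using the same formulas as the deleted body (the helper name levelSchedule is made up purely for illustration):

// Reproduces the partition-shrinking schedule of the removed treeAggregate body.
// An extra level is only added while numPartitions > scale + numPartitions / scale,
// i.e. while it actually reduces the work of the final reduce.
def levelSchedule(initialPartitions: Int, depth: Int = 2): Seq[Int] = {
  require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
  val scale = math.max(math.ceil(math.pow(initialPartitions, 1.0 / depth)).toInt, 2)
  var numPartitions = initialPartitions
  val levels = scala.collection.mutable.ArrayBuffer(numPartitions)
  while (numPartitions > scale + numPartitions / scale) {
    numPartitions /= scale
    levels += numPartitions
  }
  levels.toSeq
}

// With the 10-partition RDD used in the test suite below and the default depth of 2:
// scale = ceil(10^(1/2)) = 4, so the 10 partitions collapse to 2 before the final reduce,
// i.e. levelSchedule(10) == Seq(10, 2).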
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
index 4c93c0ca4f..e9e510b6f5 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/StandardScalerSuite.scala
@@ -22,7 +22,6 @@ import org.scalatest.FunSuite
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
-import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, MultivariateOnlineSummarizer}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
index 681ce92639..6d6c0aa5be 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
@@ -46,22 +46,4 @@ class RDDFunctionsSuite extends FunSuite with MLlibTestSparkContext {
val expected = data.flatMap(x => x).sliding(3).toSeq.map(_.toSeq)
assert(sliding === expected)
}
-
- test("treeAggregate") {
- val rdd = sc.makeRDD(-1000 until 1000, 10)
- def seqOp = (c: Long, x: Int) => c + x
- def combOp = (c1: Long, c2: Long) => c1 + c2
- for (depth <- 1 until 10) {
- val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
- assert(sum === -1000L)
- }
- }
-
- test("treeReduce") {
- val rdd = sc.makeRDD(-1000 until 1000, 10)
- for (depth <- 1 until 10) {
- val sum = rdd.treeReduce(_ + _, depth)
- assert(sum === -1000)
- }
- }
}