aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2015-08-12 23:04:59 -0700
committerXiangrui Meng <meng@databricks.com>2015-08-12 23:04:59 -0700
commit68f99571492f67596b3656e9f076deeb96616f4a (patch)
treebe2248de6c66c50ce13561c67c18af7e3ae86f2e
parentd0b18919d16e6a2f19159516bd2767b60b595279 (diff)
downloadspark-68f99571492f67596b3656e9f076deeb96616f4a.tar.gz
spark-68f99571492f67596b3656e9f076deeb96616f4a.tar.bz2
spark-68f99571492f67596b3656e9f076deeb96616f4a.zip
[SPARK-9918] [MLLIB] remove runs from k-means and rename epsilon to tol
This requires some discussion. I'm not sure whether `runs` is a useful parameter. It certainly complicates the implementation. We might want to optimize the k-means implementation with block matrix operations. In this case, having `runs` may not be worth the trade-off. Also it increases the communication cost in a single job, which might cause other issues. This PR also renames `epsilon` to `tol` to have consistent naming among algorithms. The Python constructor is updated to include all parameters. jkbradley yu-iskw Author: Xiangrui Meng <meng@databricks.com> Closes #8148 from mengxr/SPARK-9918 and squashes the following commits: 149b9e5 [Xiangrui Meng] fix constructor in Python and rename epsilon to tol 3cc15b3 [Xiangrui Meng] fix test and change initStep to initSteps in python a0a0274 [Xiangrui Meng] remove runs from k-means in the pipeline API
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala51
-rw-r--r--mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala12
-rw-r--r--python/pyspark/ml/clustering.py63
3 files changed, 26 insertions, 100 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index dc192add6c..47a18cdb31 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -18,8 +18,8 @@
package org.apache.spark.ml.clustering
import org.apache.spark.annotation.Experimental
-import org.apache.spark.ml.param.{Param, Params, IntParam, DoubleParam, ParamMap}
-import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasMaxIter, HasPredictionCol, HasSeed}
+import org.apache.spark.ml.param.{Param, Params, IntParam, ParamMap}
+import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
@@ -27,14 +27,13 @@ import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.util.Utils
/**
* Common params for KMeans and KMeansModel
*/
-private[clustering] trait KMeansParams
- extends Params with HasMaxIter with HasFeaturesCol with HasSeed with HasPredictionCol {
+private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFeaturesCol
+ with HasSeed with HasPredictionCol with HasTol {
/**
* Set the number of clusters to create (k). Must be > 1. Default: 2.
@@ -46,31 +45,6 @@ private[clustering] trait KMeansParams
def getK: Int = $(k)
/**
- * Param the number of runs of the algorithm to execute in parallel. We initialize the algorithm
- * this many times with random starting conditions (configured by the initialization mode), then
- * return the best clustering found over any run. Must be >= 1. Default: 1.
- * @group param
- */
- final val runs = new IntParam(this, "runs",
- "number of runs of the algorithm to execute in parallel", (value: Int) => value >= 1)
-
- /** @group getParam */
- def getRuns: Int = $(runs)
-
- /**
- * Param the distance threshold within which we've consider centers to have converged.
- * If all centers move less than this Euclidean distance, we stop iterating one run.
- * Must be >= 0.0. Default: 1e-4
- * @group param
- */
- final val epsilon = new DoubleParam(this, "epsilon",
- "distance threshold within which we've consider centers to have converge",
- (value: Double) => value >= 0.0)
-
- /** @group getParam */
- def getEpsilon: Double = $(epsilon)
-
- /**
* Param for the initialization algorithm. This can be either "random" to choose random points as
* initial cluster centers, or "k-means||" to use a parallel variant of k-means++
* (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||.
@@ -136,9 +110,9 @@ class KMeansModel private[ml] (
/**
* :: Experimental ::
- * K-means clustering with support for multiple parallel runs and a k-means++ like initialization
- * mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
- * they are executed together with joint passes over the data for efficiency.
+ * K-means clustering with support for k-means|| initialization proposed by Bahmani et al.
+ *
+ * @see [[http://dx.doi.org/10.14778/2180912.2180915 Bahmani et al., Scalable k-means++.]]
*/
@Experimental
class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMeansParams {
@@ -146,10 +120,9 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean
setDefault(
k -> 2,
maxIter -> 20,
- runs -> 1,
initMode -> MLlibKMeans.K_MEANS_PARALLEL,
initSteps -> 5,
- epsilon -> 1e-4)
+ tol -> 1e-4)
override def copy(extra: ParamMap): KMeans = defaultCopy(extra)
@@ -174,10 +147,7 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean
def setMaxIter(value: Int): this.type = set(maxIter, value)
/** @group setParam */
- def setRuns(value: Int): this.type = set(runs, value)
-
- /** @group setParam */
- def setEpsilon(value: Double): this.type = set(epsilon, value)
+ def setTol(value: Double): this.type = set(tol, value)
/** @group setParam */
def setSeed(value: Long): this.type = set(seed, value)
@@ -191,8 +161,7 @@ class KMeans(override val uid: String) extends Estimator[KMeansModel] with KMean
.setInitializationSteps($(initSteps))
.setMaxIterations($(maxIter))
.setSeed($(seed))
- .setEpsilon($(epsilon))
- .setRuns($(runs))
+ .setEpsilon($(tol))
val parentModel = algo.run(rdd)
val model = new KMeansModel(uid, parentModel)
copyValues(model)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 1f15ac02f4..688b0e31f9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -52,10 +52,9 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
assert(kmeans.getFeaturesCol === "features")
assert(kmeans.getPredictionCol === "prediction")
assert(kmeans.getMaxIter === 20)
- assert(kmeans.getRuns === 1)
assert(kmeans.getInitMode === MLlibKMeans.K_MEANS_PARALLEL)
assert(kmeans.getInitSteps === 5)
- assert(kmeans.getEpsilon === 1e-4)
+ assert(kmeans.getTol === 1e-4)
}
test("set parameters") {
@@ -64,21 +63,19 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
.setFeaturesCol("test_feature")
.setPredictionCol("test_prediction")
.setMaxIter(33)
- .setRuns(7)
.setInitMode(MLlibKMeans.RANDOM)
.setInitSteps(3)
.setSeed(123)
- .setEpsilon(1e-3)
+ .setTol(1e-3)
assert(kmeans.getK === 9)
assert(kmeans.getFeaturesCol === "test_feature")
assert(kmeans.getPredictionCol === "test_prediction")
assert(kmeans.getMaxIter === 33)
- assert(kmeans.getRuns === 7)
assert(kmeans.getInitMode === MLlibKMeans.RANDOM)
assert(kmeans.getInitSteps === 3)
assert(kmeans.getSeed === 123)
- assert(kmeans.getEpsilon === 1e-3)
+ assert(kmeans.getTol === 1e-3)
}
test("parameters validation") {
@@ -91,9 +88,6 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
intercept[IllegalArgumentException] {
new KMeans().setInitSteps(0)
}
- intercept[IllegalArgumentException] {
- new KMeans().setRuns(0)
- }
}
test("fit & transform") {
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 48338713a2..cb4c16e25a 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -19,7 +19,6 @@ from pyspark.ml.util import keyword_only
from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import *
from pyspark.mllib.common import inherit_doc
-from pyspark.mllib.linalg import _convert_to_vector
__all__ = ['KMeans', 'KMeansModel']
@@ -35,7 +34,7 @@ class KMeansModel(JavaModel):
@inherit_doc
-class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
+class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol, HasSeed):
"""
K-means clustering with support for multiple parallel runs and a k-means++ like initialization
mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
@@ -45,7 +44,7 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
>>> data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
... (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
>>> df = sqlContext.createDataFrame(data, ["features"])
- >>> kmeans = KMeans().setK(2).setSeed(1).setFeaturesCol("features")
+ >>> kmeans = KMeans(k=2, seed=1)
>>> model = kmeans.fit(df)
>>> centers = model.clusterCenters()
>>> len(centers)
@@ -60,10 +59,6 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
# a placeholder to make it appear in the generated doc
k = Param(Params._dummy(), "k", "number of clusters to create")
- epsilon = Param(Params._dummy(), "epsilon",
- "distance threshold within which " +
- "we've consider centers to have converged")
- runs = Param(Params._dummy(), "runs", "number of runs of the algorithm to execute in parallel")
initMode = Param(Params._dummy(), "initMode",
"the initialization algorithm. This can be either \"random\" to " +
"choose random points as initial cluster centers, or \"k-means||\" " +
@@ -71,21 +66,21 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
initSteps = Param(Params._dummy(), "initSteps", "steps for k-means initialization mode")
@keyword_only
- def __init__(self, k=2, maxIter=20, runs=1, epsilon=1e-4, initMode="k-means||", initStep=5):
+ def __init__(self, featuresCol="features", predictionCol="prediction", k=2,
+ initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None):
+ """
+ __init__(self, featuresCol="features", predictionCol="prediction", k=2, \
+ initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None)
+ """
super(KMeans, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.KMeans", self.uid)
self.k = Param(self, "k", "number of clusters to create")
- self.epsilon = Param(self, "epsilon",
- "distance threshold within which " +
- "we've consider centers to have converged")
- self.runs = Param(self, "runs", "number of runs of the algorithm to execute in parallel")
- self.seed = Param(self, "seed", "random seed")
self.initMode = Param(self, "initMode",
"the initialization algorithm. This can be either \"random\" to " +
"choose random points as initial cluster centers, or \"k-means||\" " +
"to use a parallel variant of k-means++")
self.initSteps = Param(self, "initSteps", "steps for k-means initialization mode")
- self._setDefault(k=2, maxIter=20, runs=1, epsilon=1e-4, initMode="k-means||", initSteps=5)
+ self._setDefault(k=2, initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20)
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@@ -93,9 +88,11 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
return KMeansModel(java_model)
@keyword_only
- def setParams(self, k=2, maxIter=20, runs=1, epsilon=1e-4, initMode="k-means||", initSteps=5):
+ def setParams(self, featuresCol="features", predictionCol="prediction", k=2,
+ initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None):
"""
- setParams(self, k=2, maxIter=20, runs=1, epsilon=1e-4, initMode="k-means||", initSteps=5):
+ setParams(self, featuresCol="features", predictionCol="prediction", k=2, \
+ initMode="k-means||", initSteps=5, tol=1e-4, maxIter=20, seed=None)
Sets params for KMeans.
"""
@@ -119,40 +116,6 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed):
"""
return self.getOrDefault(self.k)
- def setEpsilon(self, value):
- """
- Sets the value of :py:attr:`epsilon`.
-
- >>> algo = KMeans().setEpsilon(1e-5)
- >>> abs(algo.getEpsilon() - 1e-5) < 1e-5
- True
- """
- self._paramMap[self.epsilon] = value
- return self
-
- def getEpsilon(self):
- """
- Gets the value of `epsilon`
- """
- return self.getOrDefault(self.epsilon)
-
- def setRuns(self, value):
- """
- Sets the value of :py:attr:`runs`.
-
- >>> algo = KMeans().setRuns(10)
- >>> algo.getRuns()
- 10
- """
- self._paramMap[self.runs] = value
- return self
-
- def getRuns(self):
- """
- Gets the value of `runs`
- """
- return self.getOrDefault(self.runs)
-
def setInitMode(self, value):
"""
Sets the value of :py:attr:`initMode`.