author	Xin Ren <iamshrek@126.com>	2016-04-21 16:29:39 -0700
committer	Joseph K. Bradley <joseph@databricks.com>	2016-04-21 16:29:39 -0700
commit	6d1e4c4a65541cbf78284005de1776dc49efa9f4 (patch)
tree	0e49bc338e994ffc0eb6a0f8077d3e4232133def
parent	411454475a031869eb7dc0c5fd84f41b3fdfa295 (diff)
download	spark-6d1e4c4a65541cbf78284005de1776dc49efa9f4.tar.gz / .tar.bz2 / .zip
[SPARK-14569][ML] Log instrumentation in KMeans
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-14569

Log instrumentation in KMeans:
- featuresCol
- predictionCol
- k
- initMode
- initSteps
- maxIter
- seed
- tol
- summary

## How was this patch tested?

Manually tested on a local machine by running org.apache.spark.examples.ml.KMeansExample and checking its output.

Author: Xin Ren <iamshrek@126.com>

Closes #12432 from keypointt/SPARK-14569.
-rw-r--r--	mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala	9
-rw-r--r--	mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala	4
-rw-r--r--	mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala	16
3 files changed, 23 insertions(+), 6 deletions(-)
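
For context before the diffs, here is a minimal driver that would exercise the new logging path. It is a sketch, not part of the patch: it assumes a Spark 2.x-style SparkSession and the ml.linalg vector package, both of which may differ slightly on the branch this commit targets.

    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object KMeansInstrDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("kmeans-instr").getOrCreate()
        // Four 2-d points forming two obvious clusters.
        val df = spark.createDataFrame(Seq(
          Tuple1(Vectors.dense(0.0, 0.0)),
          Tuple1(Vectors.dense(1.0, 1.0)),
          Tuple1(Vectors.dense(9.0, 8.0)),
          Tuple1(Vectors.dense(8.0, 9.0)))).toDF("features")
        // With this patch, fit() creates an Instrumentation, logs the params
        // listed in the commit message, and logs success once the model is built.
        val model = new KMeans().setK(2).setSeed(1L).fit(df)
        spark.stop()
      }
    }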
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index b324196842..bf2ab98673 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -264,6 +264,9 @@ class KMeans @Since("1.5.0") (
override def fit(dataset: Dataset[_]): KMeansModel = {
val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
+ val instr = Instrumentation.create(this, rdd)
+ instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, maxIter, seed, tol)
+
val algo = new MLlibKMeans()
.setK($(k))
.setInitializationMode($(initMode))
@@ -271,11 +274,13 @@ class KMeans @Since("1.5.0") (
.setMaxIterations($(maxIter))
.setSeed($(seed))
.setEpsilon($(tol))
- val parentModel = algo.run(rdd)
+ val parentModel = algo.run(rdd, Option(instr))
val model = copyValues(new KMeansModel(uid, parentModel).setParent(this))
val summary = new KMeansSummary(
model.transform(dataset), $(predictionCol), $(featuresCol), $(k))
- model.setSummary(summary)
+ val m = model.setSummary(summary)
+ instr.logSuccess(m)
+ m
}
@Since("1.5.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
index 7e57cefc44..869104e090 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.Dataset
* @param dataset the training dataset
* @tparam E the type of the estimator
*/
-private[ml] class Instrumentation[E <: Estimator[_]] private (
+private[spark] class Instrumentation[E <: Estimator[_]] private (
estimator: E, dataset: RDD[_]) extends Logging {
private val id = Instrumentation.counter.incrementAndGet()
@@ -95,7 +95,7 @@ private[ml] class Instrumentation[E <: Estimator[_]] private (
/**
* Some common methods for logging information about a training session.
*/
-private[ml] object Instrumentation {
+private[spark] object Instrumentation {
private val counter = new AtomicLong(0)
/**
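
The only change to this file is widening the access qualifier from private[ml] to private[spark], so that code under org.apache.spark.mllib (the old API, which now receives the instrumentation handle) can see the class. A standalone illustration of what the two qualifiers mean, using hypothetical packages rather than Spark's:

    // Hypothetical packages illustrating Scala access qualifiers.
    package org.example.spark {
      package ml {
        private[ml] class OnlyMl          // visible only inside ...spark.ml
        private[spark] class WholeSpark   // visible anywhere under ...spark
      }
      package mllib {
        object User {
          // val no = new ml.OnlyMl       // does not compile: private[ml]
          val ok = new ml.WholeSpark      // compiles: private[spark] reaches mllib
          def main(args: Array[String]): Unit = println(ok)
        }
      }
    }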
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 8ff0b83e8b..ff77090990 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -21,6 +21,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
+import org.apache.spark.ml.clustering.{KMeans => NewKMeans}
+import org.apache.spark.ml.util.Instrumentation
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
import org.apache.spark.mllib.util.MLUtils
@@ -212,6 +214,12 @@ class KMeans private (
*/
@Since("0.8.0")
def run(data: RDD[Vector]): KMeansModel = {
+ run(data, None)
+ }
+
+ private[spark] def run(
+ data: RDD[Vector],
+ instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("The input data is not directly cached, which may hurt performance if its"
@@ -224,7 +232,7 @@ class KMeans private (
val zippedData = data.zip(norms).map { case (v, norm) =>
new VectorWithNorm(v, norm)
}
- val model = runAlgorithm(zippedData)
+ val model = runAlgorithm(zippedData, instr)
norms.unpersist()
// Warn at the end of the run as well, for increased visibility.
@@ -238,7 +246,9 @@ class KMeans private (
/**
* Implementation of K-Means algorithm.
*/
- private def runAlgorithm(data: RDD[VectorWithNorm]): KMeansModel = {
+ private def runAlgorithm(
+ data: RDD[VectorWithNorm],
+ instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
val sc = data.sparkContext
@@ -274,6 +284,8 @@ class KMeans private (
val iterationStartTime = System.nanoTime()
+ instr.map(_.logNumFeatures(centers(0)(0).vector.size))
+
// Execute iterations of Lloyd's algorithm until all runs have converged
while (iteration < maxIterations && !activeRuns.isEmpty) {
type WeightedPoint = (Vector, Long)
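
Two patterns in the mllib hunks are worth calling out: the public run(data) keeps its old signature and delegates to a private overload that threads Option[Instrumentation] down to runAlgorithm, and the logging call is guarded so it is a no-op when no instrumentation was supplied. (instr.map(...) is used purely for its side effect in the last hunk; instr.foreach(...) expresses the same thing more idiomatically.) A self-contained sketch of the pattern with a stand-in logger, not Spark's real class:

    object OptionThreading {
      // Stand-in for the instrumentation handle.
      final class Instr {
        def logNumFeatures(n: Int): Unit = println(s"numFeatures=$n")
      }

      // Public entry point: unchanged signature, delegates with None.
      def run(data: Seq[Array[Double]]): Double = run(data, None)

      // Internal overload threads the optional logger down the call chain.
      private def run(data: Seq[Array[Double]], instr: Option[Instr]): Double = {
        instr.foreach(_.logNumFeatures(data.head.length)) // no-op when None
        data.map(_.sum).sum
      }

      def main(args: Array[String]): Unit = {
        println(run(Seq(Array(1.0, 2.0), Array(3.0, 4.0))))                  // no logging: 10.0
        println(run(Seq(Array(1.0, 2.0), Array(3.0, 4.0)), Some(new Instr))) // logs numFeatures=2, then 10.0
      }
    }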