author     DB Tsai <dbt@netflix.com>               2016-05-17 12:51:07 -0700
committer  Xiangrui Meng <meng@databricks.com>     2016-05-17 12:51:07 -0700
commit     e2efe0529acd748f26dbaa41331d1733ed256237 (patch)
tree       fe1a5aeeadfbf220b5dbe1429e0235153db8117b /mllib/src/main/scala
parent     9f176dd3918129a72282a6b7a12e2899cbb6dac9 (diff)
[SPARK-14615][ML] Use the new ML Vector and Matrix in the ML pipeline based algorithms
## What changes were proposed in this pull request?

Once SPARK-14487 and SPARK-14549 are merged, we will migrate to use the new vector and matrix type in the new ml pipeline based apis.

## How was this patch tested?

Unit tests

Author: DB Tsai <dbt@netflix.com>
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #12627 from dbtsai/SPARK-14615-NewML.
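The conversions this patch leans on appear repeatedly in the hunks below: the new `org.apache.spark.ml.linalg` types stay at the pipeline API surface, and the old `org.apache.spark.mllib.linalg` types are produced only where existing spark.mllib implementations are still called. A minimal sketch of that round trip (not part of the patch; assumes a Spark 2.0 classpath):

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}

object ConversionSketch {
  // ml -> mllib, used where an existing spark.mllib implementation is called.
  def toOld(v: Vector): OldVector = OldVectors.fromML(v)

  // mllib -> ml, used when results are handed back to the pipeline API.
  def toNew(v: OldVector): Vector = v.asML

  def main(args: Array[String]): Unit = {
    val v = Vectors.dense(1.0, 2.0, 3.0)
    println(toNew(toOld(v)) == v)  // the round trip preserves the values
  }
}
```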
Diffstat (limited to 'mllib/src/main/scala')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/Predictor.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala | 22
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala | 5
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala | 9
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala | 16
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala | 9
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala | 18
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala | 18
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala | 21
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/DCT.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala | 6
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala | 3
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala | 15
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/LabeledPoint.scala | 38
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala | 8
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala | 9
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/Normalizer.scala | 5
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala | 21
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala | 15
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala | 3
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala | 3
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/param/params.scala | 7
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala | 3
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala | 6
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/util/MetadataUtils.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 195
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 22
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala | 22
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala | 9
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala | 12
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala | 4
67 files changed, 494 insertions(+), 132 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index 81140d1f7b..569a5fb993 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -18,11 +18,11 @@
package org.apache.spark.ml
import org.apache.spark.annotation.{DeveloperApi, Since}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.SchemaUtils
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
index a27ee51874..0a569c4917 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/ann/Layer.scala
@@ -21,7 +21,9 @@ import java.util.Random
import breeze.linalg.{*, axpy => Baxpy, DenseMatrix => BDM, DenseVector => BDV, Vector => BV}
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.optimization._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom
@@ -580,10 +582,10 @@ private[ann] object FeedForwardModel {
*/
private[ann] class ANNGradient(topology: Topology, dataStacker: DataStacker) extends Gradient {
override def compute(
- data: Vector,
+ data: OldVector,
label: Double,
- weights: Vector,
- cumGradient: Vector): Double = {
+ weights: OldVector,
+ cumGradient: OldVector): Double = {
val (input, target, realBatchSize) = dataStacker.unstack(data)
val model = topology.model(weights)
model.computeGradient(input, target, cumGradient, realBatchSize)
@@ -657,15 +659,15 @@ private[ann] class DataStacker(stackSize: Int, inputSize: Int, outputSize: Int)
private[ann] class ANNUpdater extends Updater {
override def compute(
- weightsOld: Vector,
- gradient: Vector,
+ weightsOld: OldVector,
+ gradient: OldVector,
stepSize: Double,
iter: Int,
- regParam: Double): (Vector, Double) = {
+ regParam: Double): (OldVector, Double) = {
val thisIterStepSize = stepSize
val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
Baxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
- (Vectors.fromBreeze(brzWeights), 0)
+ (OldVectors.fromBreeze(brzWeights), 0)
}
}
@@ -808,7 +810,9 @@ private[ml] class FeedForwardTrainer(
getWeights
}
// TODO: deprecate standard optimizer because it needs Vector
- val newWeights = optimizer.optimize(dataStacker.stack(data), w)
+ val newWeights = optimizer.optimize(dataStacker.stack(data).map { v =>
+ (v._1, OldVectors.fromML(v._2))
+ }, w)
topology.model(newWeights)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala b/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala
index 2c29eeb01a..12b9732a4c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/attribute/AttributeGroup.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.attribute
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.mllib.linalg.VectorUDT
+import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StructField}
/**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index bc5fe35ad4..e35b04a1cf 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -20,10 +20,10 @@ package org.apache.spark.ml.classification
import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param.shared.HasRawPredictionCol
import org.apache.spark.ml.util.{MetadataUtils, SchemaUtils}
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index 31a69d49a0..881dcefb79 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -22,13 +22,13 @@ import org.json4s.{DefaultFormats, JObject}
import org.json4s.JsonDSL._
import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
index acc04582b8..f843df449c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
@@ -24,14 +24,14 @@ import org.json4s.JsonDSL._
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{PredictionModel, Predictor}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.GradientBoostedTrees
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index ffd03e55b5..ac2cd8726b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -27,12 +27,13 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.linalg.BLAS._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
-import org.apache.spark.mllib.linalg._
-import org.apache.spark.mllib.linalg.BLAS._
+import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
index 72cf55f6bb..683ae4aaf4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala
@@ -24,11 +24,11 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}
import org.apache.spark.ml.ann.{FeedForwardTopology, FeedForwardTrainer}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasMaxIter, HasSeed, HasStepSize, HasTol}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.Dataset
/** Params for Multilayer Perceptron. */
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index 267d63b51e..a98bdeca6b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -22,14 +22,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.PredictorParams
+import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes}
import org.apache.spark.mllib.classification.{NaiveBayesModel => OldNaiveBayesModel}
-import org.apache.spark.mllib.linalg._
-import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.Dataset
/**
* Params for Naive Bayes Classifiers.
@@ -102,7 +102,8 @@ class NaiveBayes @Since("1.5.0") (
setDefault(modelType -> OldNaiveBayes.Multinomial)
override protected def train(dataset: Dataset[_]): NaiveBayesModel = {
- val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+ val oldDataset: RDD[OldLabeledPoint] =
+ extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
NaiveBayesModel.fromOld(oldModel, this)
}
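The NaiveBayes change above converts the extracted training data to the old label-point type before delegating to the spark.mllib trainer. A hedged sketch of that conversion in isolation; `OldLabeledPoint.fromML` is the method used in the hunk above, and the reverse direction is spelled out manually here:

```scala
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}

object LabeledPointSketch {
  // ml -> mllib, as done before handing data to OldNaiveBayes.train.
  def toOld(lp: LabeledPoint): OldLabeledPoint = OldLabeledPoint.fromML(lp)

  // mllib -> ml, converting the features vector explicitly.
  def toNew(lp: OldLabeledPoint): LabeledPoint = LabeledPoint(lp.label, lp.features.asML)
}
```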
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index f10c60a78d..047a378b79 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -32,9 +32,9 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml._
import org.apache.spark.ml.attribute._
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
index d00fee12b0..59277d0f42 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala
@@ -18,9 +18,9 @@
package org.apache.spark.ml.classification
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.ml.linalg.{DenseVector, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.SchemaUtils
-import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors, VectorUDT}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
index 1d33ae83c2..b3c074f839 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
@@ -21,13 +21,13 @@ import org.json4s.{DefaultFormats, JObject}
import org.json4s.JsonDSL._
import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
-import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index 6cc9117da3..138e059f94 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -21,12 +21,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.clustering.
- {BisectingKMeans => MLlibBisectingKMeans, BisectingKMeansModel => MLlibBisectingKMeansModel}
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans, BisectingKMeansModel => MLlibBisectingKMeansModel}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -105,7 +107,7 @@ class BisectingKMeansModel private[ml] (
private[clustering] def predict(features: Vector): Int = parentModel.predict(features)
@Since("2.0.0")
- def clusterCenters: Array[Vector] = parentModel.clusterCenters
+ def clusterCenters: Array[Vector] = parentModel.clusterCenters.map(_.asML)
/**
* Computes the sum of squared distances between the input points and their corresponding cluster
@@ -115,7 +117,7 @@ class BisectingKMeansModel private[ml] (
def computeCost(dataset: Dataset[_]): Double = {
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
- parentModel.computeCost(data)
+ parentModel.computeCost(data.map(OldVectors.fromML))
}
@Since("2.0.0")
@@ -216,7 +218,9 @@ class BisectingKMeans @Since("2.0.0") (
@Since("2.0.0")
override def fit(dataset: Dataset[_]): BisectingKMeansModel = {
- val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
+ val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
+ case Row(point: Vector) => OldVectors.fromML(point)
+ }
val bkm = new MLlibBisectingKMeans()
.setK($(k))
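The fit methods in the clustering estimators (here and in GaussianMixture and KMeans below) all follow the same shape: extract the features column into an RDD of old vectors, then delegate to the spark.mllib implementation. A standalone sketch of that extraction, assuming a Dataset whose features column carries the new VectorUDT:

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col

object ExtractSketch {
  // Pull the features column out of the Dataset and convert each new Vector
  // to the old type expected by the spark.mllib algorithm.
  def toOldVectorRDD(dataset: Dataset[_], featuresCol: String): RDD[OldVector] =
    dataset.select(col(featuresCol)).rdd.map {
      case Row(point: Vector) => OldVectors.fromML(point)
    }
}
```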
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index ac86e4ce25..63ca812609 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -32,6 +32,7 @@ import org.apache.spark.ml.util._
import org.apache.spark.mllib.clustering.{GaussianMixture => MLlibGM}
import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix,
Vector => OldVector, Vectors => OldVectors, VectorUDT => OldVectorUDT}
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -94,8 +95,8 @@ class GaussianMixtureModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
- val predUDF = udf((vector: OldVector) => predict(vector.asML))
- val probUDF = udf((vector: OldVector) => OldVectors.fromML(predictProbability(vector.asML)))
+ val predUDF = udf((vector: Vector) => predict(vector))
+ val probUDF = udf((vector: Vector) => predictProbability(vector))
dataset.withColumn($(predictionCol), predUDF(col($(featuresCol))))
.withColumn($(probabilityCol), probUDF(col($(featuresCol))))
}
@@ -296,7 +297,9 @@ class GaussianMixture @Since("2.0.0") (
@Since("2.0.0")
override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
- val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: OldVector) => point }
+ val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
+ case Row(point: Vector) => OldVectors.fromML(point)
+ }
val algo = new MLlibGM()
.setK($(k))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index 42a25396ad..41c0aec0ec 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -22,11 +22,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param.{IntParam, Param, ParamMap, Params}
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.clustering.{KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -127,7 +130,7 @@ class KMeansModel private[ml] (
private[clustering] def predict(features: Vector): Int = parentModel.predict(features)
@Since("1.5.0")
- def clusterCenters: Array[Vector] = parentModel.clusterCenters
+ def clusterCenters: Array[Vector] = parentModel.clusterCenters.map(_.asML)
/**
* Return the K-means cost (sum of squared distances of points to their nearest center) for this
@@ -137,7 +140,9 @@ class KMeansModel private[ml] (
@Since("2.0.0")
def computeCost(dataset: Dataset[_]): Double = {
SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
- val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
+ val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
+ case Row(point: Vector) => OldVectors.fromML(point)
+ }
parentModel.computeCost(data)
}
@@ -210,7 +215,8 @@ object KMeansModel extends MLReadable[KMeansModel] {
val dataPath = new Path(path, "data").toString
val data: Dataset[Data] = sqlContext.read.parquet(dataPath).as[Data]
val clusterCenters = data.collect().sortBy(_.clusterIdx).map(_.clusterCenter)
- val model = new KMeansModel(metadata.uid, new MLlibKMeansModel(clusterCenters))
+ val model = new KMeansModel(metadata.uid,
+ new MLlibKMeansModel(clusterCenters.map(OldVectors.fromML)))
DefaultParamsReader.getAndSetParams(model, metadata)
model
@@ -277,7 +283,9 @@ class KMeans @Since("1.5.0") (
@Since("2.0.0")
override def fit(dataset: Dataset[_]): KMeansModel = {
- val rdd = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => point }
+ val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
+ case Row(point: Vector) => OldVectors.fromML(point)
+ }
val instr = Instrumentation.create(this, rdd)
instr.logParams(featuresCol, predictionCol, k, initMode, initSteps, maxIter, seed, tol)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 38ecc5a102..5a83b28700 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed}
import org.apache.spark.ml.util._
@@ -30,7 +31,10 @@ import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedL
LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel,
OnlineLDAOptimizer => OldOnlineLDAOptimizer}
import org.apache.spark.mllib.impl.PeriodicCheckpointer
-import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Vector => OldVector,
+ Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.MatrixImplicits._
+import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{col, monotonicallyIncreasingId, udf}
@@ -405,7 +409,11 @@ sealed abstract class LDAModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
if ($(topicDistributionCol).nonEmpty) {
- val t = udf(oldLocalModel.getTopicDistributionMethod(sparkSession.sparkContext))
+
+ // TODO: Make the transformer natively in ml framework to avoid extra conversion.
+ val transformer = oldLocalModel.getTopicDistributionMethod(sparkSession.sparkContext)
+
+ val t = udf { (v: Vector) => transformer(OldVectors.fromML(v)).asML }
dataset.withColumn($(topicDistributionCol), t(col($(featuresCol)))).toDF
} else {
logWarning("LDAModel.transform was called without any output columns. Set an output column" +
@@ -437,7 +445,7 @@ sealed abstract class LDAModel private[ml] (
* collecting a large amount of data to the driver (on the order of vocabSize x k).
*/
@Since("1.6.0")
- def topicsMatrix: Matrix = oldLocalModel.topicsMatrix
+ def topicsMatrix: Matrix = oldLocalModel.topicsMatrix.asML
/** Indicates whether this instance is of type [[DistributedLDAModel]] */
@Since("1.6.0")
@@ -872,13 +880,13 @@ class LDA @Since("1.6.0") (
private[clustering] object LDA extends DefaultParamsReadable[LDA] {
/** Get dataset for spark.mllib LDA */
- def getOldDataset(dataset: Dataset[_], featuresCol: String): RDD[(Long, Vector)] = {
+ def getOldDataset(dataset: Dataset[_], featuresCol: String): RDD[(Long, OldVector)] = {
dataset
.withColumn("docId", monotonicallyIncreasingId())
.select("docId", featuresCol)
.rdd
.map { case Row(docId: Long, features: Vector) =>
- (docId, features)
+ (docId, OldVectors.fromML(features))
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
index 0cbc391d96..bff72b20e1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/BinaryClassificationEvaluator.scala
@@ -18,11 +18,11 @@
package org.apache.spark.ml.evaluation
import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
index 35bc46a5f3..318c8b8b2f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -22,10 +22,10 @@ import scala.collection.mutable.ArrayBuilder
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute.BinaryAttribute
+import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
index 29f55a7f71..e73a8f5d66 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
@@ -22,12 +22,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml._
import org.apache.spark.ml.attribute.{AttributeGroup, _}
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
+import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
@@ -80,10 +82,11 @@ final class ChiSqSelector(override val uid: String)
@Since("2.0.0")
override def fit(dataset: Dataset[_]): ChiSqSelectorModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select(col($(labelCol)).cast(DoubleType), col($(featuresCol))).rdd.map {
- case Row(label: Double, features: Vector) =>
- LabeledPoint(label, features)
- }
+ val input: RDD[OldLabeledPoint] =
+ dataset.select(col($(labelCol)).cast(DoubleType), col($(featuresCol))).rdd.map {
+ case Row(label: Double, features: Vector) =>
+ OldLabeledPoint(label, OldVectors.fromML(features))
+ }
val chiSqSelector = new feature.ChiSqSelector($(numTopFeatures)).fit(input)
copyValues(new ChiSqSelectorModel(uid, chiSqSelector).setParent(this))
}
@@ -132,7 +135,11 @@ final class ChiSqSelectorModel private[ml] (
override def transform(dataset: Dataset[_]): DataFrame = {
val transformedSchema = transformSchema(dataset.schema, logging = true)
val newField = transformedSchema.last
- val selector = udf { chiSqSelector.transform _ }
+
+ // TODO: Make the transformer natively in ml framework to avoid extra conversion.
+ val transformer: Vector => Vector = v => chiSqSelector.transform(OldVectors.fromML(v)).asML
+
+ val selector = udf(transformer)
dataset.withColumn($(outputCol), selector(col($(featuresCol))), newField.metadata)
}
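On the transform side, the pattern above (and in IDF, PCA, and StandardScaler below) wraps the old model's transform in a plain Scala function and registers it as a UDF, converting on the way in and out. A sketch of the wrapping, generic over any old-vector transform (illustrative only, not part of the patch):

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

object UdfWrapSketch {
  // Given any transform that still speaks the old vector type, expose it to the
  // DataFrame API as a udf over the new type.
  def wrap(oldTransform: OldVector => OldVector): UserDefinedFunction =
    udf { v: Vector => oldTransform(OldVectors.fromML(v)).asML }
}
```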
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
index 3fbfce9d48..fc4885bf4b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
@@ -21,10 +21,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/DCT.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/DCT.scala
index a6f878151d..301358ef12 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/DCT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/DCT.scala
@@ -21,9 +21,9 @@ import edu.emory.mathcs.jtransforms.dct._
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.UnaryTransformer
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.BooleanParam
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.sql.types.DataType
/**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala
index 1b0a9a12e8..91989c3d2f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala
@@ -19,10 +19,12 @@ package org.apache.spark.ml.feature
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.UnaryTransformer
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param.Param
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.sql.types.DataType
/**
@@ -52,7 +54,7 @@ class ElementwiseProduct(override val uid: String)
override protected def createTransformFunc: Vector => Vector = {
require(params.contains(scalingVec), s"transformation requires a weight vector")
val elemScaler = new feature.ElementwiseProduct($(scalingVec))
- elemScaler.transform
+ v => elemScaler.transform(v)
}
override protected def outputDataType: DataType = new VectorUDT()
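The small-looking change from `elemScaler.transform` to `v => elemScaler.transform(v)` matters: `createTransformFunc` must now return a function over the new Vector type, and the implicit conversions imported from `VectorImplicits._` apply inside a lambda body but cannot retype a bare method value. A self-contained sketch of that distinction using hypothetical stand-in types (NewVec and OldVec are not Spark classes):

```scala
import scala.language.implicitConversions

object EtaSketch {
  case class NewVec(values: Array[Double])   // stand-in for ml.linalg.Vector
  case class OldVec(values: Array[Double])   // stand-in for mllib.linalg.Vector

  implicit def newToOld(v: NewVec): OldVec = OldVec(v.values)
  implicit def oldToNew(v: OldVec): NewVec = NewVec(v.values)

  // Stand-in for feature.ElementwiseProduct.transform, which still takes the old type.
  def oldTransform(v: OldVec): OldVec = v

  // val f: NewVec => NewVec = oldTransform _     // does not compile: type is OldVec => OldVec
  val f: NewVec => NewVec = v => oldTransform(v)  // compiles: implicits convert v and the result
}
```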
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
index 66ae91cfc0..94e1825ba6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
@@ -86,7 +86,8 @@ class HashingTF(override val uid: String)
override def transform(dataset: Dataset[_]): DataFrame = {
val outputSchema = transformSchema(dataset.schema)
val hashingTF = new feature.HashingTF($(numFeatures)).setBinary($(binary))
- val t = udf { terms: Seq[_] => hashingTF.transform(terms) }
+ // TODO: Make the hashingTF.transform natively in ml framework to avoid extra conversion.
+ val t = udf { terms: Seq[_] => hashingTF.transform(terms).asML }
val metadata = outputSchema($(outputCol)).metadata
dataset.select(col("*"), t(col($(inputCol))).as($(outputCol), metadata))
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
index 5075b78c98..f85f4c65af 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -21,11 +21,13 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml._
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
@@ -79,7 +81,9 @@ final class IDF(override val uid: String) extends Estimator[IDFModel] with IDFBa
@Since("2.0.0")
override def fit(dataset: Dataset[_]): IDFModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+ val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
+ case Row(v: Vector) => OldVectors.fromML(v)
+ }
val idf = new feature.IDF($(minDocFreq)).fit(input)
copyValues(new IDFModel(uid, idf).setParent(this))
}
@@ -119,7 +123,8 @@ class IDFModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
- val idf = udf { vec: Vector => idfModel.transform(vec) }
+ // TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
+ val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
dataset.withColumn($(outputCol), idf(col($(inputCol))))
}
@@ -134,7 +139,7 @@ class IDFModel private[ml] (
/** Returns the IDF vector. */
@Since("1.6.0")
- def idf: Vector = idfModel.idf
+ def idf: Vector = idfModel.idf.asML
@Since("1.6.0")
override def write: MLWriter = new IDFModelWriter(this)
@@ -166,7 +171,7 @@ object IDFModel extends MLReadable[IDFModel] {
.select("idf")
.head()
val idf = data.getAs[Vector](0)
- val model = new IDFModel(metadata.uid, new feature.IDFModel(idf))
+ val model = new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf)))
DefaultParamsReader.getAndSetParams(model, metadata)
model
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
index 12176757ae..cce3ca45cc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,7 +17,7 @@
package org.apache.spark.ml.feature
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.ml.linalg.Vector
/**
* Class that represents an instance of weighted data point with label and features.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
index 9ca34e9ae2..fa65ff9879 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
@@ -26,7 +26,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.Transformer
-import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/LabeledPoint.scala
new file mode 100644
index 0000000000..f7f1d42039
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/LabeledPoint.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.beans.BeanInfo
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.linalg.Vector
+
+/**
+ * Class that represents the features and labels of a data point.
+ *
+ * @param label Label for this data point.
+ * @param features List of features for this data point.
+ */
+@Since("2.0.0")
+@Experimental
+@BeanInfo
+case class LabeledPoint(@Since("2.0.0") label: Double, @Since("2.0.0") features: Vector) {
+ override def toString: String = {
+ s"($label,$features)"
+ }
+}
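A usage note for the new case class (a sketch, not from the patch): it mirrors the old mllib LabeledPoint but carries the new ml.linalg.Vector, so pipeline code can construct training data without touching spark.mllib types.

```scala
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors

object LabeledPointUsage {
  def main(args: Array[String]): Unit = {
    val pos = LabeledPoint(1.0, Vectors.dense(0.5, 1.5))
    val neg = LabeledPoint(0.0, Vectors.sparse(2, Array(0), Array(-0.5)))
    println(pos)               // "(1.0,[0.5,1.5])" via the overridden toString
    println(neg.features.size) // 2
  }
}
```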
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index e9df600c8a..0dffba93ac 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -21,11 +21,13 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
@@ -69,7 +71,9 @@ class MaxAbsScaler @Since("2.0.0") (override val uid: String)
@Since("2.0.0")
override def fit(dataset: Dataset[_]): MaxAbsScalerModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+ val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
+ case Row(v: Vector) => OldVectors.fromML(v)
+ }
val summary = Statistics.colStats(input)
val minVals = summary.min.toArray
val maxVals = summary.max.toArray
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
index 125becbb8a..c6ff639f29 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -21,11 +21,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.{DoubleParam, ParamMap, Params}
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.stat.Statistics
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
@@ -106,7 +109,9 @@ class MinMaxScaler(override val uid: String)
@Since("2.0.0")
override def fit(dataset: Dataset[_]): MinMaxScalerModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+ val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
+ case Row(v: Vector) => OldVectors.fromML(v)
+ }
val summary = Statistics.colStats(input)
copyValues(new MinMaxScalerModel(uid, summary.min, summary.max).setParent(this))
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Normalizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Normalizer.scala
index a603b3f833..942ac7ebdb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Normalizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Normalizer.scala
@@ -19,10 +19,11 @@ package org.apache.spark.ml.feature
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.UnaryTransformer
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param.{DoubleParam, ParamValidators}
import org.apache.spark.ml.util._
import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.sql.types.DataType
/**
@@ -52,7 +53,7 @@ class Normalizer(override val uid: String)
override protected def createTransformFunc: Vector => Vector = {
val normalizer = new feature.Normalizer($(p))
- normalizer.transform
+ vector => normalizer.transform(OldVectors.fromML(vector)).asML
}
override protected def outputDataType: DataType = new VectorUDT()
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
index 99357793db..3d1e6dd818 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
@@ -20,10 +20,10 @@ package org.apache.spark.ml.feature
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute._
+import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{DoubleType, NumericType, StructType}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 9cf722e121..141d3b924b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -21,11 +21,16 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml._
+import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg._
+import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, DenseVector => OldDenseVector,
+ Matrices => OldMatrices, Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.MatrixImplicits._
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
@@ -71,7 +76,9 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
@Since("2.0.0")
override def fit(dataset: Dataset[_]): PCAModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v}
+ val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
+ case Row(v: Vector) => OldVectors.fromML(v)
+ }
val pca = new feature.PCA(k = $(k))
val pcaModel = pca.fit(input)
copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this))
@@ -128,8 +135,14 @@ class PCAModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
- val pcaModel = new feature.PCAModel($(k), pc, explainedVariance)
- val pcaOp = udf { pcaModel.transform _ }
+ val pcaModel = new feature.PCAModel($(k),
+ OldMatrices.fromML(pc).asInstanceOf[OldDenseMatrix],
+ OldVectors.fromML(explainedVariance).asInstanceOf[OldDenseVector])
+
+ // TODO: Make the transformer natively in ml framework to avoid extra conversion.
+ val transformer: Vector => Vector = v => pcaModel.transform(OldVectors.fromML(v)).asML
+
+ val pcaOp = udf(transformer)
dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
}
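PCAModel shows the matrix side of the migration: the principal-components matrix and explained-variance vector are stored with the new types and converted back to the old DenseMatrix/DenseVector only to rebuild the spark.mllib PCAModel. A sketch of the matrix conversions, assuming the methods used in the hunks above (`Matrices.fromML` and `asML`):

```scala
import org.apache.spark.ml.linalg.{Matrices, Matrix}
import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Matrices => OldMatrices, Matrix => OldMatrix}

object MatrixConversionSketch {
  // ml -> mllib; the downcast mirrors the asInstanceOf[OldDenseMatrix] in the hunk above.
  def toOldDense(m: Matrix): OldDenseMatrix =
    OldMatrices.fromML(m).asInstanceOf[OldDenseMatrix]

  // mllib -> ml when exposing results through the pipeline API.
  def toNew(m: OldMatrix): Matrix = m.asML

  def main(args: Array[String]): Unit = {
    val pc = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))
    println(toNew(toOldDense(pc)).toArray.sameElements(pc.toArray))  // true
  }
}
```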
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala
index 0a9b9719c1..a01867701b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala
@@ -21,9 +21,9 @@ import scala.collection.mutable
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.UnaryTransformer
+import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg._
import org.apache.spark.sql.types.DataType
/**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index a2f3d44132..c0feaa01fc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -25,10 +25,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, PipelineStage, Transformer}
import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.VectorUDT
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
index cf52710ab8..19aecff038 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.feature
import scala.collection.mutable
import scala.util.parsing.combinator.RegexParsers
-import org.apache.spark.mllib.linalg.VectorUDT
+import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.sql.types._
/**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
index 626e97efb4..9d084b520c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -21,11 +21,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml._
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}
@@ -93,7 +96,9 @@ class StandardScaler(override val uid: String) extends Estimator[StandardScalerM
@Since("2.0.0")
override def fit(dataset: Dataset[_]): StandardScalerModel = {
transformSchema(dataset.schema, logging = true)
- val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v }
+ val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
+ case Row(v: Vector) => OldVectors.fromML(v)
+ }
val scaler = new feature.StandardScaler(withMean = $(withMean), withStd = $(withStd))
val scalerModel = scaler.fit(input)
copyValues(new StandardScalerModel(uid, scalerModel.std, scalerModel.mean).setParent(this))
@@ -145,7 +150,11 @@ class StandardScalerModel private[ml] (
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean))
- val scale = udf { scaler.transform _ }
+
+ // TODO: Implement the transformer natively in the ml framework to avoid the extra conversion.
+ val transformer: Vector => Vector = v => scaler.transform(OldVectors.fromML(v)).asML
+
+ val scale = udf(transformer)
dataset.withColumn($(outputCol), scale(col($(inputCol))))
}
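The two hunks above bridge the new ml.linalg.Vector column type and the existing spark.mllib StandardScaler implementation by converting at the boundary in both directions. A minimal sketch of that conversion pattern outside the context of this patch (the value names are illustrative):

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// A new ml vector, as read out of a DataFrame column backed by the new VectorUDT.
val v: Vector = Vectors.dense(1.0, 2.0, 3.0)

// Convert to the old mllib type to call into the spark.mllib implementation ...
val old = OldVectors.fromML(v)

// ... and convert the result back before handing it to the ml pipeline.
val roundTripped: Vector = old.asML
assert(roundTripped == v)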
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 4d3e46e488..1bc24202b7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -23,10 +23,10 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute, UnresolvedAttribute}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index 68b699d569..2bc9d225ac 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -27,10 +27,10 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.attribute._
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, VectorUDT}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StructField, StructType}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala
index 7a9468b87b..103738cd91 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSlicer.scala
@@ -20,10 +20,10 @@ package org.apache.spark.ml.feature
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup}
+import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param.{IntArrayParam, ParamMap, StringArrayParam}
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index c49e263df0..1469bfd5e8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -22,11 +22,12 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.feature
-import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
index c29f7f86e9..0b9b2ff5c5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types._
* User-defined type for [[Vector]] in [[mllib-local]] which allows easy interaction with SQL
* via [[org.apache.spark.sql.Dataset]].
*/
-private[ml] class VectorUDT extends UserDefinedType[Vector] {
+private[spark] class VectorUDT extends UserDefinedType[Vector] {
override def sqlType: StructType = {
// type: 0 = sparse, 1 = dense
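Widening VectorUDT from private[ml] to private[spark] lets the SQL layer and the Python serializers further down resolve the UDT for the new vector type. As a rough sketch of what that enables (assuming a SparkSession named spark; not part of this patch), a DataFrame column of new ml vectors is typed by this UDT:

import org.apache.spark.ml.linalg.Vectors
import spark.implicits._

val df = Seq(
  (0, Vectors.dense(1.0, 2.0)),
  (1, Vectors.sparse(2, Array(0), Array(3.0)))
).toDF("id", "features")

df.printSchema()   // the "features" column reports the vector UDT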
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
index a2b52835e1..6ed193cf57 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.optim
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
-import org.apache.spark.mllib.linalg._
+import org.apache.spark.ml.linalg._
import org.apache.spark.rdd.RDD
/**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
index 7d21302f96..8f5f4427e1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala
@@ -19,7 +19,8 @@ package org.apache.spark.ml.optim
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
-import org.apache.spark.mllib.linalg._
+import org.apache.spark.ml.linalg._
+import org.apache.spark.mllib.linalg.CholeskyDecomposition
import org.apache.spark.rdd.RDD
/**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
index c368aadd23..82f2de7ccd 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -29,8 +29,9 @@ import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
+import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.linalg.JsonVectorConverter
import org.apache.spark.ml.util.Identifiable
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
/**
* :: DeveloperApi ::
@@ -92,7 +93,7 @@ class Param[T](val parent: String, val name: String, val doc: String, val isVali
case x: String =>
compact(render(JString(x)))
case v: Vector =>
- v.toJson
+ JsonVectorConverter.toJson(v)
case _ =>
throw new NotImplementedError(
"The default jsonEncode only supports string and vector. " +
@@ -128,7 +129,7 @@ private[ml] object Param {
val keys = v.map(_._1)
assert(keys.contains("type") && keys.contains("values"),
s"Expect a JSON serialized vector but cannot find fields 'type' and 'values' in $json.")
- Vectors.fromJson(json).asInstanceOf[T]
+ JsonVectorConverter.fromJson(json).asInstanceOf[T]
case _ =>
throw new NotImplementedError(
"The default jsonDecode only supports string and vector. " +
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 89ba6ab5d2..cc16c2f038 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -27,10 +27,11 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
index 874d2a81db..c4df9d1112 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
@@ -23,13 +23,13 @@ import org.json4s.JsonDSL._
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{PredictionModel, Predictor}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
index c41fb4b062..81f2139f0b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GBTRegressor.scala
@@ -24,13 +24,13 @@ import org.json4s.JsonDSL._
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{PredictionModel, Predictor}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.GradientBoostedTrees
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 05fffa0d97..4aa7c2cc0b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -25,11 +25,11 @@ import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector}
import org.apache.spark.ml.optim._
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{BLAS, Vector}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index 7a78ecbdf1..ba0f59e89b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -22,11 +22,11 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.regression.IsotonicRegressionModel.IsotonicRegressionModelWriter
import org.apache.spark.ml.util._
-import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.mllib.regression.{IsotonicRegression => MLlibIsotonicRegression}
import org.apache.spark.mllib.regression.{IsotonicRegressionModel => MLlibIsotonicRegressionModel}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index d13b15fd82..3e9a3f9db5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -28,14 +28,16 @@ import org.apache.spark.SparkException
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.linalg.BLAS._
import org.apache.spark.ml.optim.WeightedLeastSquares
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.evaluation.RegressionMetrics
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
-import org.apache.spark.mllib.linalg.BLAS._
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index 9605de7202..a6dbf21d55 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -22,13 +22,13 @@ import org.json4s.JsonDSL._
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{PredictionModel, Predictor}
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 39bdd1afad..5ba768d551 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -26,8 +26,8 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.annotation.Since
-import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT}
-import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala
index f71d28cf59..d5e5c45460 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala
@@ -18,7 +18,7 @@
package org.apache.spark.ml.tree
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
import org.apache.spark.mllib.tree.model.{ImpurityStats,
InformationGainStats => OldInformationGainStats, Node => OldNode, Predict => OldPredict}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala
index a4287483d1..9704e15cd8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.tree
import java.util.Objects
import org.apache.spark.annotation.{DeveloperApi, Since}
-import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.mllib.tree.configuration.{FeatureType => OldFeatureType}
import org.apache.spark.mllib.tree.model.{Split => OldSplit}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala
index 5f7c40f607..442f52bf02 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/DecisionTreeMetadata.scala
@@ -21,8 +21,8 @@ import scala.collection.mutable
import scala.util.Try
import org.apache.spark.internal.Logging
+import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.tree.RandomForestParams
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.configuration.QuantileStrategy._
import org.apache.spark.mllib.tree.configuration.Strategy
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala
index b6334762c7..a0faff236e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala
@@ -18,10 +18,10 @@
package org.apache.spark.ml.tree.impl
import org.apache.spark.internal.Logging
+import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor}
import org.apache.spark.mllib.impl.PeriodicRDDCheckpointer
-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.configuration.{BoostingStrategy => OldBoostingStrategy}
import org.apache.spark.mllib.tree.impurity.{Variance => OldVariance}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 2038a6873d..be3792eb77 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -24,10 +24,10 @@ import scala.util.Random
import org.apache.spark.internal.Logging
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
+import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.tree._
import org.apache.spark.ml.util.Instrumentation
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
import org.apache.spark.mllib.tree.model.ImpurityStats
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
index 3a2bf3c725..a6ac64a046 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
@@ -17,8 +17,8 @@
package org.apache.spark.ml.tree.impl
+import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.tree.{ContinuousSplit, Split}
-import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala
index f38e1ec7c0..56c85c9b53 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala
@@ -23,11 +23,11 @@ import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.jackson.JsonMethods._
+import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.{Param, Params}
import org.apache.spark.ml.tree.DecisionTreeModelReadWrite.NodeData
import org.apache.spark.ml.util.{DefaultParamsReader, DefaultParamsWriter}
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.tree.impurity.ImpurityCalculator
import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/MetadataUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/MetadataUtils.scala
index 96a38a3bde..f34a8310dd 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/MetadataUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/MetadataUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ml.util
import scala.collection.immutable.HashMap
import org.apache.spark.ml.attribute._
-import org.apache.spark.mllib.linalg.VectorUDT
+import org.apache.spark.ml.linalg.VectorUDT
import org.apache.spark.sql.types.StructField
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 8daee7b3aa..90d3827531 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -30,6 +30,8 @@ import net.razorvine.pickle._
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
+import org.apache.spark.ml.linalg.{DenseMatrix => NewDenseMatrix, DenseVector => NewDenseVector, SparseMatrix => NewSparseMatrix, SparseVector => NewSparseVector, Vector => NewVector, Vectors => NewVectors}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.evaluation.RankingMetrics
@@ -1207,6 +1209,7 @@ private[python] class PythonMLLibAPI extends Serializable {
private[spark] object SerDe extends Serializable {
val PYSPARK_PACKAGE = "pyspark.mllib"
+ val PYSPARK_ML_PACKAGE = "pyspark.ml"
/**
* Base class used for pickle
@@ -1214,8 +1217,10 @@ private[spark] object SerDe extends Serializable {
private[python] abstract class BasePickler[T: ClassTag]
extends IObjectPickler with IObjectConstructor {
+ protected def packageName: String = PYSPARK_PACKAGE
+
private val cls = implicitly[ClassTag[T]].runtimeClass
- private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
+ private val module = packageName + "." + cls.getName.split('.')(4)
private val name = cls.getSimpleName
// register this to Pickler and Unpickler
@@ -1262,7 +1267,7 @@ private[spark] object SerDe extends Serializable {
private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
}
- // Pickler for DenseVector
+ // Pickler for (mllib) DenseVector
private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
@@ -1294,7 +1299,41 @@ private[spark] object SerDe extends Serializable {
}
}
- // Pickler for DenseMatrix
+ // Pickler for (new) DenseVector
+ private[python] class NewDenseVectorPickler extends BasePickler[NewDenseVector] {
+
+ override protected def packageName = PYSPARK_ML_PACKAGE
+
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+ val vector: NewDenseVector = obj.asInstanceOf[NewDenseVector]
+ val bytes = new Array[Byte](8 * vector.size)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ val db = bb.asDoubleBuffer()
+ db.put(vector.values)
+
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(bytes.length))
+ out.write(bytes)
+ out.write(Opcodes.TUPLE1)
+ }
+
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 1) {
+ throw new PickleException("should be 1")
+ }
+ val bytes = getBytes(args(0))
+ val bb = ByteBuffer.wrap(bytes, 0, bytes.length)
+ bb.order(ByteOrder.nativeOrder())
+ val db = bb.asDoubleBuffer()
+ val ans = new Array[Double](bytes.length / 8)
+ db.get(ans)
+ NewVectors.dense(ans)
+ }
+ }
+
+ // Pickler for (mllib) DenseMatrix
private[python] class DenseMatrixPickler extends BasePickler[DenseMatrix] {
def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
@@ -1331,7 +1370,46 @@ private[spark] object SerDe extends Serializable {
}
}
- // Pickler for SparseMatrix
+ // Pickler for (new) DenseMatrix
+ private[python] class NewDenseMatrixPickler extends BasePickler[NewDenseMatrix] {
+
+ override protected def packageName = PYSPARK_ML_PACKAGE
+
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+ val m: NewDenseMatrix = obj.asInstanceOf[NewDenseMatrix]
+ val bytes = new Array[Byte](8 * m.values.length)
+ val order = ByteOrder.nativeOrder()
+ val isTransposed = if (m.isTransposed) 1 else 0
+ ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().put(m.values)
+
+ out.write(Opcodes.MARK)
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(m.numRows))
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(m.numCols))
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(bytes.length))
+ out.write(bytes)
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(isTransposed))
+ out.write(Opcodes.TUPLE)
+ }
+
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 4) {
+ throw new PickleException("should be 4")
+ }
+ val bytes = getBytes(args(2))
+ val n = bytes.length / 8
+ val values = new Array[Double](n)
+ val order = ByteOrder.nativeOrder()
+ ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().get(values)
+ val isTransposed = args(3).asInstanceOf[Int] == 1
+ new NewDenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], values, isTransposed)
+ }
+ }
+
+ // Pickler for (mllib) SparseMatrix
private[python] class SparseMatrixPickler extends BasePickler[SparseMatrix] {
def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
@@ -1386,7 +1464,64 @@ private[spark] object SerDe extends Serializable {
}
}
- // Pickler for SparseVector
+ // Pickler for (new) SparseMatrix
+ private[python] class NewSparseMatrixPickler extends BasePickler[NewSparseMatrix] {
+
+ override protected def packageName = PYSPARK_ML_PACKAGE
+
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+ val s = obj.asInstanceOf[NewSparseMatrix]
+ val order = ByteOrder.nativeOrder()
+
+ val colPtrsBytes = new Array[Byte](4 * s.colPtrs.length)
+ val indicesBytes = new Array[Byte](4 * s.rowIndices.length)
+ val valuesBytes = new Array[Byte](8 * s.values.length)
+ val isTransposed = if (s.isTransposed) 1 else 0
+ ByteBuffer.wrap(colPtrsBytes).order(order).asIntBuffer().put(s.colPtrs)
+ ByteBuffer.wrap(indicesBytes).order(order).asIntBuffer().put(s.rowIndices)
+ ByteBuffer.wrap(valuesBytes).order(order).asDoubleBuffer().put(s.values)
+
+ out.write(Opcodes.MARK)
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(s.numRows))
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(s.numCols))
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(colPtrsBytes.length))
+ out.write(colPtrsBytes)
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(indicesBytes.length))
+ out.write(indicesBytes)
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(valuesBytes.length))
+ out.write(valuesBytes)
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(isTransposed))
+ out.write(Opcodes.TUPLE)
+ }
+
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 6) {
+ throw new PickleException("should be 6")
+ }
+ val order = ByteOrder.nativeOrder()
+ val colPtrsBytes = getBytes(args(2))
+ val indicesBytes = getBytes(args(3))
+ val valuesBytes = getBytes(args(4))
+ val colPtrs = new Array[Int](colPtrsBytes.length / 4)
+ val rowIndices = new Array[Int](indicesBytes.length / 4)
+ val values = new Array[Double](valuesBytes.length / 8)
+ ByteBuffer.wrap(colPtrsBytes).order(order).asIntBuffer().get(colPtrs)
+ ByteBuffer.wrap(indicesBytes).order(order).asIntBuffer().get(rowIndices)
+ ByteBuffer.wrap(valuesBytes).order(order).asDoubleBuffer().get(values)
+ val isTransposed = args(5).asInstanceOf[Int] == 1
+ new NewSparseMatrix(
+ args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], colPtrs, rowIndices, values,
+ isTransposed)
+ }
+ }
+
+ // Pickler for (mllib) SparseVector
private[python] class SparseVectorPickler extends BasePickler[SparseVector] {
def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
@@ -1428,7 +1563,51 @@ private[spark] object SerDe extends Serializable {
}
}
- // Pickler for LabeledPoint
+ // Pickler for (new) SparseVector
+ private[python] class NewSparseVectorPickler extends BasePickler[NewSparseVector] {
+
+ override protected def packageName = PYSPARK_ML_PACKAGE
+
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+ val v: NewSparseVector = obj.asInstanceOf[NewSparseVector]
+ val n = v.indices.length
+ val indiceBytes = new Array[Byte](4 * n)
+ val order = ByteOrder.nativeOrder()
+ ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().put(v.indices)
+ val valueBytes = new Array[Byte](8 * n)
+ ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().put(v.values)
+
+ out.write(Opcodes.BININT)
+ out.write(PickleUtils.integer_to_bytes(v.size))
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(indiceBytes.length))
+ out.write(indiceBytes)
+ out.write(Opcodes.BINSTRING)
+ out.write(PickleUtils.integer_to_bytes(valueBytes.length))
+ out.write(valueBytes)
+ out.write(Opcodes.TUPLE3)
+ }
+
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 3) {
+ throw new PickleException("should be 3")
+ }
+ val size = args(0).asInstanceOf[Int]
+ val indiceBytes = getBytes(args(1))
+ val valueBytes = getBytes(args(2))
+ val n = indiceBytes.length / 4
+ val indices = new Array[Int](n)
+ val values = new Array[Double](n)
+ if (n > 0) {
+ val order = ByteOrder.nativeOrder()
+ ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().get(indices)
+ ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().get(values)
+ }
+ new NewSparseVector(size, indices, values)
+ }
+ }
+
+ // Pickler for MLlib LabeledPoint
private[python] class LabeledPointPickler extends BasePickler[LabeledPoint] {
def saveState(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
@@ -1482,6 +1661,10 @@ private[spark] object SerDe extends Serializable {
new DenseMatrixPickler().register()
new SparseMatrixPickler().register()
new SparseVectorPickler().register()
+ new NewDenseVectorPickler().register()
+ new NewDenseMatrixPickler().register()
+ new NewSparseMatrixPickler().register()
+ new NewSparseVectorPickler().register()
new LabeledPointPickler().register()
new RatingPickler().register()
initialized = true
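With packageName overridden to PYSPARK_ML_PACKAGE, the module string computed in BasePickler resolves the new types to the pyspark.ml classes rather than pyspark.mllib. A rough illustration of that derivation, using the same class and package names as the code above:

// For org.apache.spark.ml.linalg.DenseVector, cls.getName.split('.')(4) == "linalg",
// so the pickled module becomes "pyspark.ml.linalg"; the mllib picklers keep "pyspark.mllib.linalg".
val moduleForNewDenseVector = "pyspark.ml" + "." + "org.apache.spark.ml.linalg.DenseVector".split('.')(4)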
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index 1d25a58e0f..f3c52f61a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -431,7 +431,7 @@ class LogisticRegressionWithLBFGS
if (userSuppliedWeights) {
val uid = Identifiable.randomUID("logreg-static")
lr.setInitialModel(new org.apache.spark.ml.classification.LogisticRegressionModel(
- uid, initialWeights, 1.0))
+ uid, initialWeights.asML, 1.0))
}
lr.setFitIntercept(addIntercept)
lr.setMaxIter(optimizer.getNumIterations())
@@ -439,7 +439,7 @@ class LogisticRegressionWithLBFGS
// Convert our input into a DataFrame
val sqlContext = new SQLContext(input.context)
import sqlContext.implicits._
- val df = input.toDF()
+ val df = input.map(_.asML).toDF()
// Determine if we should cache the DF
val handlePersistence = input.getStorageLevel == StorageLevel.NONE
// Train our model
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 076cca6016..5c9a112ca6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.linalg
import java.util.{Arrays, Random}
import scala.collection.mutable.{ArrayBuffer, ArrayBuilder => MArrayBuilder, HashSet => MHashSet}
+import scala.language.implicitConversions
import breeze.linalg.{CSCMatrix => BSM, DenseMatrix => BDM, Matrix => BM}
import com.github.fommil.netlib.BLAS.{getInstance => blas}
@@ -1212,3 +1213,24 @@ object Matrices {
SparseMatrix.fromML(sm)
}
}
+
+/**
+ * Implicit methods available in Scala for converting [[org.apache.spark.mllib.linalg.Matrix]] to
+ * [[org.apache.spark.ml.linalg.Matrix]] and vice versa.
+ */
+private[spark] object MatrixImplicits {
+
+ implicit def mllibMatrixToMLMatrix(m: Matrix): newlinalg.Matrix = m.asML
+
+ implicit def mllibDenseMatrixToMLDenseMatrix(m: DenseMatrix): newlinalg.DenseMatrix = m.asML
+
+ implicit def mllibSparseMatrixToMLSparseMatrix(m: SparseMatrix): newlinalg.SparseMatrix = m.asML
+
+ implicit def mlMatrixToMLlibMatrix(m: newlinalg.Matrix): Matrix = Matrices.fromML(m)
+
+ implicit def mlDenseMatrixToMLlibDenseMatrix(m: newlinalg.DenseMatrix): DenseMatrix =
+ Matrices.fromML(m).asInstanceOf[DenseMatrix]
+
+ implicit def mlSparseMatrixToMLlibSparseMatrix(m: newlinalg.SparseMatrix): SparseMatrix =
+ Matrices.fromML(m).asInstanceOf[SparseMatrix]
+}
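Because MatrixImplicits is private[spark], these conversions are only in scope for Spark's own packages, which is where the remaining mixed old/new call sites live. A rough sketch of the intended use inside such code (the helper below is illustrative, not part of Spark):

import org.apache.spark.ml.linalg.{Matrices => NewMatrices}
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.MatrixImplicits._

// A helper still written against the old mllib Matrix type.
def frobeniusNorm(m: Matrix): Double = math.sqrt(m.toArray.map(x => x * x).sum)

// The implicit conversion adapts a new ml matrix at the call site.
val norm = frobeniusNorm(NewMatrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0)))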
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 132e54a8c3..1f1cfa0cb2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -22,6 +22,7 @@ import java.util
import scala.annotation.varargs
import scala.collection.JavaConverters._
+import scala.language.implicitConversions
import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
import org.json4s.DefaultFormats
@@ -926,3 +927,24 @@ object SparseVector {
new SparseVector(v.size, v.indices, v.values)
}
}
+
+/**
+ * Implicit methods available in Scala for converting [[org.apache.spark.mllib.linalg.Vector]] to
+ * [[org.apache.spark.ml.linalg.Vector]] and vice versa.
+ */
+private[spark] object VectorImplicits {
+
+ implicit def mllibVectorToMLVector(v: Vector): newlinalg.Vector = v.asML
+
+ implicit def mllibDenseVectorToMLDenseVector(v: DenseVector): newlinalg.DenseVector = v.asML
+
+ implicit def mllibSparseVectorToMLSparseVector(v: SparseVector): newlinalg.SparseVector = v.asML
+
+ implicit def mlVectorToMLlibVector(v: newlinalg.Vector): Vector = Vectors.fromML(v)
+
+ implicit def mlDenseVectorToMLlibDenseVector(v: newlinalg.DenseVector): DenseVector =
+ Vectors.fromML(v).asInstanceOf[DenseVector]
+
+ implicit def mlSparseVectorToMLlibSparseVector(v: newlinalg.SparseVector): SparseVector =
+ Vectors.fromML(v).asInstanceOf[SparseVector]
+}
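VectorImplicits plays the same role for vectors; it is what the import org.apache.spark.mllib.linalg.VectorImplicits._ lines in the estimators above pull in. A minimal sketch (the helper is illustrative):

import org.apache.spark.ml.linalg.{Vectors => NewVectors}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.VectorImplicits._

// An mllib-facing helper that new-API code can now call without an explicit conversion.
def l1Norm(v: Vector): Double = v.toArray.map(math.abs).sum

val norm = l1Norm(NewVectors.dense(1.0, -2.0, 3.0))   // ml -> mllib conversion applied implicitly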
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 45540f0c5c..f082b16b95 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.regression
import scala.beans.BeanInfo
import org.apache.spark.annotation.Since
+import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.NumericParser
import org.apache.spark.SparkException
@@ -38,6 +39,10 @@ case class LabeledPoint @Since("1.0.0") (
override def toString: String = {
s"($label,$features)"
}
+
+ private[spark] def asML: NewLabeledPoint = {
+ NewLabeledPoint(label, features.asML)
+ }
}
/**
@@ -67,4 +72,8 @@ object LabeledPoint {
LabeledPoint(label, features)
}
}
+
+ private[spark] def fromML(point: NewLabeledPoint): LabeledPoint = {
+ LabeledPoint(point.label, Vectors.fromML(point.features))
+ }
}
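With asML and fromML, the old mllib LabeledPoint gets the same bridging shape as the vector and matrix types; both methods are private[spark], so the round trip below is only available to Spark-internal code. A short sketch under those definitions:

import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val old = LabeledPoint(1.0, Vectors.dense(0.5, 1.5))
val converted: NewLabeledPoint = old.asML              // new ml.feature.LabeledPoint
val back: LabeledPoint = LabeledPoint.fromML(converted)
assert(back == old)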
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
index 7fe60e2d99..ece1e41d98 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.tree
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
+import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
import org.apache.spark.ml.tree.impl.{GradientBoostedTrees => NewGBT}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
@@ -66,7 +67,9 @@ class GradientBoostedTrees private[spark] (
@Since("1.2.0")
def run(input: RDD[LabeledPoint]): GradientBoostedTreesModel = {
val algo = boostingStrategy.treeStrategy.algo
- val (trees, treeWeights) = NewGBT.run(input, boostingStrategy, seed.toLong)
+ val (trees, treeWeights) = NewGBT.run(input.map { point =>
+ NewLabeledPoint(point.label, point.features.asML)
+ }, boostingStrategy, seed.toLong)
new GradientBoostedTreesModel(algo, trees.map(_.toOld), treeWeights)
}
@@ -94,8 +97,11 @@ class GradientBoostedTrees private[spark] (
input: RDD[LabeledPoint],
validationInput: RDD[LabeledPoint]): GradientBoostedTreesModel = {
val algo = boostingStrategy.treeStrategy.algo
- val (trees, treeWeights) = NewGBT.runWithValidation(input, validationInput, boostingStrategy,
- seed.toLong)
+ val (trees, treeWeights) = NewGBT.runWithValidation(input.map { point =>
+ NewLabeledPoint(point.label, point.features.asML)
+ }, validationInput.map { point =>
+ NewLabeledPoint(point.label, point.features.asML)
+ }, boostingStrategy, seed.toLong)
new GradientBoostedTreesModel(algo, trees.map(_.toOld), treeWeights)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
index c31ed9c1ce..14f11ce51b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/RandomForest.scala
@@ -90,8 +90,8 @@ private class RandomForest (
* @return RandomForestModel that can be used for prediction.
*/
def run(input: RDD[LabeledPoint]): RandomForestModel = {
- val trees: Array[NewDTModel] =
- NewRandomForest.run(input, strategy, numTrees, featureSubsetStrategy, seed.toLong, None)
+ val trees: Array[NewDTModel] = NewRandomForest.run(input.map(_.asML), strategy, numTrees,
+ featureSubsetStrategy, seed.toLong, None)
new RandomForestModel(strategy.algo, trees.map(_.toOld))
}