author     Yanbo Liang <ybliang8@gmail.com>       2016-04-29 09:42:54 -0700
committer  Xiangrui Meng <meng@databricks.com>    2016-04-29 09:43:04 -0700
commit     87ac84d43729c54be100bb9ad7dc6e8fa14b8805 (patch)
tree       d3fbb8c5996a10177fd3af3579d160b6278509ac /mllib
parent     a7d0fedc940721d09350f2e57ae85591e0a3d90e (diff)
[SPARK-14314][SPARK-14315][ML][SPARKR] Model persistence in SparkR (glm & kmeans)
SparkR ```glm``` and ```kmeans``` model persistence. Unit tests.

Author: Yanbo Liang <ybliang8@gmail.com>
Author: Gayathri Murali <gayathri.m.softie@gmail.com>

Closes #12778 from yanboliang/spark-14311.
Closes #12680
Closes #12683
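Both wrappers in this change extend MLWritable and gain MLReadable companion objects, so a fitted SparkR model can now be saved to and restored from a path. Below is a minimal sketch of the wrapper-level round trip, not the R-level write.ml/read.ml entry points (those live outside mllib); it assumes code sitting in the org.apache.spark.ml.r package, since the wrappers are private[r], plus an already fitted `wrapper` and a scratch `path`:

// Persist: writes <path>/rMetadata (R-side stats as JSON) and <path>/pipeline (the fitted PipelineModel).
wrapper.save(path)

// Restore: the reader rebuilds the wrapper from the saved JSON and pipeline.
val restored = GeneralizedLinearRegressionWrapper.load(path)
assert(restored.isLoaded)  // true only when the instance comes from the reader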
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala       |   1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala | 181
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala                      |  65
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala                  |   1
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala                          |   4
5 files changed, 188 insertions, 64 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index a442469e4d..5462f80d69 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -19,7 +19,6 @@ package org.apache.spark.ml.r
import org.apache.hadoop.fs.Path
import org.json4s._
-import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
index f66323e36c..9618a3423e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/GeneralizedLinearRegressionWrapper.scala
@@ -17,65 +17,34 @@
package org.apache.spark.ml.r
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.regression._
+import org.apache.spark.ml.util._
import org.apache.spark.sql._
private[r] class GeneralizedLinearRegressionWrapper private (
- pipeline: PipelineModel,
- val features: Array[String]) {
+ val pipeline: PipelineModel,
+ val rFeatures: Array[String],
+ val rCoefficients: Array[Double],
+ val rDispersion: Double,
+ val rNullDeviance: Double,
+ val rDeviance: Double,
+ val rResidualDegreeOfFreedomNull: Long,
+ val rResidualDegreeOfFreedom: Long,
+ val rAic: Double,
+ val rNumIterations: Int,
+ val isLoaded: Boolean = false) extends MLWritable {
private val glm: GeneralizedLinearRegressionModel =
pipeline.stages(1).asInstanceOf[GeneralizedLinearRegressionModel]
- lazy val rFeatures: Array[String] = if (glm.getFitIntercept) {
- Array("(Intercept)") ++ features
- } else {
- features
- }
-
- lazy val rCoefficients: Array[Double] = if (glm.getFitIntercept) {
- Array(glm.intercept) ++ glm.coefficients.toArray ++
- rCoefficientStandardErrors ++ rTValues ++ rPValues
- } else {
- glm.coefficients.toArray ++ rCoefficientStandardErrors ++ rTValues ++ rPValues
- }
-
- private lazy val rCoefficientStandardErrors = if (glm.getFitIntercept) {
- Array(glm.summary.coefficientStandardErrors.last) ++
- glm.summary.coefficientStandardErrors.dropRight(1)
- } else {
- glm.summary.coefficientStandardErrors
- }
-
- private lazy val rTValues = if (glm.getFitIntercept) {
- Array(glm.summary.tValues.last) ++ glm.summary.tValues.dropRight(1)
- } else {
- glm.summary.tValues
- }
-
- private lazy val rPValues = if (glm.getFitIntercept) {
- Array(glm.summary.pValues.last) ++ glm.summary.pValues.dropRight(1)
- } else {
- glm.summary.pValues
- }
-
- lazy val rDispersion: Double = glm.summary.dispersion
-
- lazy val rNullDeviance: Double = glm.summary.nullDeviance
-
- lazy val rDeviance: Double = glm.summary.deviance
-
- lazy val rResidualDegreeOfFreedomNull: Long = glm.summary.residualDegreeOfFreedomNull
-
- lazy val rResidualDegreeOfFreedom: Long = glm.summary.residualDegreeOfFreedom
-
- lazy val rAic: Double = glm.summary.aic
-
- lazy val rNumIterations: Int = glm.summary.numIterations
-
lazy val rDevianceResiduals: DataFrame = glm.summary.residuals()
lazy val rFamily: String = glm.getFamily
@@ -85,9 +54,13 @@ private[r] class GeneralizedLinearRegressionWrapper private (
def transform(dataset: Dataset[_]): DataFrame = {
pipeline.transform(dataset).drop(glm.getFeaturesCol)
}
+
+ override def write: MLWriter =
+ new GeneralizedLinearRegressionWrapper.GeneralizedLinearRegressionWrapperWriter(this)
}
-private[r] object GeneralizedLinearRegressionWrapper {
+private[r] object GeneralizedLinearRegressionWrapper
+ extends MLReadable[GeneralizedLinearRegressionWrapper] {
def fit(
formula: String,
@@ -105,15 +78,119 @@ private[r] object GeneralizedLinearRegressionWrapper {
.attributes.get
val features = featureAttrs.map(_.name.get)
// assemble and fit the pipeline
- val glm = new GeneralizedLinearRegression()
+ val glr = new GeneralizedLinearRegression()
.setFamily(family)
.setLink(link)
.setFitIntercept(rFormula.hasIntercept)
.setTol(epsilon)
.setMaxIter(maxit)
val pipeline = new Pipeline()
- .setStages(Array(rFormulaModel, glm))
+ .setStages(Array(rFormulaModel, glr))
.fit(data)
- new GeneralizedLinearRegressionWrapper(pipeline, features)
+
+ val glm: GeneralizedLinearRegressionModel =
+ pipeline.stages(1).asInstanceOf[GeneralizedLinearRegressionModel]
+ val summary = glm.summary
+
+ val rFeatures: Array[String] = if (glm.getFitIntercept) {
+ Array("(Intercept)") ++ features
+ } else {
+ features
+ }
+
+ val rCoefficientStandardErrors = if (glm.getFitIntercept) {
+ Array(summary.coefficientStandardErrors.last) ++
+ summary.coefficientStandardErrors.dropRight(1)
+ } else {
+ summary.coefficientStandardErrors
+ }
+
+ val rTValues = if (glm.getFitIntercept) {
+ Array(summary.tValues.last) ++ summary.tValues.dropRight(1)
+ } else {
+ summary.tValues
+ }
+
+ val rPValues = if (glm.getFitIntercept) {
+ Array(summary.pValues.last) ++ summary.pValues.dropRight(1)
+ } else {
+ summary.pValues
+ }
+
+ val rCoefficients: Array[Double] = if (glm.getFitIntercept) {
+ Array(glm.intercept) ++ glm.coefficients.toArray ++
+ rCoefficientStandardErrors ++ rTValues ++ rPValues
+ } else {
+ glm.coefficients.toArray ++ rCoefficientStandardErrors ++ rTValues ++ rPValues
+ }
+
+ val rDispersion: Double = summary.dispersion
+ val rNullDeviance: Double = summary.nullDeviance
+ val rDeviance: Double = summary.deviance
+ val rResidualDegreeOfFreedomNull: Long = summary.residualDegreeOfFreedomNull
+ val rResidualDegreeOfFreedom: Long = summary.residualDegreeOfFreedom
+ val rAic: Double = summary.aic
+ val rNumIterations: Int = summary.numIterations
+
+ new GeneralizedLinearRegressionWrapper(pipeline, rFeatures, rCoefficients, rDispersion,
+ rNullDeviance, rDeviance, rResidualDegreeOfFreedomNull, rResidualDegreeOfFreedom,
+ rAic, rNumIterations)
+ }
+
+ override def read: MLReader[GeneralizedLinearRegressionWrapper] =
+ new GeneralizedLinearRegressionWrapperReader
+
+ override def load(path: String): GeneralizedLinearRegressionWrapper = super.load(path)
+
+ class GeneralizedLinearRegressionWrapperWriter(instance: GeneralizedLinearRegressionWrapper)
+ extends MLWriter {
+
+ override protected def saveImpl(path: String): Unit = {
+ val rMetadataPath = new Path(path, "rMetadata").toString
+ val pipelinePath = new Path(path, "pipeline").toString
+
+ val rMetadata = ("class" -> instance.getClass.getName) ~
+ ("rFeatures" -> instance.rFeatures.toSeq) ~
+ ("rCoefficients" -> instance.rCoefficients.toSeq) ~
+ ("rDispersion" -> instance.rDispersion) ~
+ ("rNullDeviance" -> instance.rNullDeviance) ~
+ ("rDeviance" -> instance.rDeviance) ~
+ ("rResidualDegreeOfFreedomNull" -> instance.rResidualDegreeOfFreedomNull) ~
+ ("rResidualDegreeOfFreedom" -> instance.rResidualDegreeOfFreedom) ~
+ ("rAic" -> instance.rAic) ~
+ ("rNumIterations" -> instance.rNumIterations)
+ val rMetadataJson: String = compact(render(rMetadata))
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+
+ instance.pipeline.save(pipelinePath)
+ }
+ }
+
+ class GeneralizedLinearRegressionWrapperReader
+ extends MLReader[GeneralizedLinearRegressionWrapper] {
+
+ override def load(path: String): GeneralizedLinearRegressionWrapper = {
+ implicit val format = DefaultFormats
+ val rMetadataPath = new Path(path, "rMetadata").toString
+ val pipelinePath = new Path(path, "pipeline").toString
+
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadata = parse(rMetadataStr)
+ val rFeatures = (rMetadata \ "rFeatures").extract[Array[String]]
+ val rCoefficients = (rMetadata \ "rCoefficients").extract[Array[Double]]
+ val rDispersion = (rMetadata \ "rDispersion").extract[Double]
+ val rNullDeviance = (rMetadata \ "rNullDeviance").extract[Double]
+ val rDeviance = (rMetadata \ "rDeviance").extract[Double]
+ val rResidualDegreeOfFreedomNull = (rMetadata \ "rResidualDegreeOfFreedomNull").extract[Long]
+ val rResidualDegreeOfFreedom = (rMetadata \ "rResidualDegreeOfFreedom").extract[Long]
+ val rAic = (rMetadata \ "rAic").extract[Double]
+ val rNumIterations = (rMetadata \ "rNumIterations").extract[Int]
+
+ val pipeline = PipelineModel.load(pipelinePath)
+
+ new GeneralizedLinearRegressionWrapper(pipeline, rFeatures, rCoefficients, rDispersion,
+ rNullDeviance, rDeviance, rResidualDegreeOfFreedomNull, rResidualDegreeOfFreedom,
+ rAic, rNumIterations, isLoaded = true)
+ }
}
}
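For reference, the writer above lays the model out in two pieces under the save path: a single-partition text file holding the R-side metadata as one JSON line, and a regular PipelineModel save. A rough sketch of inspecting it, where `sc`, `path`, and the exact part-file naming produced by saveAsTextFile are assumptions rather than anything spelled out in this diff:

// <path>/rMetadata/ -- one JSON line written by saveImpl, e.g.
//   {"class":"org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper","rFeatures":[...],"rAic":...}
// <path>/pipeline/  -- standard PipelineModel.save output
val rMetadataJson = sc.textFile(s"$path/rMetadata", 1).first()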
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
index 9e2b81ee20..f67760d3ca 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -17,28 +17,30 @@
package org.apache.spark.ml.r
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.feature.VectorAssembler
+import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
private[r] class KMeansWrapper private (
- pipeline: PipelineModel) {
+ val pipeline: PipelineModel,
+ val features: Array[String],
+ val size: Array[Long],
+ val isLoaded: Boolean = false) extends MLWritable {
private val kMeansModel: KMeansModel = pipeline.stages(1).asInstanceOf[KMeansModel]
lazy val coefficients: Array[Double] = kMeansModel.clusterCenters.flatMap(_.toArray)
- private lazy val attrs = AttributeGroup.fromStructField(
- kMeansModel.summary.predictions.schema(kMeansModel.getFeaturesCol))
-
- lazy val features: Array[String] = attrs.attributes.get.map(_.name.get)
-
lazy val k: Int = kMeansModel.getK
- lazy val size: Array[Long] = kMeansModel.summary.clusterSizes
-
lazy val cluster: DataFrame = kMeansModel.summary.cluster
def fitted(method: String): DataFrame = {
@@ -56,9 +58,10 @@ private[r] class KMeansWrapper private (
pipeline.transform(dataset).drop(kMeansModel.getFeaturesCol)
}
+ override def write: MLWriter = new KMeansWrapper.KMeansWrapperWriter(this)
}
-private[r] object KMeansWrapper {
+private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
def fit(
data: DataFrame,
@@ -80,6 +83,48 @@ private[r] object KMeansWrapper {
.setStages(Array(assembler, kMeans))
.fit(data)
- new KMeansWrapper(pipeline)
+ val kMeansModel: KMeansModel = pipeline.stages(1).asInstanceOf[KMeansModel]
+ val attrs = AttributeGroup.fromStructField(
+ kMeansModel.summary.predictions.schema(kMeansModel.getFeaturesCol))
+ val features: Array[String] = attrs.attributes.get.map(_.name.get)
+ val size: Array[Long] = kMeansModel.summary.clusterSizes
+
+ new KMeansWrapper(pipeline, features, size)
+ }
+
+ override def read: MLReader[KMeansWrapper] = new KMeansWrapperReader
+
+ override def load(path: String): KMeansWrapper = super.load(path)
+
+ class KMeansWrapperWriter(instance: KMeansWrapper) extends MLWriter {
+
+ override protected def saveImpl(path: String): Unit = {
+ val rMetadataPath = new Path(path, "rMetadata").toString
+ val pipelinePath = new Path(path, "pipeline").toString
+
+ val rMetadata = ("class" -> instance.getClass.getName) ~
+ ("features" -> instance.features.toSeq) ~
+ ("size" -> instance.size.toSeq)
+ val rMetadataJson: String = compact(render(rMetadata))
+
+ sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+ instance.pipeline.save(pipelinePath)
+ }
+ }
+
+ class KMeansWrapperReader extends MLReader[KMeansWrapper] {
+
+ override def load(path: String): KMeansWrapper = {
+ implicit val format = DefaultFormats
+ val rMetadataPath = new Path(path, "rMetadata").toString
+ val pipelinePath = new Path(path, "pipeline").toString
+ val pipeline = PipelineModel.load(pipelinePath)
+
+ val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+ val rMetadata = parse(rMetadataStr)
+ val features = (rMetadata \ "features").extract[Array[String]]
+ val size = (rMetadata \ "size").extract[Array[Long]]
+ new KMeansWrapper(pipeline, features, size, isLoaded = true)
+ }
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
index 27c7e72881..28925c79da 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/NaiveBayesWrapper.scala
@@ -19,7 +19,6 @@ package org.apache.spark.ml.r
import org.apache.hadoop.fs.Path
import org.json4s._
-import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index 06baedf2a2..9c0757941e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -40,6 +40,10 @@ private[r] object RWrappers extends MLReader[Object] {
case "org.apache.spark.ml.r.NaiveBayesWrapper" => NaiveBayesWrapper.load(path)
case "org.apache.spark.ml.r.AFTSurvivalRegressionWrapper" =>
AFTSurvivalRegressionWrapper.load(path)
+ case "org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper" =>
+ GeneralizedLinearRegressionWrapper.load(path)
+ case "org.apache.spark.ml.r.KMeansWrapper" =>
+ KMeansWrapper.load(path)
case _ =>
throw new SparkException(s"SparkR ml.load does not support load $className")
}
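The two new cases make the generic SparkR loader aware of the persisted wrappers: RWrappers reads the "class" field from rMetadata and delegates to the matching wrapper's load, so the caller does not have to know the model type up front. A rough sketch, again assuming code inside the org.apache.spark.ml.r package and an existing save path:

RWrappers.load(path) match {
  case m: GeneralizedLinearRegressionWrapper => // saved by the glm wrapper
  case m: KMeansWrapper =>                      // saved by the kmeans wrapper
}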