author     Reynold Xin <rxin@databricks.com>   2015-05-16 15:03:57 -0700
committer  Reynold Xin <rxin@databricks.com>   2015-05-16 15:03:57 -0700
commit     161d0b4a41f453b21adde46a86e16c2743752799 (patch)
tree       43519153e3a1cd636b2f8d8963e81ea62be5af15 /mllib
parent     1b4e710e5cdb00febb4c5920d81e77c2e3966a8b (diff)
[SPARK-7654][MLlib] Migrate MLlib to the DataFrame reader/writer API.
Author: Reynold Xin <rxin@databricks.com>

Closes #6211 from rxin/mllib-reader and squashes the following commits:

79a2cb9 [Reynold Xin] [SPARK-7654][MLlib] Migrate MLlib to the DataFrame reader/writer API.
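In short, the patch replaces the older direct Parquet helpers on DataFrame and SQLContext with the unified DataFrameWriter/DataFrameReader entry points that this change adopts throughout MLlib's model save/load code. A minimal before/after sketch (the path and the names sqlContext and df are placeholders, not taken from the patch):

// Old style: direct Parquet helpers.
df.saveAsParquetFile("/tmp/model/data")
val loadedOld = sqlContext.parquetFile("/tmp/model/data")

// New style: go through the reader/writer API.
df.write.parquet("/tmp/model/data")
val loadedNew = sqlContext.read.parquet("/tmp/model/data")

The rest of the diff applies exactly this substitution, one call site at a time, across the MLlib model persistence helpers.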
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala                       4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala      2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala                 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala                          2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala             4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala                                2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala         4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala                   2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala              2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala                    2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala                   2
11 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index af24ab6166..ac0ebeceaa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -140,7 +140,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
// Create Parquet data.
val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
- dataRDD.saveAsParquetFile(dataPath(path))
+ dataRDD.write.parquet(dataPath(path))
}
def load(sc: SparkContext, path: String): NaiveBayesModel = {
@@ -186,7 +186,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
// Create Parquet data.
val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
- dataRDD.saveAsParquetFile(dataPath(path))
+ dataRDD.write.parquet(dataPath(path))
}
def load(sc: SparkContext, path: String): NaiveBayesModel = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index 3b6790cce4..d842ec57b2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel {
// Create Parquet data.
val data = Data(weights, intercept, threshold)
- sc.parallelize(Seq(data), 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+ sc.parallelize(Seq(data), 1).toDF().write.parquet(Loader.dataPath(path))
}
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index c22862c130..731b43a1be 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -126,7 +126,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
val dataArray = Array.tabulate(weights.length) { i =>
Data(weights(i), gaussians(i).mu, gaussians(i).sigma)
}
- sc.parallelize(dataArray, 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+ sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path))
}
def load(sc: SparkContext, path: String): GaussianMixtureModel = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index ba228b11fc..252e166e85 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -110,7 +110,7 @@ object KMeansModel extends Loader[KMeansModel] {
val dataRDD = sc.parallelize(model.clusterCenters.zipWithIndex).map { case (point, id) =>
Cluster(id, point)
}.toDF()
- dataRDD.saveAsParquetFile(Loader.dataPath(path))
+ dataRDD.write.parquet(Loader.dataPath(path))
}
def load(sc: SparkContext, path: String): KMeansModel = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index aa53e88d59..1ed01c9d8b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -74,7 +74,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
val dataRDD = model.assignments.toDF()
- dataRDD.saveAsParquetFile(Loader.dataPath(path))
+ dataRDD.write.parquet(Loader.dataPath(path))
}
def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
@@ -86,7 +86,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
assert(formatVersion == thisFormatVersion)
val k = (metadata \ "k").extract[Int]
- val assignments = sqlContext.parquetFile(Loader.dataPath(path))
+ val assignments = sqlContext.read.parquet(Loader.dataPath(path))
Loader.checkSchema[PowerIterationClustering.Assignment](assignments.schema)
val assignmentsRDD = assignments.map {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 98e83112f5..731f7576c2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -580,7 +580,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
- sc.parallelize(dataArray.toSeq, 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+ sc.parallelize(dataArray.toSeq, 1).toDF().write.parquet(Loader.dataPath(path))
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 88c2148403..b960fbc5bf 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -281,8 +281,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
- model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path))
- model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path))
+ model.userFeatures.toDF("id", "features").write.parquet(userPath(path))
+ model.productFeatures.toDF("id", "features").write.parquet(productPath(path))
}
def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 4ce541ae5b..22b9b22a87 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -184,7 +184,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
sqlContext.createDataFrame(
boundaries.toSeq.zip(predictions).map { case (b, p) => Data(b, p) }
- ).saveAsParquetFile(dataPath(path))
+ ).write.parquet(dataPath(path))
}
def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index b55944f74f..2aa0e9ef96 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -60,7 +60,7 @@ private[regression] object GLMRegressionModel {
val data = Data(weights, intercept)
val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
// TODO: repartition with 1 partition after SPARK-5532 gets fixed
- dataRDD.saveAsParquetFile(Loader.dataPath(path))
+ dataRDD.write.parquet(Loader.dataPath(path))
}
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index 331af42853..a558f84c8d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -223,7 +223,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
val dataRDD: DataFrame = sc.parallelize(nodes)
.map(NodeData.apply(0, _))
.toDF()
- dataRDD.saveAsParquetFile(Loader.dataPath(path))
+ dataRDD.write.parquet(Loader.dataPath(path))
}
def load(sc: SparkContext, path: String, algo: String, numNodes: Int): DecisionTreeModel = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index 8341219bfa..f9cd0140fe 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -414,7 +414,7 @@ private[tree] object TreeEnsembleModel extends Logging {
val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) =>
tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node))
}.toDF()
- dataRDD.saveAsParquetFile(Loader.dataPath(path))
+ dataRDD.write.parquet(Loader.dataPath(path))
}
/**