author    Xiangrui Meng <meng@databricks.com>  2015-05-20 07:46:17 -0700
committer Xiangrui Meng <meng@databricks.com>  2015-05-20 07:46:17 -0700
commit    589b12f8e62ec5d10713ce057756ebc791e7ddc6 (patch)
tree      5752a53a6a4f60d31a9fc05da43b58a594a667e7 /mllib
parent    3ddf051ee7256f642f8a17768d161c7b5f55c7e1 (diff)
[SPARK-7654] [MLLIB] Migrate MLlib to the DataFrame reader/writer API
parquetFile -> read.parquet

rxin

Author: Xiangrui Meng <meng@databricks.com>

Closes #6281 from mengxr/SPARK-7654 and squashes the following commits:

a79b612 [Xiangrui Meng] parquetFile -> read.parquet
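For reference, the change is purely mechanical: each model loader swaps SQLContext.parquetFile, deprecated in Spark 1.4, for the DataFrameReader entry point reached via SQLContext.read. A minimal before/after sketch (the object and method names below are illustrative, not part of this patch):

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper showing the one-line migration this patch applies
// across all ten loaders.
object ReaderMigrationSketch {
  def loadModelData(sc: SparkContext, dataPath: String): DataFrame = {
    val sqlContext = new SQLContext(sc)
    // Old API (deprecated): sqlContext.parquetFile(dataPath)
    // New DataFrameReader API:
    sqlContext.read.parquet(dataPath)
  }
}

One advantage of the reader API is that it generalizes across input formats, e.g. sqlContext.read.json(path) or sqlContext.read.format("parquet").load(path), rather than requiring one ad-hoc method per format.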
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala | 2
10 files changed, 12 insertions, 12 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 53fb2cba03..cffe9ef1e0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -153,7 +153,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
       val sqlContext = new SQLContext(sc)
       // Load Parquet data.
-      val dataRDD = sqlContext.parquetFile(dataPath(path))
+      val dataRDD = sqlContext.read.parquet(dataPath(path))
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1)
@@ -199,7 +199,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
       val sqlContext = new SQLContext(sc)
       // Load Parquet data.
-      val dataRDD = sqlContext.parquetFile(dataPath(path))
+      val dataRDD = sqlContext.read.parquet(dataPath(path))
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("labels", "pi", "theta").take(1)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index d842ec57b2..fe09f6b75d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -75,7 +75,7 @@ private[classification] object GLMClassificationModel {
     def loadData(sc: SparkContext, path: String, modelClass: String): Data = {
       val datapath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val dataRDD = sqlContext.parquetFile(datapath)
+      val dataRDD = sqlContext.read.parquet(datapath)
       val dataArray = dataRDD.select("weights", "intercept", "threshold").take(1)
       assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath")
       val data = dataArray(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 731b43a1be..86353aed81 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -132,7 +132,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
     def load(sc: SparkContext, path: String): GaussianMixtureModel = {
       val dataPath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val dataFrame = sqlContext.parquetFile(dataPath)
+      val dataFrame = sqlContext.read.parquet(dataPath)
       val dataArray = dataFrame.select("weight", "mu", "sigma").collect()
 
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 252e166e85..8ecb3df11d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -120,7 +120,7 @@ object KMeansModel extends Loader[KMeansModel] {
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
       val k = (metadata \ "k").extract[Int]
-      val centriods = sqlContext.parquetFile(Loader.dataPath(path))
+      val centriods = sqlContext.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[Cluster](centriods.schema)
       val localCentriods = centriods.map(Cluster.apply).collect()
       assert(k == localCentriods.size)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index f65f78299d..9106b73dfc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -559,7 +559,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
     def load(sc: SparkContext, path: String): Word2VecModel = {
       val dataPath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val dataFrame = sqlContext.parquetFile(dataPath)
+      val dataFrame = sqlContext.read.parquet(dataPath)
       val dataArray = dataFrame.select("word", "vector").collect()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index b960fbc5bf..93aa41e499 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -292,11 +292,11 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
       val rank = (metadata \ "rank").extract[Int]
-      val userFeatures = sqlContext.parquetFile(userPath(path))
+      val userFeatures = sqlContext.read.parquet(userPath(path))
         .map { case Row(id: Int, features: Seq[_]) =>
           (id, features.asInstanceOf[Seq[Double]].toArray)
         }
-      val productFeatures = sqlContext.parquetFile(productPath(path))
+      val productFeatures = sqlContext.read.parquet(productPath(path))
         .map { case Row(id: Int, features: Seq[_]) =>
           (id, features.asInstanceOf[Seq[Double]].toArray)
         }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 22b9b22a87..3ea63dd8c0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -189,7 +189,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
     def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
       val sqlContext = new SQLContext(sc)
-      val dataRDD = sqlContext.parquetFile(dataPath(path))
+      val dataRDD = sqlContext.read.parquet(dataPath(path))
       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("boundary", "prediction").collect()
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index 2aa0e9ef96..317d3a5702 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -72,7 +72,7 @@ private[regression] object GLMRegressionModel {
     def loadData(sc: SparkContext, path: String, modelClass: String, numFeatures: Int): Data = {
       val datapath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val dataRDD = sqlContext.parquetFile(datapath)
+      val dataRDD = sqlContext.read.parquet(datapath)
       val dataArray = dataRDD.select("weights", "intercept").take(1)
       assert(dataArray.size == 1, s"Unable to load $modelClass data from: $datapath")
       val data = dataArray(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index a558f84c8d..25bb1453db 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -230,7 +230,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
       val datapath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
       // Load Parquet data.
-      val dataRDD = sqlContext.parquetFile(datapath)
+      val dataRDD = sqlContext.read.parquet(datapath)
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       Loader.checkSchema[NodeData](dataRDD.schema)
       val nodes = dataRDD.map(NodeData.apply)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index f9cd0140fe..1e3333d8d8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -437,7 +437,7 @@ private[tree] object TreeEnsembleModel extends Logging {
         treeAlgo: String): Array[DecisionTreeModel] = {
       val datapath = Loader.dataPath(path)
       val sqlContext = new SQLContext(sc)
-      val nodes = sqlContext.parquetFile(datapath).map(NodeData.apply)
+      val nodes = sqlContext.read.parquet(datapath).map(NodeData.apply)
       val trees = constructTrees(nodes)
       trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo)))
     }