aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala7
1 files changed, 3 insertions, 4 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
index 1fd37edfa7..0e992fa996 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
@@ -18,8 +18,7 @@
package org.apache.spark.examples.mllib
import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
+import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -56,8 +55,8 @@ object StreamingLinearRegression {
val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
- val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0))
- val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1))
+ val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse)
+ val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0)))