aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorJeremy Freeman <the.freeman.lab@gmail.com>2014-08-01 20:10:26 -0700
committerXiangrui Meng <meng@databricks.com>2014-08-01 20:10:26 -0700
commitf6a1899306c5ad766fea122d3ab4b83436d9f6fd (patch)
treebab06f824c6001b8c7ac4f09f37dea3f54c2deb4 /examples
parente8e0fd691a06a2887fdcffb2217b96805ace0cb0 (diff)
downloadspark-f6a1899306c5ad766fea122d3ab4b83436d9f6fd.tar.gz
spark-f6a1899306c5ad766fea122d3ab4b83436d9f6fd.tar.bz2
spark-f6a1899306c5ad766fea122d3ab4b83436d9f6fd.zip
Streaming mllib [SPARK-2438][MLLIB]
This PR implements a streaming linear regression analysis, in which a linear regression model is trained online as new data arrive. The design is based on discussions with tdas and mengxr, in which we determined how to add this functionality in a general way, with minimal changes to existing libraries. __Summary of additions:__ _StreamingLinearAlgorithm_ - An abstract class for fitting generalized linear models online to streaming data, including training on (and updating) a model, and making predictions. _StreamingLinearRegressionWithSGD_ - Class and companion object for running streaming linear regression _StreamingLinearRegressionTestSuite_ - Unit tests _StreamingLinearRegression_ - Example use case: fitting a model online to data from one stream, and making predictions on other data __Notes__ - If this looks good, I can use the StreamingLinearAlgorithm class to easily implement other analyses that follow the same logic (Ridge, Lasso, Logistic, SVM). Author: Jeremy Freeman <the.freeman.lab@gmail.com> Author: freeman <the.freeman.lab@gmail.com> Closes #1361 from freeman-lab/streaming-mllib and squashes the following commits: 775ea29 [Jeremy Freeman] Throw error if user doesn't initialize weights 4086fee [Jeremy Freeman] Fixed current weight formatting 8b95b27 [Jeremy Freeman] Restored broadcasting 29f27ec [Jeremy Freeman] Formatting 8711c41 [Jeremy Freeman] Used return to avoid indentation 777b596 [Jeremy Freeman] Restored treeAggregate 74cf440 [Jeremy Freeman] Removed static methods d28cf9a [Jeremy Freeman] Added usage notes c3326e7 [Jeremy Freeman] Improved documentation 9541a41 [Jeremy Freeman] Merge remote-tracking branch 'upstream/master' into streaming-mllib 66eba5e [Jeremy Freeman] Fixed line lengths 2fe0720 [Jeremy Freeman] Minor cleanup 7d51378 [Jeremy Freeman] Moved streaming loader to MLUtils b9b69f6 [Jeremy Freeman] Added setter methods c3f8b5a [Jeremy Freeman] Modified logging 00aafdc [Jeremy Freeman] Add modifiers 14b801e [Jeremy Freeman] Name changes c7d38a3 [Jeremy Freeman] Move check for empty data to GradientDescent 4b0a5d3 [Jeremy Freeman] Cleaned up tests 74188d6 [Jeremy Freeman] Eliminate dependency on commons 50dd237 [Jeremy Freeman] Removed experimental tag 6bfe1e6 [Jeremy Freeman] Fixed imports a2a63ad [freeman] Makes convergence test more robust 86220bc [freeman] Streaming linear regression unit tests fb4683a [freeman] Minor changes for scalastyle consistency fd31e03 [freeman] Changed logging behavior 453974e [freeman] Fixed indentation c4b1143 [freeman] Streaming linear regression 604f4d7 [freeman] Expanded private class to include mllib d99aa85 [freeman] Helper methods for streaming MLlib apps 0898add [freeman] Added dependency on streaming
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala73
1 files changed, 73 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
new file mode 100644
index 0000000000..1fd37edfa7
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+
+/**
+ * Train a linear regression model on one stream of data and make predictions
+ * on another stream, where the data streams arrive as text files
+ * into two different directories.
+ *
+ * The rows of the text files must be labeled data points in the form
+ * `(y,[x1,x2,x3,...,xn])`
+ * Where n is the number of features. n must be the same for train and test.
+ *
+ * Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>
+ *
+ * To run on your local machine using the two directories `trainingDir` and `testDir`,
+ * with updates every 5 seconds, and 2 features per data point, call:
+ * $ bin/run-example \
+ * org.apache.spark.examples.mllib.StreamingLinearRegression trainingDir testDir 5 2
+ *
+ * As you add text files to `trainingDir` the model will continuously update.
+ * Anytime you add text files to `testDir`, you'll see predictions from the current model.
+ *
+ */
+object StreamingLinearRegression {
+
+ def main(args: Array[String]) {
+
+ if (args.length != 4) {
+ System.err.println(
+ "Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
+ System.exit(1)
+ }
+
+ val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
+ val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
+
+ val trainingData = MLUtils.loadStreamingLabeledPoints(ssc, args(0))
+ val testData = MLUtils.loadStreamingLabeledPoints(ssc, args(1))
+
+ val model = new StreamingLinearRegressionWithSGD()
+ .setInitialWeights(Vectors.dense(Array.fill[Double](args(3).toInt)(0)))
+
+ model.trainOn(trainingData)
+ model.predictOn(testData).print()
+
+ ssc.start()
+ ssc.awaitTermination()
+
+ }
+
+}