aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
blob: 84764963b5f365ff14a7f7fa8601e56455a0cab1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.regression

import org.apache.spark.annotation.Since
import org.apache.spark.mllib.linalg.Vector

/**
 * Train or predict a linear regression model on streaming data. Training uses
 * Stochastic Gradient Descent to update the model based on each new batch of
 * incoming data from a DStream (see `LinearRegressionWithSGD` for model equation)
 *
 * Each batch of data is assumed to be an RDD of LabeledPoints.
 * The number of data points per batch can vary, but the number
 * of features must be constant. An initial weight
 * vector must be provided.
 *
 * Use a builder pattern to construct a streaming linear regression
 * analysis in an application, like:
 *
 *  val model = new StreamingLinearRegressionWithSGD()
 *    .setStepSize(0.5)
 *    .setNumIterations(10)
 *    .setInitialWeights(Vectors.dense(...))
 *    .trainOn(DStream)
 */
@Since("1.1.0")
class StreamingLinearRegressionWithSGD private[mllib] (
    private var stepSize: Double,
    private var numIterations: Int,
    private var regParam: Double,
    private var miniBatchFraction: Double)
  extends StreamingLinearAlgorithm[LinearRegressionModel, LinearRegressionWithSGD]
  with Serializable {

  /**
   * Construct a StreamingLinearRegression object with default parameters:
   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0}.
   * Initial weights must be set before using trainOn or predictOn
   * (see `StreamingLinearAlgorithm`)
   */
  @Since("1.1.0")
  def this() = this(0.1, 50, 0.0, 1.0)

  @Since("1.1.0")
  val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction)

  protected var model: Option[LinearRegressionModel] = None

  /**
   * Set the step size for gradient descent. Default: 0.1.
   */
  @Since("1.1.0")
  def setStepSize(stepSize: Double): this.type = {
    this.algorithm.optimizer.setStepSize(stepSize)
    this
  }

  /**
   * Set the regularization parameter. Default: 0.0.
   */
  @Since("2.0.0")
  def setRegParam(regParam: Double): this.type = {
    this.algorithm.optimizer.setRegParam(regParam)
    this
  }

  /**
   * Set the number of iterations of gradient descent to run per update. Default: 50.
   */
  @Since("1.1.0")
  def setNumIterations(numIterations: Int): this.type = {
    this.algorithm.optimizer.setNumIterations(numIterations)
    this
  }

  /**
   * Set the fraction of each batch to use for updates. Default: 1.0.
   */
  @Since("1.1.0")
  def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
    this.algorithm.optimizer.setMiniBatchFraction(miniBatchFraction)
    this
  }

  /**
   * Set the initial weights.
   */
  @Since("1.1.0")
  def setInitialWeights(initialWeights: Vector): this.type = {
    this.model = Some(algorithm.createModel(initialWeights, 0.0))
    this
  }

  /**
   * Set the convergence tolerance. Default: 0.001.
   */
  @Since("1.5.0")
  def setConvergenceTol(tolerance: Double): this.type = {
    this.algorithm.optimizer.setConvergenceTol(tolerance)
    this
  }
}