diff options
Diffstat (limited to 'mllib/src')
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala | 121 |
1 files changed, 67 insertions, 54 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala index 45e25eecf5..28489410f8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala @@ -17,20 +17,19 @@ package org.apache.spark.mllib.regression -import java.io.File -import java.nio.charset.Charset - import scala.collection.mutable.ArrayBuffer -import com.google.common.io.Files import org.scalatest.FunSuite import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext} -import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.util.Utils +import org.apache.spark.mllib.util.LinearDataGenerator +import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.TestSuiteBase + +class StreamingLinearRegressionSuite extends FunSuite with TestSuiteBase { -class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { + // use longer wait time to ensure job completion + override def maxWaitTimeMillis = 20000 // Assert that two values are equal within tolerance epsilon def assertEqual(v1: Double, v2: Double, epsilon: Double) { @@ -49,35 +48,26 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { } // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data - test("streaming linear regression parameter accuracy") { + test("parameter accuracy") { - val testDir = Files.createTempDir() - val numBatches = 10 - val batchDuration = Milliseconds(1000) - val ssc = new StreamingContext(sc, batchDuration) - val data = ssc.textFileStream(testDir.toString).map(LabeledPoint.parse) + // create model val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.dense(0.0, 0.0)) .setStepSize(0.1) - .setNumIterations(50) + .setNumIterations(25) - model.trainOn(data) - - ssc.start() - - // write data to a file stream - for (i <- 0 until numBatches) { - val samples = LinearDataGenerator.generateLinearInput( - 0.0, Array(10.0, 10.0), 100, 42 * (i + 1)) - val file = new File(testDir, i.toString) - Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8")) - Thread.sleep(batchDuration.milliseconds) + // generate sequence of simulated data + val numBatches = 10 + val input = (0 until numBatches).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42 * (i + 1)) } - ssc.stop(stopSparkContext=false) - - System.clearProperty("spark.driver.port") - Utils.deleteRecursively(testDir) + // apply model training to input stream + val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) // check accuracy of final parameter estimates assertEqual(model.latestModel().intercept, 0.0, 0.1) @@ -91,39 +81,33 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { } // Test that parameter estimates improve when learning Y = 10*X1 on streaming data - test("streaming linear regression parameter convergence") { + test("parameter convergence") { - val testDir = Files.createTempDir() - val batchDuration = Milliseconds(2000) - val ssc = new StreamingContext(sc, batchDuration) - val numBatches = 5 - val data = ssc.textFileStream(testDir.toString()).map(LabeledPoint.parse) + // create model val model = new StreamingLinearRegressionWithSGD() .setInitialWeights(Vectors.dense(0.0)) .setStepSize(0.1) - .setNumIterations(50) - - model.trainOn(data) - - ssc.start() + .setNumIterations(25) - // write data to a file stream - val history = new ArrayBuffer[Double](numBatches) - for (i <- 0 until numBatches) { - val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1)) - val file = new File(testDir, i.toString) - Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8")) - Thread.sleep(batchDuration.milliseconds) - // wait an extra few seconds to make sure the update finishes before new data arrive - Thread.sleep(4000) - history.append(math.abs(model.latestModel().weights(0) - 10.0)) + // generate sequence of simulated data + val numBatches = 10 + val input = (0 until numBatches).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0), 100, 42 * (i + 1)) } - ssc.stop(stopSparkContext=false) + // create buffer to store intermediate fits + val history = new ArrayBuffer[Double](numBatches) - System.clearProperty("spark.driver.port") - Utils.deleteRecursively(testDir) + // apply model training to input stream, storing the intermediate results + // (we add a count to ensure the result is a DStream) + val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => { + model.trainOn(inputDStream) + inputDStream.foreachRDD(x => history.append(math.abs(model.latestModel().weights(0) - 10.0))) + inputDStream.count() + }) + runStreams(ssc, numBatches, numBatches) + // compute change in error val deltas = history.drop(1).zip(history.dropRight(1)) // check error stability (it always either shrinks, or increases with small tol) assert(deltas.forall(x => (x._1 - x._2) <= 0.1)) @@ -132,4 +116,33 @@ class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext { } + // Test predictions on a stream + test("predictions") { + + // create model initialized with true weights + val model = new StreamingLinearRegressionWithSGD() + .setInitialWeights(Vectors.dense(10.0, 10.0)) + .setStepSize(0.1) + .setNumIterations(25) + + // generate sequence of simulated data for testing + val numBatches = 10 + val nPoints = 100 + val testInput = (0 until numBatches).map { i => + LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), nPoints, 42 * (i + 1)) + } + + // apply model predictions to test stream + val ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => { + model.predictOnValues(inputDStream.map(x => (x.label, x.features))) + }) + // collect the output as (true, estimated) tuples + val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) + + // compute the mean absolute error and check that it's always less than 0.1 + val errors = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints) + assert(errors.forall(x => x <= 0.1)) + + } + } |