aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
author zsxwing <zsxwing@gmail.com> 2015-07-30 15:39:46 -0700
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-07-30 15:39:46 -0700
commit 0dbd6963d589a8f6ad344273f3da7df680ada515 (patch)
tree b631e05cb48185de381418dd007494cffe80bd88 /mllib
parent 89cda69ecd5ef942a68ad13fc4e1f4184010f087 (diff)
download spark-0dbd6963d589a8f6ad344273f3da7df680ada515.tar.gz
spark-0dbd6963d589a8f6ad344273f3da7df680ada515.tar.bz2
spark-0dbd6963d589a8f6ad344273f3da7df680ada515.zip
[SPARK-9479] [STREAMING] [TESTS] Fix ReceiverTrackerSuite failure for maven build and other potential test failures in Streaming
See https://issues.apache.org/jira/browse/SPARK-9479 for the failure cause.

The PR includes the following changes:
1. Make ReceiverTrackerSuite create StreamingContext in the test body.
2. Fix places that don't stop StreamingContext. I verified locally that no SparkContext was stopped in the shutdown hook after this fix.
3. Fix an issue where `ReceiverTracker.endpoint` may be null.
4. Make sure stopping SparkContext in a non-main thread won't fail other tests.

Author: zsxwing <zsxwing@gmail.com>

Closes #7797 from zsxwing/fix-ReceiverTrackerSuite and squashes the following commits:

3a4bb98 [zsxwing] Fix another potential NPE
d7497df [zsxwing] Fix ReceiverTrackerSuite; make sure StreamingContext in tests is closed
Diffstat (limited to 'mllib')
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala | 21
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala | 17
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala | 21
3 files changed, 43 insertions, 16 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
index fd653296c9..d7b291d5a6 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
@@ -24,13 +24,22 @@ import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.TestSuiteBase
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase {
// use longer wait time to ensure job completion
override def maxWaitTimeMillis: Int = 30000
+ var ssc: StreamingContext = _
+
+ override def afterFunction() {
+ super.afterFunction()
+ if (ssc != null) {
+ ssc.stop()
+ }
+ }
+
// Test if we can accurately learn B for Y = logistic(BX) on streaming data
test("parameter accuracy") {
@@ -50,7 +59,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
}
// apply model training to input stream
- val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
+ ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
inputDStream.count()
})
@@ -84,7 +93,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
// apply model training to input stream, storing the intermediate results
// (we add a count to ensure the result is a DStream)
- val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
+ ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
inputDStream.foreachRDD(x => history.append(math.abs(model.latestModel().weights(0) - B)))
inputDStream.count()
@@ -118,7 +127,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
}
// apply model predictions to test stream
- val ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
+ ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
})
@@ -147,7 +156,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
}
// train and predict
- val ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
+ ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
})
@@ -167,7 +176,7 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase
.setNumIterations(10)
val numBatches = 10
val emptyInput = Seq.empty[Seq[LabeledPoint]]
- val ssc = setupStreams(emptyInput,
+ ssc = setupStreams(emptyInput,
(inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala
index ac01622b8a..3645d29dcc 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.TestingUtils._
-import org.apache.spark.streaming.TestSuiteBase
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.random.XORShiftRandom
@@ -28,6 +28,15 @@ class StreamingKMeansSuite extends SparkFunSuite with TestSuiteBase {
override def maxWaitTimeMillis: Int = 30000
+ var ssc: StreamingContext = _
+
+ override def afterFunction() {
+ super.afterFunction()
+ if (ssc != null) {
+ ssc.stop()
+ }
+ }
+
test("accuracy for single center and equivalence to grand average") {
// set parameters
val numBatches = 10
@@ -46,7 +55,7 @@ class StreamingKMeansSuite extends SparkFunSuite with TestSuiteBase {
val (input, centers) = StreamingKMeansDataGenerator(numPoints, numBatches, k, d, r, 42)
// setup and run the model training
- val ssc = setupStreams(input, (inputDStream: DStream[Vector]) => {
+ ssc = setupStreams(input, (inputDStream: DStream[Vector]) => {
model.trainOn(inputDStream)
inputDStream.count()
})
@@ -82,7 +91,7 @@ class StreamingKMeansSuite extends SparkFunSuite with TestSuiteBase {
val (input, centers) = StreamingKMeansDataGenerator(numPoints, numBatches, k, d, r, 42)
// setup and run the model training
- val ssc = setupStreams(input, (inputDStream: DStream[Vector]) => {
+ ssc = setupStreams(input, (inputDStream: DStream[Vector]) => {
kMeans.trainOn(inputDStream)
inputDStream.count()
})
@@ -114,7 +123,7 @@ class StreamingKMeansSuite extends SparkFunSuite with TestSuiteBase {
StreamingKMeansDataGenerator(numPoints, numBatches, k, d, r, 42, Array(Vectors.dense(0.0)))
// setup and run the model training
- val ssc = setupStreams(input, (inputDStream: DStream[Vector]) => {
+ ssc = setupStreams(input, (inputDStream: DStream[Vector]) => {
kMeans.trainOn(inputDStream)
inputDStream.count()
})
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
index a2a4c5f6b8..34c07ed170 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala
@@ -22,14 +22,23 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.dstream.DStream
-import org.apache.spark.streaming.TestSuiteBase
class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
// use longer wait time to ensure job completion
override def maxWaitTimeMillis: Int = 20000
+ var ssc: StreamingContext = _
+
+ override def afterFunction() {
+ super.afterFunction()
+ if (ssc != null) {
+ ssc.stop()
+ }
+ }
+
// Assert that two values are equal within tolerance epsilon
def assertEqual(v1: Double, v2: Double, epsilon: Double) {
def errorMessage = v1.toString + " did not equal " + v2.toString
@@ -62,7 +71,7 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
}
// apply model training to input stream
- val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
+ ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
inputDStream.count()
})
@@ -98,7 +107,7 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
// apply model training to input stream, storing the intermediate results
// (we add a count to ensure the result is a DStream)
- val ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
+ ssc = setupStreams(input, (inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
inputDStream.foreachRDD(x => history.append(math.abs(model.latestModel().weights(0) - 10.0)))
inputDStream.count()
@@ -129,7 +138,7 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
}
// apply model predictions to test stream
- val ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
+ ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
})
// collect the output as (true, estimated) tuples
@@ -156,7 +165,7 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
}
// train and predict
- val ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
+ ssc = setupStreams(testInput, (inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
})
@@ -177,7 +186,7 @@ class StreamingLinearRegressionSuite extends SparkFunSuite with TestSuiteBase {
val numBatches = 10
val nPoints = 100
val emptyInput = Seq.empty[Seq[LabeledPoint]]
- val ssc = setupStreams(emptyInput,
+ ssc = setupStreams(emptyInput,
(inputDStream: DStream[LabeledPoint]) => {
model.trainOn(inputDStream)
model.predictOnValues(inputDStream.map(x => (x.label, x.features)))