-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala                                                                              |  1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala) | 43
2 files changed, 8 insertions(+), 36 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 92e1b76d28..40434b1f9b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.streaming
import org.apache.spark.Logging
-import org.apache.spark.streaming.util.MasterFailureTest
import org.apache.spark.util.Utils
import java.io.File
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 98e17ff92e..c53c017060 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.util
+package org.apache.spark.streaming
import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
-import StreamingContext._
+import org.apache.spark.streaming.StreamingContext._
import scala.util.Random
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import java.io.{File, ObjectInputStream, IOException}
+import java.io.{File, IOException}
import java.nio.charset.Charset
import java.util.UUID
@@ -91,7 +89,7 @@ object MasterFailureTest extends Logging {
// Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
// Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
- val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
+ val expectedOutput = (1L to numBatches).map(i => (1L to i).sum).map(j => ("a", j))
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Long], state: Option[Long]) => {
@@ -218,7 +216,7 @@ object MasterFailureTest extends Logging {
while(!isLastOutputGenerated && !isTimedOut) {
// Get the output buffer
- val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
+ val outputBuffer = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output
def output = outputBuffer.flatMap(x => x)
// Start the thread to kill the streaming after some time
@@ -239,7 +237,7 @@ object MasterFailureTest extends Logging {
while (!killed && !isLastOutputGenerated && !isTimedOut) {
Thread.sleep(100)
timeRan = System.currentTimeMillis() - startTime
- isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
+ isLastOutputGenerated = (output.nonEmpty && output.last == lastExpectedOutput)
isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
}
} catch {
@@ -314,31 +312,6 @@ object MasterFailureTest extends Logging {
}
/**
- * This is a output stream just for testing. All the output is collected into a
- * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
- */
-private[streaming]
-class TestOutputStream[T: ClassTag](
- parent: DStream[T],
- val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
- ) extends ForEachDStream[T](
- parent,
- (rdd: RDD[T], t: Time) => {
- val collected = rdd.collect()
- output += collected
- }
- ) {
-
- // This is to clear the output buffer every it is read from a checkpoint
- @throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
- ois.defaultReadObject()
- output.clear()
- }
-}
-
-
-/**
* Thread to kill streaming context after a random period of time.
*/
private[streaming]