diff options
author | Sean Owen <sowen@cloudera.com> | 2014-09-25 23:20:17 +0530 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-09-25 23:20:17 +0530 |
commit | c3f2a8588e19aab814ac5cdd86575bb5558d5e46 (patch) | |
tree | 27f9a895812e8b7a4a6522a1f10aff91968a359f /streaming | |
parent | b8487713d3bf288a4f6fc149e6ee4cc8196d6e7d (diff) | |
download | spark-c3f2a8588e19aab814ac5cdd86575bb5558d5e46.tar.gz spark-c3f2a8588e19aab814ac5cdd86575bb5558d5e46.tar.bz2 spark-c3f2a8588e19aab814ac5cdd86575bb5558d5e46.zip |
SPARK-2932 [STREAMING] Move MasterFailureTest out of "main" source directory
(HT @vanzin) Whatever the reason was for having this test class in `main`, if there is one, appear to be moot. This may have been a result of earlier streaming test reorganization.
This simply puts `MasterFailureTest` back under `test/`, removes some redundant copied code, and touches up a few tiny inspection warnings along the way.
Author: Sean Owen <sowen@cloudera.com>
Closes #2399 from srowen/SPARK-2932 and squashes the following commits:
3909411 [Sean Owen] Move MasterFailureTest to src/test, and remove redundant TestOutputStream
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala | 1 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala) | 43 |
2 files changed, 8 insertions, 36 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala index 92e1b76d28..40434b1f9b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.streaming import org.apache.spark.Logging -import org.apache.spark.streaming.util.MasterFailureTest import org.apache.spark.util.Utils import java.io.File diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala index 98e17ff92e..c53c017060 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala @@ -15,20 +15,18 @@ * limitations under the License. */ -package org.apache.spark.streaming.util +package org.apache.spark.streaming import org.apache.spark.Logging -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming._ -import org.apache.spark.streaming.dstream.{DStream, ForEachDStream} +import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils -import StreamingContext._ +import org.apache.spark.streaming.StreamingContext._ import scala.util.Random -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import java.io.{File, ObjectInputStream, IOException} +import java.io.{File, IOException} import java.nio.charset.Charset import java.util.UUID @@ -91,7 +89,7 @@ object MasterFailureTest extends Logging { // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ... val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq // Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ... - val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j)) + val expectedOutput = (1L to numBatches).map(i => (1L to i).sum).map(j => ("a", j)) val operation = (st: DStream[String]) => { val updateFunc = (values: Seq[Long], state: Option[Long]) => { @@ -218,7 +216,7 @@ object MasterFailureTest extends Logging { while(!isLastOutputGenerated && !isTimedOut) { // Get the output buffer - val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output + val outputBuffer = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output def output = outputBuffer.flatMap(x => x) // Start the thread to kill the streaming after some time @@ -239,7 +237,7 @@ object MasterFailureTest extends Logging { while (!killed && !isLastOutputGenerated && !isTimedOut) { Thread.sleep(100) timeRan = System.currentTimeMillis() - startTime - isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput) + isLastOutputGenerated = (output.nonEmpty && output.last == lastExpectedOutput) isTimedOut = (timeRan + totalTimeRan > maxTimeToRun) } } catch { @@ -314,31 +312,6 @@ object MasterFailureTest extends Logging { } /** - * This is a output stream just for testing. All the output is collected into a - * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. - */ -private[streaming] -class TestOutputStream[T: ClassTag]( - parent: DStream[T], - val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] - ) extends ForEachDStream[T]( - parent, - (rdd: RDD[T], t: Time) => { - val collected = rdd.collect() - output += collected - } - ) { - - // This is to clear the output buffer every it is read from a checkpoint - @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { - ois.defaultReadObject() - output.clear() - } -} - - -/** * Thread to kill streaming context after a random period of time. */ private[streaming] |