author     Sean Owen <sowen@cloudera.com>  2014-09-25 23:20:17 +0530
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-09-25 23:20:17 +0530
commit     c3f2a8588e19aab814ac5cdd86575bb5558d5e46 (patch)
tree       27f9a895812e8b7a4a6522a1f10aff91968a359f /streaming
parent     b8487713d3bf288a4f6fc149e6ee4cc8196d6e7d (diff)
SPARK-2932 [STREAMING] Move MasterFailureTest out of "main" source directory
(HT @vanzin) Whatever the reason was for having this test class in `main`, if there was one, it appears to be moot. This may have been a result of an earlier streaming test reorganization. This change simply puts `MasterFailureTest` back under `test/`, removes some redundant copied code, and touches up a few tiny inspection warnings along the way.

Author: Sean Owen <sowen@cloudera.com>

Closes #2399 from srowen/SPARK-2932 and squashes the following commits:

3909411 [Sean Owen] Move MasterFailureTest to src/test, and remove redundant TestOutputStream
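For context, the pattern at the heart of both the removed copy and the version it duplicated is a DStream sink that collects each batch's output into a buffer, so a test can compare it against expected results afterwards. A minimal sketch of that idea using the public foreachRDD API rather than the ForEachDStream subclass shown in the diff below; the method and buffer names here are illustrative, not taken from the patch:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.streaming.dstream.DStream

    // Collect each batch of a DStream into a shared buffer so a test can
    // assert on it after the streaming job finishes. This mirrors the
    // removed TestOutputStream shown below, but via foreachRDD; names are
    // illustrative.
    def collectOutput[T](stream: DStream[T], output: ArrayBuffer[Seq[T]]): Unit = {
      stream.foreachRDD { rdd =>
        val batch = rdd.collect().toSeq
        output.synchronized { output += batch }
      }
    }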
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala   1
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala)  43
2 files changed, 8 insertions(+), 36 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index 92e1b76d28..40434b1f9b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.streaming
import org.apache.spark.Logging
-import org.apache.spark.streaming.util.MasterFailureTest
import org.apache.spark.util.Utils
import java.io.File
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 98e17ff92e..c53c017060 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -15,20 +15,18 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.util
+package org.apache.spark.streaming
import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
+import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils
-import StreamingContext._
+import org.apache.spark.streaming.StreamingContext._
import scala.util.Random
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import java.io.{File, ObjectInputStream, IOException}
+import java.io.{File, IOException}
import java.nio.charset.Charset
import java.util.UUID
@@ -91,7 +89,7 @@ object MasterFailureTest extends Logging {
// Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
val input = (1 to numBatches).map(i => (1 to i).map(_ => "a").mkString(" ")).toSeq
// Expected output: time=1 ==> [ (a, 1) ] , time=2 ==> [ (a, 3) ] , time=3 ==> [ (a,6) ] , ...
- val expectedOutput = (1L to numBatches).map(i => (1L to i).reduce(_ + _)).map(j => ("a", j))
+ val expectedOutput = (1L to numBatches).map(i => (1L to i).sum).map(j => ("a", j))
val operation = (st: DStream[String]) => {
val updateFunc = (values: Seq[Long], state: Option[Long]) => {
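The rewritten expression computes the running totals 1, 3, 6, 10, ..., i.e. the triangular numbers, matching the cumulative count of "a" after each batch. For these non-empty ranges .sum is equivalent to .reduce(_ + _) and is the more idiomatic spelling (it is also total on empty collections, where reduce throws). A quick check of the new form, with a small illustrative batch count:

    // Partial sums of 1..i are the triangular numbers: 1, 3, 6, 10, ...
    val expected = (1L to 4L).map(i => (1L to i).sum).map(j => ("a", j))
    // expected: Vector((a,1), (a,3), (a,6), (a,10))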
@@ -218,7 +216,7 @@ object MasterFailureTest extends Logging {
while(!isLastOutputGenerated && !isTimedOut) {
// Get the output buffer
- val outputBuffer = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[T]].output
+ val outputBuffer = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output
def output = outputBuffer.flatMap(x => x)
// Start the thread to kill the streaming after some time
@@ -239,7 +237,7 @@ object MasterFailureTest extends Logging {
while (!killed && !isLastOutputGenerated && !isTimedOut) {
Thread.sleep(100)
timeRan = System.currentTimeMillis() - startTime
- isLastOutputGenerated = (!output.isEmpty && output.last == lastExpectedOutput)
+ isLastOutputGenerated = (output.nonEmpty && output.last == lastExpectedOutput)
isTimedOut = (timeRan + totalTimeRan > maxTimeToRun)
}
} catch {
@@ -314,31 +312,6 @@ object MasterFailureTest extends Logging {
}
/**
- * This is an output stream just for testing. All the output is collected into an
- * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
- */
-private[streaming]
-class TestOutputStream[T: ClassTag](
- parent: DStream[T],
- val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
- ) extends ForEachDStream[T](
- parent,
- (rdd: RDD[T], t: Time) => {
- val collected = rdd.collect()
- output += collected
- }
- ) {
-
- // This is to clear the output buffer every time it is read from a checkpoint
- @throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
- ois.defaultReadObject()
- output.clear()
- }
-}
-
-
-/**
* Thread to kill streaming context after a random period of time.
*/
private[streaming]
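After the move, suites already in org.apache.spark.streaming (such as FailureSuite above, which drops its util import) can reference the helper directly. A hedged sketch of such a call, assuming a testUpdateStateByKey entry point on the object shown in this diff; its exact signature and the temp-directory handling here are illustrative, not verified against the patch:

    import org.apache.spark.streaming.{MasterFailureTest, Seconds}
    import org.apache.spark.util.Utils

    // Hypothetical call site; testUpdateStateByKey's signature is assumed,
    // not taken from the patch. Run the failure test for 30 batches of 1s.
    val directory = Utils.createTempDir()
    try {
      MasterFailureTest.testUpdateStateByKey(directory.getAbsolutePath, 30, Seconds(1))
    } finally {
      Utils.deleteRecursively(directory)
    }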