aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-04-25 14:38:01 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-04-25 14:38:01 +0530
commitbb4102b0eefd7321d1fadf9df6db79c8dd9880fb (patch)
treefef0ff73ec457259742530068e9c06ac1dc1bf90 /streaming
parentad88f083a627ba38e99b1b135a82a1fcfd107444 (diff)
downloadspark-bb4102b0eefd7321d1fadf9df6db79c8dd9880fb.tar.gz
spark-bb4102b0eefd7321d1fadf9df6db79c8dd9880fb.tar.bz2
spark-bb4102b0eefd7321d1fadf9df6db79c8dd9880fb.zip
Fixed breaking tests in streaming checkpoint suite. Changed RichInt to Int as it is final and not serializable
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala35
1 files changed, 19 insertions, 16 deletions
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index cac86deeaf..f9285b19e2 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -1,16 +1,19 @@
package spark.streaming
-import dstream.FileInputDStream
-import spark.streaming.StreamingContext._
import java.io.File
-import runtime.RichInt
-import org.scalatest.BeforeAndAfter
+
+import scala.collection.mutable.ArrayBuffer
+
import org.apache.commons.io.FileUtils
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.{Clock, ManualClock}
-import scala.util.Random
+import org.scalatest.BeforeAndAfter
+
import com.google.common.io.Files
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.dstream.FileInputDStream
+import spark.streaming.util.ManualClock
+
+
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -56,13 +59,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Setup the streams
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
+ .updateStateByKey(updateFunc)
.checkpoint(stateStreamCheckpointInterval)
- .map(t => (t._1, t._2.self))
+ .map(t => (t._1, t._2))
}
var ssc = setupStreams(input, operation)
var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
@@ -162,13 +165,13 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
+ .updateStateByKey(updateFunc)
.checkpoint(batchDuration * 2)
- .map(t => (t._1, t._2.self))
+ .map(t => (t._1, t._2))
}
testCheckpointedOperation(input, operation, output, 7)
}
@@ -350,4 +353,4 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
outputStream.output
}
-} \ No newline at end of file
+}