aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorHolden Karau <holden@us.ibm.com>2016-02-22 09:50:51 +0000
committerSean Owen <sowen@cloudera.com>2016-02-22 09:50:51 +0000
commit1b144455b620861d8cc790d3fc69902717f14524 (patch)
treee8ba4ca005a266351a23a5baad28afc1f761efb9 /streaming/src/test
parentef1047fca789e5470b7b12974f0435d6d1c4f2d5 (diff)
downloadspark-1b144455b620861d8cc790d3fc69902717f14524.tar.gz
spark-1b144455b620861d8cc790d3fc69902717f14524.tar.bz2
spark-1b144455b620861d8cc790d3fc69902717f14524.zip
[SPARK-13399][STREAMING] Fix checkpointsuite type erasure warnings
## What changes were proposed in this pull request? Change the checkpointsuite getting the outputstreams to explicitly be unchecked on the generic type so as to avoid the warnings. This only impacts test code. Alternatively we could encode the type tag in the TestOutputStreamWithPartitions and filter the type tag as well - but this is unnecessary since multiple testoutputstreams are not registered and the previous code was not actually checking this type. ## How was the this patch tested? unit tests (streaming/testOnly org.apache.spark.streaming.CheckpointSuite) Author: Holden Karau <holden@us.ibm.com> Closes #11286 from holdenk/SPARK-13399-checkpointsuite-type-erasure.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala19
1 files changed, 13 insertions, 6 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index dada495843..ca716cf4e6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -133,6 +133,17 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
new StreamingContext(SparkContext.getOrCreate(conf), batchDuration)
}
+ /**
+ * Get the first TestOutputStreamWithPartitions, does not check the provided generic type.
+ */
+ protected def getTestOutputStream[V: ClassTag](streams: Array[DStream[_]]):
+ TestOutputStreamWithPartitions[V] = {
+ streams.collect {
+ case ds: TestOutputStreamWithPartitions[V @unchecked] => ds
+ }.head
+ }
+
+
protected def generateOutput[V: ClassTag](
ssc: StreamingContext,
targetBatchTime: Time,
@@ -150,9 +161,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
clock.setTime(targetBatchTime.milliseconds)
logInfo("Manual clock after advancing = " + clock.getTimeMillis())
- val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
- dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
- }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+ val outputStream = getTestOutputStream[V](ssc.graph.getOutputStreams())
eventually(timeout(10 seconds)) {
ssc.awaitTerminationOrTimeout(10)
@@ -908,9 +917,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
logInfo("Manual clock after advancing = " + clock.getTimeMillis())
Thread.sleep(batchDuration.milliseconds)
- val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
- dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
- }.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+ val outputStream = getTestOutputStream[V](ssc.graph.getOutputStreams())
outputStream.output.asScala.map(_.flatten)
}
}