aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-10-24 22:18:53 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-10-24 22:18:53 -0700
commitad5f579cbfb3f0c9834cc5fdd26ad0745f5f8abf (patch)
treede9fa966f4a5adc8da82eecc71ff289dd44d6b3d /streaming
parente5f6d5697b43ac89a50fb791f4b284409e75b1f4 (diff)
downloadspark-ad5f579cbfb3f0c9834cc5fdd26ad0745f5f8abf.tar.gz
spark-ad5f579cbfb3f0c9834cc5fdd26ad0745f5f8abf.tar.bz2
spark-ad5f579cbfb3f0c9834cc5fdd26ad0745f5f8abf.zip
Style fixes
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala18
1 files changed, 9 insertions, 9 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 780f7b823b..5e384eeee4 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -33,9 +33,9 @@ trait JavaTestBase extends TestSuiteBase {
* The stream will be derived from the supplied lists of Java objects.
**/
def attachTestInputStream[T](
- ssc: JavaStreamingContext,
- data: JList[JList[T]],
- numPartitions: Int) = {
+ ssc: JavaStreamingContext,
+ data: JList[JList[T]],
+ numPartitions: Int) = {
val seqData = data.map(Seq(_:_*))
implicit val cm: ClassManifest[T] =
@@ -50,7 +50,7 @@ trait JavaTestBase extends TestSuiteBase {
* [[org.apache.spark.streaming.TestOutputStream]].
**/
def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
- dstream: JavaDStreamLike[T, This, R]) =
+ dstream: JavaDStreamLike[T, This, R]) =
{
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
@@ -66,7 +66,7 @@ trait JavaTestBase extends TestSuiteBase {
* Returns a list of items for each RDD.
*/
def runStreams[V](
- ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
+ ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
implicit val cm: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
@@ -83,16 +83,16 @@ trait JavaTestBase extends TestSuiteBase {
* Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
* representing one partition.
*/
- def runStreamsWithPartitions[V](
- ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = {
+ def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
+ numExpectedOutput: Int): JList[JList[JList[V]]] = {
implicit val cm: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[JList[V]]]()
- res.map(entry => {
+ res.map{entry =>
val lists = entry.map(new ArrayList[V](_))
out.append(new ArrayList[JList[V]](lists))
- })
+ }
out
}
}