diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-30 11:13:24 -0800 |
commit | f4e40661912af2a23e250a49f72f00675172e2de (patch) | |
tree | 97d40d041b08bf8a320f908e7b241cce9432c014 /streaming/src/test/java | |
parent | 6e43039614ed1ec55a134fb82fb3e8d4e80996ef (diff) | |
download | spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.gz spark-f4e40661912af2a23e250a49f72f00675172e2de.tar.bz2 spark-f4e40661912af2a23e250a49f72f00675172e2de.zip |
Refactored kafka, flume, zeromq, mqtt as separate external projects, with their own self-contained scala API, java API, scala unit tests and java unit tests. Updated examples to use the external projects.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 80 | ||||
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java | 46 |
2 files changed, 56 insertions, 70 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index daeb99f5b7..f4d26c0be6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -17,22 +17,16 @@ package org.apache.spark.streaming; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Files; - -import kafka.serializer.StringDecoder; +import scala.Tuple2; -import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; -import org.apache.spark.streaming.api.java.JavaDStreamLike; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; +import java.io.*; +import java.util.*; -import scala.Tuple2; -import twitter4j.Status; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.io.Files; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; @@ -43,39 +37,11 @@ import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; -import org.apache.spark.streaming.JavaTestUtils; -import org.apache.spark.streaming.JavaCheckpointTestUtils; - -import java.io.*; -import java.util.*; - -import akka.actor.Props; -import akka.zeromq.Subscribe; - // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaStreamingContext ssc; - - @Before - public void setUp() { - System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); - ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown - System.clearProperty("spark.driver.port"); - } - +public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { @Test public void testCount() { List<List<Integer>> inputData = Arrays.asList( @@ -1597,26 +1563,6 @@ public class JavaAPISuite implements Serializable { // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the // InputStream functionality is deferred to the existing Scala tests. @Test - public void testKafkaStream() { - HashMap<String, Integer> topics = Maps.newHashMap(); - JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics, - StorageLevel.MEMORY_AND_DISK()); - - HashMap<String, String> kafkaParams = Maps.newHashMap(); - kafkaParams.put("zookeeper.connect","localhost:12345"); - kafkaParams.put("group.id","consumer-group"); - JavaPairDStream<String, String> test3 = ssc.kafkaStream( - String.class, - String.class, - StringDecoder.class, - StringDecoder.class, - kafkaParams, - topics, - StorageLevel.MEMORY_AND_DISK()); - } - - @Test public void testSocketTextStream() { JavaDStream<String> test = ssc.socketTextStream("localhost", 12345); } @@ -1654,16 +1600,10 @@ public class JavaAPISuite implements Serializable { public void testRawSocketStream() { JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345); } - - @Test - public void testFlumeStream() { - JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); - } - + /* @Test public void testFileStream() { - JavaPairDStream<String, String> foo = - ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); + JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); } @Test @@ -1685,5 +1625,5 @@ public class JavaAPISuite implements Serializable { return null; } }); - } + } */ } diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java new file mode 100644 index 0000000000..34bee56885 --- /dev/null +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming; + +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.After; +import org.junit.Before; + +public abstract class LocalJavaStreamingContext { + + protected transient JavaStreamingContext ssc; + + @Before + public void setUp() { + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); + ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + ssc.checkpoint("checkpoint"); + } + + @After + public void tearDown() { + ssc.stop(); + ssc = null; + + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.driver.port"); + System.clearProperty("spark.hostPort"); + } +} |