diff options
Diffstat (limited to 'external/zeromq/src/test')
3 files changed, 128 insertions, 0 deletions
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java new file mode 100644 index 0000000000..96af7d737d --- /dev/null +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq; + +import org.junit.Test; + +import akka.actor.SupervisorStrategy; +import akka.util.ByteString; +import akka.zeromq.Subscribe; + +import org.apache.spark.api.java.function.Function; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaDStream; + +public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { + + @Test // tests the API, does not actually test data receiving + public void testZeroMQStream() { + JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc); + String publishUrl = "abc"; + Subscribe subscribe = new Subscribe((ByteString)null); + Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { + @Override + public Iterable<String> call(byte[][] bytes) throws Exception { + return null; + } + }; + + JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects); + JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); + + // To verify that JavaStreamingContextWithKafka is also StreamingContext + JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999); + } +} diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties new file mode 100644 index 0000000000..063529a9cb --- /dev/null +++ b/external/zeromq/src/test/resources/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file streaming/target/unit-tests.log +log4j.rootCategory=INFO, file +# log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=false +log4j.appender.file.file=streaming/target/unit-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}: %m%n + +# Ignore messages below warning level from Jetty, because it's a bit verbose +log4j.logger.org.eclipse.jetty=WARN + diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala new file mode 100644 index 0000000000..5adcdb821f --- /dev/null +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.zeromq + +import akka.actor.SupervisorStrategy +import akka.util.ByteString +import akka.zeromq.Subscribe + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, TestSuiteBase} + +class ZeroMQStreamSuite extends TestSuiteBase { + + test("zeromq input stream") { + val ssc = new StreamingContext(master, framework, batchDuration) + val publishUrl = "abc" + val subscribe = new Subscribe(null.asInstanceOf[ByteString]) + val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] + + // tests the API, does not actually test data receiving + val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects) + val test2 = ssc.zeroMQStream( + publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) + + // TODO: Actually test data receiving + } +} |