aboutsummaryrefslogtreecommitdiff
path: root/external/zeromq/src
diff options
context:
space:
mode:
Diffstat (limited to 'external/zeromq/src')
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala55
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala163
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java21
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala23
-rw-r--r--external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java44
-rw-r--r--external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java63
-rw-r--r--external/zeromq/src/test/resources/log4j.properties28
-rw-r--r--external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala63
8 files changed, 0 insertions, 460 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
deleted file mode 100644
index dd367cd43b..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-
-import akka.util.ByteString
-import akka.zeromq._
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.akka.ActorReceiver
-
-/**
- * A receiver to subscribe to ZeroMQ stream.
- */
-private[streaming] class ZeroMQReceiver[T: ClassTag](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] => Iterator[T])
- extends ActorReceiver with Logging {
-
- override def preStart(): Unit = {
- ZeroMQExtension(context.system)
- .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
- }
-
- def receive: Receive = {
-
- case Connecting => logInfo("connecting ...")
-
- case m: ZMQMessage =>
- logDebug("Received message for:" + m.frame(0))
-
- // We ignore first frame for processing as it is the topic
- val bytes = m.frames.tail
- store(bytesToObjects(bytes))
-
- case Closed => logInfo("received closed ")
- }
-}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
deleted file mode 100644
index 1784d6e862..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import akka.actor.{ActorSystem, Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object ZeroMQUtils {
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param ssc StreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
- * and each frame has sequence of byte thus it needs the converter
- * (which might be deserializer of bytes) to translate from sequence
- * of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
- * be shut down when the receiver is stopping (default:
- * ActorReceiver.defaultActorSystemCreator)
- * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
- */
- def createStream[T: ClassTag](
- ssc: StreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] => Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
- supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
- ): ReceiverInputDStream[T] = {
- AkkaUtils.createStream(
- ssc,
- Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver",
- storageLevel,
- actorSystemCreator,
- supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
- * frame has sequence of byte thus it needs the converter(which might be
- * deserializer of bytes) to translate from sequence of sequence of bytes,
- * where sequence refer to a frame and sub sequence refer to its payload.
- * @param storageLevel Storage level to use for storing the received objects
- * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
- * be shut down when the receiver is stopping.
- * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel,
- actorSystemCreator: JFunction0[ActorSystem],
- supervisorStrategy: SupervisorStrategy
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn,
- storageLevel,
- () => actorSystemCreator.call(),
- supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
- * frame has sequence of byte thus it needs the converter(which might be
- * deserializer of bytes) to translate from sequence of sequence of bytes,
- * where sequence refer to a frame and sub sequence refer to its payload.
- * @param storageLevel RDD storage level.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn,
- storageLevel)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
- * frame has sequence of byte thus it needs the converter(which might
- * be deserializer of bytes) to translate from sequence of sequence of
- * bytes, where sequence refer to a frame and sub sequence refer to its
- * payload.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn)
- }
-}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java
deleted file mode 100644
index 587c524e21..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Zeromq receiver for spark streaming.
- */
-package org.apache.spark.streaming.zeromq; \ No newline at end of file
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
deleted file mode 100644
index 65e6e57f2c..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Zeromq receiver for spark streaming.
- */
-package object zeromq
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
deleted file mode 100644
index 9ff4b41f97..0000000000
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq;
-
-import akka.actor.ActorSystem;
-import akka.actor.SupervisorStrategy;
-import akka.util.ByteString;
-import akka.zeromq.Subscribe;
-import org.junit.Test;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function0;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-
-public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
-
- @Test // tests the API, does not actually test data receiving
- public void testZeroMQStream() {
- String publishUrl = "abc";
- Subscribe subscribe = new Subscribe((ByteString)null);
- Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
- Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
-
- JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects);
- JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
- SupervisorStrategy.defaultStrategy());
- }
-}
-
-class BytesToObjects implements Function<byte[][], Iterable<String>> {
- @Override
- public Iterable<String> call(byte[][] bytes) throws Exception {
- return null;
- }
-}
-
-class ActorSystemCreatorForTest implements Function0<ActorSystem> {
- @Override
- public ActorSystem call() {
- return null;
- }
-}
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/zeromq/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
deleted file mode 100644
index bac2679cab..0000000000
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import akka.actor.SupervisorStrategy
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-class ZeroMQStreamSuite extends SparkFunSuite {
-
- val batchDuration = Seconds(1)
-
- private val master: String = "local[2]"
-
- private val framework: String = this.getClass.getSimpleName
-
- test("zeromq input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val publishUrl = "abc"
- val subscribe = new Subscribe(null.asInstanceOf[ByteString])
- val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
-
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
- val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
- val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
- val test4: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
- val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
-
- // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
- ssc.stop()
- }
-}