path: root/external/zeromq
author     Shixiong Zhu <shixiong@databricks.com>    2016-03-14 16:56:04 -0700
committer  Reynold Xin <rxin@databricks.com>         2016-03-14 16:56:04 -0700
commit     06dec37455c3f800897defee6fad0da623f26050 (patch)
tree       d49dd098587f8a3c7a019b0aad605327da6fcecd /external/zeromq
parent     8301fadd8d269da11e72870b7a889596e3337839 (diff)
[SPARK-13843][STREAMING] Remove streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter to Spark packages
## What changes were proposed in this pull request?

Currently there are a few sub-projects, each for integrating with a different external source for Streaming. Now that we have a better way to include external libraries (Spark packages) and with Spark 2.0 coming up, we can move the following projects out of Spark to https://github.com/spark-packages:

- streaming-flume
- streaming-akka
- streaming-mqtt
- streaming-zeromq
- streaming-twitter

They are ancillary packages, and considering the overhead of maintenance, running tests, and PR failures, it is better to maintain them outside of Spark. In addition, these projects can have their own release cycles, so we can release them faster. I have already copied these projects to https://github.com/spark-packages.

## How was this patch tested?

Jenkins tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11672 from zsxwing/remove-external-pkg.
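For users of the removed connectors, the intended replacement is to pull the equivalent artifact in as an external dependency rather than building it inside the Spark tree. The sbt sketch below illustrates that; the group ID, artifact name, and version shown are hypothetical and are not taken from this commit, so check the actual listing of the externalized package before depending on it.

// build.sbt -- illustrative sketch only; the coordinates below are hypothetical
// and must be replaced by whatever the externalized streaming-zeromq package publishes.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"        % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-zeromq" % "2.0.0"   // hypothetical post-move coordinates
)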
Diffstat (limited to 'external/zeromq')
-rw-r--r--  external/zeromq/pom.xml                                                                      74
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala        55
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala          163
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java           21
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala               23
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java      44
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java   63
-rw-r--r--  external/zeromq/src/test/resources/log4j.properties                                          28
-rw-r--r--  external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala     63
9 files changed, 0 insertions, 534 deletions
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
deleted file mode 100644
index f16bc0f319..0000000000
--- a/external/zeromq/pom.xml
+++ /dev/null
@@ -1,74 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-zeromq_2.11</artifactId>
- <properties>
- <sbt.project.name>streaming-zeromq</sbt.project.name>
- </properties>
- <packaging>jar</packaging>
- <name>Spark Project External ZeroMQ</name>
- <url>http://spark.apache.org/</url>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
- </dependency>
- </dependencies>
- <build>
- <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
- </build>
-</project>
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
deleted file mode 100644
index dd367cd43b..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-
-import akka.util.ByteString
-import akka.zeromq._
-
-import org.apache.spark.Logging
-import org.apache.spark.streaming.akka.ActorReceiver
-
-/**
- * A receiver to subscribe to ZeroMQ stream.
- */
-private[streaming] class ZeroMQReceiver[T: ClassTag](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] => Iterator[T])
- extends ActorReceiver with Logging {
-
- override def preStart(): Unit = {
- ZeroMQExtension(context.system)
- .newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe)
- }
-
- def receive: Receive = {
-
- case Connecting => logInfo("connecting ...")
-
- case m: ZMQMessage =>
- logDebug("Received message for:" + m.frame(0))
-
- // We ignore first frame for processing as it is the topic
- val bytes = m.frames.tail
- store(bytesToObjects(bytes))
-
- case Closed => logInfo("received closed ")
- }
-}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
deleted file mode 100644
index 1784d6e862..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import akka.actor.{ActorSystem, Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-object ZeroMQUtils {
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param ssc StreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
- * and each frame has sequence of byte thus it needs the converter
- * (which might be deserializer of bytes) to translate from sequence
- * of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
- * be shut down when the receiver is stopping (default:
- * ActorReceiver.defaultActorSystemCreator)
- * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
- */
- def createStream[T: ClassTag](
- ssc: StreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] => Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
- supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
- ): ReceiverInputDStream[T] = {
- AkkaUtils.createStream(
- ssc,
- Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver",
- storageLevel,
- actorSystemCreator,
- supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
- * frame has sequence of byte thus it needs the converter(which might be
- * deserializer of bytes) to translate from sequence of sequence of bytes,
- * where sequence refer to a frame and sub sequence refer to its payload.
- * @param storageLevel Storage level to use for storing the received objects
- * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
- * be shut down when the receiver is stopping.
- * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel,
- actorSystemCreator: JFunction0[ActorSystem],
- supervisorStrategy: SupervisorStrategy
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn,
- storageLevel,
- () => actorSystemCreator.call(),
- supervisorStrategy)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
- * frame has sequence of byte thus it needs the converter(which might be
- * deserializer of bytes) to translate from sequence of sequence of bytes,
- * where sequence refer to a frame and sub sequence refer to its payload.
- * @param storageLevel RDD storage level.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn,
- storageLevel)
- }
-
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
- * frame has sequence of byte thus it needs the converter(which might
- * be deserializer of bytes) to translate from sequence of sequence of
- * bytes, where sequence refer to a frame and sub sequence refer to its
- * payload.
- */
- def createStream[T](
- jssc: JavaStreamingContext,
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- val fn =
- (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](
- jssc.ssc,
- publisherUrl,
- subscribe,
- fn)
- }
-}
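For reference, the Scala entry point deleted above was used roughly as in the sketch below. This is a reconstruction from the Scaladoc and the ZeroMQStreamSuite later in this diff, not code from the commit itself; the publisher URL and the byte-to-string converter are made up for illustration, and actually receiving data additionally requires the akka-zeromq artifact and the native ZeroMQ library.

// Illustrative only: shows the shape of the removed Scala API.
import akka.util.ByteString
import akka.zeromq.Subscribe

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq.ZeroMQUtils

object ZeroMQExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "ZeroMQExample", Seconds(1))

    // ZeroMQReceiver drops the first frame (the topic), so the converter only
    // sees the payload frames and turns each one into a UTF-8 string here.
    def bytesToStrings(frames: Seq[ByteString]): Iterator[String] =
      frames.map(_.utf8String).iterator

    val lines = ZeroMQUtils.createStream(
      ssc,
      "tcp://localhost:5556",               // hypothetical publisher URL
      Subscribe(ByteString("some-topic")),  // topic filter
      bytesToStrings _)

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}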
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java
deleted file mode 100644
index 587c524e21..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Zeromq receiver for spark streaming.
- */
-package org.apache.spark.streaming.zeromq;
\ No newline at end of file
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
deleted file mode 100644
index 65e6e57f2c..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-/**
- * Zeromq receiver for spark streaming.
- */
-package object zeromq
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
deleted file mode 100644
index 9ff4b41f97..0000000000
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq;
-
-import akka.actor.ActorSystem;
-import akka.actor.SupervisorStrategy;
-import akka.util.ByteString;
-import akka.zeromq.Subscribe;
-import org.junit.Test;
-
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function0;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-
-public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
-
- @Test // tests the API, does not actually test data receiving
- public void testZeroMQStream() {
- String publishUrl = "abc";
- Subscribe subscribe = new Subscribe((ByteString)null);
- Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
- Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
-
- JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects);
- JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
- SupervisorStrategy.defaultStrategy());
- }
-}
-
-class BytesToObjects implements Function<byte[][], Iterable<String>> {
- @Override
- public Iterable<String> call(byte[][] bytes) throws Exception {
- return null;
- }
-}
-
-class ActorSystemCreatorForTest implements Function0<ActorSystem> {
- @Override
- public ActorSystem call() {
- return null;
- }
-}
diff --git a/external/zeromq/src/test/resources/log4j.properties b/external/zeromq/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/zeromq/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
deleted file mode 100644
index bac2679cab..0000000000
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import akka.actor.SupervisorStrategy
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-
-class ZeroMQStreamSuite extends SparkFunSuite {
-
- val batchDuration = Seconds(1)
-
- private val master: String = "local[2]"
-
- private val framework: String = this.getClass.getSimpleName
-
- test("zeromq input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val publishUrl = "abc"
- val subscribe = new Subscribe(null.asInstanceOf[ByteString])
- val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
-
- // tests the API, does not actually test data receiving
- val test1: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
- val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
- val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
- val test4: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
- val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
-
- // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
- ssc.stop()
- }
-}