path: root/external/flume/src/test
author      Shixiong Zhu <shixiong@databricks.com>    2016-03-14 16:56:04 -0700
committer   Reynold Xin <rxin@databricks.com>    2016-03-14 16:56:04 -0700
commit      06dec37455c3f800897defee6fad0da623f26050 (patch)
tree        d49dd098587f8a3c7a019b0aad605327da6fcecd /external/flume/src/test
parent      8301fadd8d269da11e72870b7a889596e3337839 (diff)
download    spark-06dec37455c3f800897defee6fad0da623f26050.tar.gz
            spark-06dec37455c3f800897defee6fad0da623f26050.tar.bz2
            spark-06dec37455c3f800897defee6fad0da623f26050.zip
[SPARK-13843][STREAMING] Move streaming-flume, streaming-mqtt, streaming-zeromq, streaming-akka, streaming-twitter out of Spark to Spark packages
## What changes were proposed in this pull request?

Currently there are a few sub-projects, each integrating with a different external source for Streaming. Now that we have a better way to include external libraries (Spark packages), and with Spark 2.0 coming up, we can move the following projects out of Spark to https://github.com/spark-packages:

- streaming-flume
- streaming-akka
- streaming-mqtt
- streaming-zeromq
- streaming-twitter

These are ancillary packages, and given the overhead of maintaining them, running their tests, and handling their PR failures, it is better to maintain them outside of Spark. In addition, these projects can follow their own release cycles, so we can release them faster. I have already copied these projects to https://github.com/spark-packages.

## How was this patch tested?

Jenkins tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #11672 from zsxwing/remove-external-pkg.
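For context, the removed suites only exercised the public FlumeUtils API. A downstream application that keeps using the connector from its new home would look roughly like the sketch below; the object and application names are illustrative, and it assumes the externalized spark-streaming-flume artifact is supplied as an external dependency (for example, as a Spark package).

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

// Hypothetical downstream app; assumes spark-streaming-flume is on the classpath.
object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingExample")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Poll a Flume agent that runs the Spark sink on localhost:12345,
    // mirroring the createPollingStream calls in the deleted test suites.
    val events = FlumeUtils.createPollingStream(
      ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2)
    // Count events per batch; decoding event bodies is omitted for brevity.
    events.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}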
Diffstat (limited to 'external/flume/src/test')
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java          44
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java  44
-rw-r--r--  external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java         36
-rw-r--r--  external/flume/src/test/resources/log4j.properties                                               28
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala                  48
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala     129
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala            102
7 files changed, 0 insertions, 431 deletions
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
deleted file mode 100644
index 79c5b91654..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import java.net.InetSocketAddress;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
- // tests the API, does not actually test data receiving
- InetSocketAddress[] addresses = new InetSocketAddress[] {
- new InetSocketAddress("localhost", 12345)
- };
- JavaReceiverInputDStream<SparkFlumeEvent> test1 =
- FlumeUtils.createPollingStream(ssc, "localhost", 12345);
- JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
- ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
- ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
- ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
- }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
deleted file mode 100644
index 3b5e0c7746..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
- // tests the API, does not actually test data receiving
- JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
- JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2(), false);
- }
-}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/flume/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
deleted file mode 100644
index c97a27ca7c..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
-import org.apache.spark.util.Utils
-
-/**
- * This is an output stream used only by the test suites. All the output is collected into a
- * ConcurrentLinkedQueue. This queue is wiped clean when the stream is restored from a checkpoint.
- *
- * The queue contains a sequence of RDDs, each containing a sequence of items.
- */
-class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
- extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
- val collected = rdd.collect()
- output.add(collected)
- }, false) {
-
- // This clears the output buffer every time the stream is read back from a checkpoint.
- @throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
- ois.defaultReadObject()
- output.clear()
- }
-}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
deleted file mode 100644
index 10dcbf98bc..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.InetSocketAddress
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.util.{ManualClock, Utils}
-
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-
- val maxAttempts = 5
- val batchDuration = Seconds(1)
-
- val conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName(this.getClass.getSimpleName)
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-
- val utils = new PollingFlumeTestUtils
-
- test("flume polling test") {
- testMultipleTimes(testFlumePolling)
- }
-
- test("flume polling test multiple hosts") {
- testMultipleTimes(testFlumePollingMultipleHost)
- }
-
- /**
- * Run the given test until no more java.net.BindException's are thrown.
- * Do this only up to a certain attempt limit.
- */
- private def testMultipleTimes(test: () => Unit): Unit = {
- var testPassed = false
- var attempt = 0
- while (!testPassed && attempt < maxAttempts) {
- try {
- test()
- testPassed = true
- } catch {
- case e: Exception if Utils.isBindCollision(e) =>
- logWarning("Exception when running flume polling test: " + e)
- attempt += 1
- }
- }
- assert(testPassed, s"Test failed after $attempt attempts!")
- }
-
- private def testFlumePolling(): Unit = {
- try {
- val port = utils.startSingleSink()
-
- writeAndVerify(Seq(port))
- utils.assertChannelsAreEmpty()
- } finally {
- utils.close()
- }
- }
-
- private def testFlumePollingMultipleHost(): Unit = {
- try {
- val ports = utils.startMultipleSinks()
- writeAndVerify(ports)
- utils.assertChannelsAreEmpty()
- } finally {
- utils.close()
- }
- }
-
- def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
- val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
- utils.eventsPerBatch, 5)
- val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputQueue)
- outputStream.register()
-
- ssc.start()
- try {
- utils.sendDatAndEnsureAllDataHasBeenReceived()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- clock.advance(batchDuration.milliseconds)
-
- // The eventually is required to ensure that all data in the batch has been processed.
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenOutput = outputQueue.asScala.toSeq.flatten
- val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
- case (key, value) => (key.toString, value.toString)
- }).map(_.asJava)
- val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
- utils.assertOutput(headers.asJava, bodies.asJava)
- }
- } finally {
- ssc.stop()
- }
- }
-
-}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
deleted file mode 100644
index 38208c6518..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-
-class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
- val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
- var ssc: StreamingContext = null
-
- test("flume input stream") {
- testFlumeStream(testCompression = false)
- }
-
- test("flume input compressed stream") {
- testFlumeStream(testCompression = true)
- }
-
- /** Run test on flume stream */
- private def testFlumeStream(testCompression: Boolean): Unit = {
- val input = (1 to 100).map { _.toString }
- val utils = new FlumeTestUtils
- try {
- val outputQueue = startContext(utils.getTestPort(), testCompression)
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- utils.writeInput(input.asJava, testCompression)
- }
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
- outputEvents.foreach {
- event =>
- event.getHeaders.get("test") should be("header")
- }
- val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
- output should be (input)
- }
- } finally {
- if (ssc != null) {
- ssc.stop()
- }
- utils.close()
- }
- }
-
- /** Setup and start the streaming context */
- private def startContext(
- testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
- ssc = new StreamingContext(conf, Milliseconds(200))
- val flumeStream = FlumeUtils.createStream(
- ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
- val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputQueue)
- outputStream.register()
- ssc.start()
- outputQueue
- }
-
- /** Class to create socket channel with compression */
- private class CompressionChannelFactory(compressionLevel: Int)
- extends NioClientSocketChannelFactory {
-
- override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
- val encoder = new ZlibEncoder(compressionLevel)
- pipeline.addFirst("deflater", encoder)
- pipeline.addFirst("inflater", new ZlibDecoder())
- super.newChannel(pipeline)
- }
- }
-}