aboutsummaryrefslogtreecommitdiff
path: root/external/flume/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'external/flume/src/test')
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java44
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java44
-rw-r--r--external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java36
-rw-r--r--external/flume/src/test/resources/log4j.properties28
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala48
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala129
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala102
7 files changed, 0 insertions, 431 deletions
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
deleted file mode 100644
index cfedb5a042..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.junit.After;
-import org.junit.Before;
-
-public abstract class LocalJavaStreamingContext {
-
- protected transient JavaStreamingContext ssc;
-
- @Before
- public void setUp() {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("test")
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
- ssc = new JavaStreamingContext(conf, new Duration(1000));
- ssc.checkpoint("checkpoint");
- }
-
- @After
- public void tearDown() {
- ssc.stop();
- ssc = null;
- }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
deleted file mode 100644
index 79c5b91654..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import java.net.InetSocketAddress;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
- // tests the API, does not actually test data receiving
- InetSocketAddress[] addresses = new InetSocketAddress[] {
- new InetSocketAddress("localhost", 12345)
- };
- JavaReceiverInputDStream<SparkFlumeEvent> test1 =
- FlumeUtils.createPollingStream(ssc, "localhost", 12345);
- JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream(
- ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream(
- ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream(
- ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5);
- }
-}
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
deleted file mode 100644
index 3b5e0c7746..0000000000
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume;
-
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
-
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
-
-public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
- @Test
- public void testFlumeStream() {
- // tests the API, does not actually test data receiving
- JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
- JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
- StorageLevel.MEMORY_AND_DISK_SER_2(), false);
- }
-}
diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties
deleted file mode 100644
index 75e3b53a09..0000000000
--- a/external/flume/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
-
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
deleted file mode 100644
index c97a27ca7c..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import java.io.{IOException, ObjectInputStream}
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}
-import org.apache.spark.util.Utils
-
-/**
- * This is a output stream just for the testsuites. All the output is collected into a
- * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
- *
- * The buffer contains a sequence of RDD's, each containing a sequence of items
- */
-class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
- extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
- val collected = rdd.collect()
- output.add(collected)
- }, false) {
-
- // This is to clear the output buffer every it is read from a checkpoint
- @throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
- ois.defaultReadObject()
- output.clear()
- }
-}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
deleted file mode 100644
index 10dcbf98bc..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.net.InetSocketAddress
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream}
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.util.{ManualClock, Utils}
-
-class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
-
- val maxAttempts = 5
- val batchDuration = Seconds(1)
-
- val conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName(this.getClass.getSimpleName)
- .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
-
- val utils = new PollingFlumeTestUtils
-
- test("flume polling test") {
- testMultipleTimes(testFlumePolling)
- }
-
- test("flume polling test multiple hosts") {
- testMultipleTimes(testFlumePollingMultipleHost)
- }
-
- /**
- * Run the given test until no more java.net.BindException's are thrown.
- * Do this only up to a certain attempt limit.
- */
- private def testMultipleTimes(test: () => Unit): Unit = {
- var testPassed = false
- var attempt = 0
- while (!testPassed && attempt < maxAttempts) {
- try {
- test()
- testPassed = true
- } catch {
- case e: Exception if Utils.isBindCollision(e) =>
- logWarning("Exception when running flume polling test: " + e)
- attempt += 1
- }
- }
- assert(testPassed, s"Test failed after $attempt attempts!")
- }
-
- private def testFlumePolling(): Unit = {
- try {
- val port = utils.startSingleSink()
-
- writeAndVerify(Seq(port))
- utils.assertChannelsAreEmpty()
- } finally {
- utils.close()
- }
- }
-
- private def testFlumePollingMultipleHost(): Unit = {
- try {
- val ports = utils.startMultipleSinks()
- writeAndVerify(ports)
- utils.assertChannelsAreEmpty()
- } finally {
- utils.close()
- }
- }
-
- def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
- val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
- utils.eventsPerBatch, 5)
- val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputQueue)
- outputStream.register()
-
- ssc.start()
- try {
- utils.sendDatAndEnsureAllDataHasBeenReceived()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- clock.advance(batchDuration.milliseconds)
-
- // The eventually is required to ensure that all data in the batch has been processed.
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenOutput = outputQueue.asScala.toSeq.flatten
- val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
- case (key, value) => (key.toString, value.toString)
- }).map(_.asJava)
- val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
- utils.assertOutput(headers.asJava, bodies.asJava)
- }
- } finally {
- ssc.stop()
- }
- }
-
-}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
deleted file mode 100644
index 38208c6518..0000000000
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.flume
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import scala.collection.JavaConverters._
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.jboss.netty.channel.ChannelPipeline
-import org.jboss.netty.channel.socket.SocketChannel
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.handler.codec.compression._
-import org.scalatest.{BeforeAndAfter, Matchers}
-import org.scalatest.concurrent.Eventually._
-
-import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-
-class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
- val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
- var ssc: StreamingContext = null
-
- test("flume input stream") {
- testFlumeStream(testCompression = false)
- }
-
- test("flume input compressed stream") {
- testFlumeStream(testCompression = true)
- }
-
- /** Run test on flume stream */
- private def testFlumeStream(testCompression: Boolean): Unit = {
- val input = (1 to 100).map { _.toString }
- val utils = new FlumeTestUtils
- try {
- val outputQueue = startContext(utils.getTestPort(), testCompression)
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- utils.writeInput(input.asJava, testCompression)
- }
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
- outputEvents.foreach {
- event =>
- event.getHeaders.get("test") should be("header")
- }
- val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
- output should be (input)
- }
- } finally {
- if (ssc != null) {
- ssc.stop()
- }
- utils.close()
- }
- }
-
- /** Setup and start the streaming context */
- private def startContext(
- testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
- ssc = new StreamingContext(conf, Milliseconds(200))
- val flumeStream = FlumeUtils.createStream(
- ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
- val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputQueue)
- outputStream.register()
- ssc.start()
- outputQueue
- }
-
- /** Class to create socket channel with compression */
- private class CompressionChannelFactory(compressionLevel: Int)
- extends NioClientSocketChannelFactory {
-
- override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
- val encoder = new ZlibEncoder(compressionLevel)
- pipeline.addFirst("deflater", encoder)
- pipeline.addFirst("inflater", new ZlibDecoder())
- super.newChannel(pipeline)
- }
- }
-}