diff options
Diffstat (limited to 'external/flume/src/test')
7 files changed, 0 insertions, 431 deletions
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java b/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java deleted file mode 100644 index cfedb5a042..0000000000 --- a/external/flume/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.junit.After; -import org.junit.Before; - -public abstract class LocalJavaStreamingContext { - - protected transient JavaStreamingContext ssc; - - @Before - public void setUp() { - SparkConf conf = new SparkConf() - .setMaster("local[2]") - .setAppName("test") - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock"); - ssc = new JavaStreamingContext(conf, new Duration(1000)); - ssc.checkpoint("checkpoint"); - } - - @After - public void tearDown() { - ssc.stop(); - ssc = null; - } -} diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java deleted file mode 100644 index 79c5b91654..0000000000 --- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume; - -import java.net.InetSocketAddress; - -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.LocalJavaStreamingContext; - -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.junit.Test; - -public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext { - @Test - public void testFlumeStream() { - // tests the API, does not actually test data receiving - InetSocketAddress[] addresses = new InetSocketAddress[] { - new InetSocketAddress("localhost", 12345) - }; - JavaReceiverInputDStream<SparkFlumeEvent> test1 = - FlumeUtils.createPollingStream(ssc, "localhost", 12345); - JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createPollingStream( - ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createPollingStream( - ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaReceiverInputDStream<SparkFlumeEvent> test4 = FlumeUtils.createPollingStream( - ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5); - } -} diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java deleted file mode 100644 index 3b5e0c7746..0000000000 --- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume; - -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.LocalJavaStreamingContext; - -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.junit.Test; - -public class JavaFlumeStreamSuite extends LocalJavaStreamingContext { - @Test - public void testFlumeStream() { - // tests the API, does not actually test data receiving - JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345); - JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345, - StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaReceiverInputDStream<SparkFlumeEvent> test3 = FlumeUtils.createStream(ssc, "localhost", 12345, - StorageLevel.MEMORY_AND_DISK_SER_2(), false); - } -} diff --git a/external/flume/src/test/resources/log4j.properties b/external/flume/src/test/resources/log4j.properties deleted file mode 100644 index 75e3b53a09..0000000000 --- a/external/flume/src/test/resources/log4j.properties +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Set everything to be logged to the file target/unit-tests.log -log4j.rootCategory=INFO, file -log4j.appender.file=org.apache.log4j.FileAppender -log4j.appender.file.append=true -log4j.appender.file.file=target/unit-tests.log -log4j.appender.file.layout=org.apache.log4j.PatternLayout -log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Ignore messages below warning level from Jetty, because it's a bit verbose -log4j.logger.org.spark-project.jetty=WARN - diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala deleted file mode 100644 index c97a27ca7c..0000000000 --- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -import java.io.{IOException, ObjectInputStream} -import java.util.concurrent.ConcurrentLinkedQueue - -import scala.reflect.ClassTag - -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.dstream.{DStream, ForEachDStream} -import org.apache.spark.util.Utils - -/** - * This is a output stream just for the testsuites. All the output is collected into a - * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. - * - * The buffer contains a sequence of RDD's, each containing a sequence of items - */ -class TestOutputStream[T: ClassTag](parent: DStream[T], - val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]()) - extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { - val collected = rdd.collect() - output.add(collected) - }, false) { - - // This is to clear the output buffer every it is read from a checkpoint - @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { - ois.defaultReadObject() - output.clear() - } -} diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala deleted file mode 100644 index 10dcbf98bc..0000000000 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume - -import java.net.InetSocketAddress -import java.util.concurrent.ConcurrentLinkedQueue - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps - -import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.Eventually._ - -import org.apache.spark.{Logging, SparkConf, SparkFunSuite} -import org.apache.spark.network.util.JavaUtils -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext, TestOutputStream} -import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.util.{ManualClock, Utils} - -class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging { - - val maxAttempts = 5 - val batchDuration = Seconds(1) - - val conf = new SparkConf() - .setMaster("local[2]") - .setAppName(this.getClass.getSimpleName) - .set("spark.streaming.clock", "org.apache.spark.util.ManualClock") - - val utils = new PollingFlumeTestUtils - - test("flume polling test") { - testMultipleTimes(testFlumePolling) - } - - test("flume polling test multiple hosts") { - testMultipleTimes(testFlumePollingMultipleHost) - } - - /** - * Run the given test until no more java.net.BindException's are thrown. - * Do this only up to a certain attempt limit. - */ - private def testMultipleTimes(test: () => Unit): Unit = { - var testPassed = false - var attempt = 0 - while (!testPassed && attempt < maxAttempts) { - try { - test() - testPassed = true - } catch { - case e: Exception if Utils.isBindCollision(e) => - logWarning("Exception when running flume polling test: " + e) - attempt += 1 - } - } - assert(testPassed, s"Test failed after $attempt attempts!") - } - - private def testFlumePolling(): Unit = { - try { - val port = utils.startSingleSink() - - writeAndVerify(Seq(port)) - utils.assertChannelsAreEmpty() - } finally { - utils.close() - } - } - - private def testFlumePollingMultipleHost(): Unit = { - try { - val ports = utils.startMultipleSinks() - writeAndVerify(ports) - utils.assertChannelsAreEmpty() - } finally { - utils.close() - } - } - - def writeAndVerify(sinkPorts: Seq[Int]): Unit = { - // Set up the streaming context and input streams - val ssc = new StreamingContext(conf, batchDuration) - val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port)) - val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = - FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, - utils.eventsPerBatch, 5) - val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputQueue) - outputStream.register() - - ssc.start() - try { - utils.sendDatAndEnsureAllDataHasBeenReceived() - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - clock.advance(batchDuration.milliseconds) - - // The eventually is required to ensure that all data in the batch has been processed. - eventually(timeout(10 seconds), interval(100 milliseconds)) { - val flattenOutput = outputQueue.asScala.toSeq.flatten - val headers = flattenOutput.map(_.event.getHeaders.asScala.map { - case (key, value) => (key.toString, value.toString) - }).map(_.asJava) - val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody)) - utils.assertOutput(headers.asJava, bodies.asJava) - } - } finally { - ssc.stop() - } - } - -} diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala deleted file mode 100644 index 38208c6518..0000000000 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.flume - -import java.util.concurrent.ConcurrentLinkedQueue - -import scala.collection.JavaConverters._ -import scala.concurrent.duration._ -import scala.language.postfixOps - -import org.jboss.netty.channel.ChannelPipeline -import org.jboss.netty.channel.socket.SocketChannel -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.handler.codec.compression._ -import org.scalatest.{BeforeAndAfter, Matchers} -import org.scalatest.concurrent.Eventually._ - -import org.apache.spark.{Logging, SparkConf, SparkFunSuite} -import org.apache.spark.network.util.JavaUtils -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream} - -class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { - val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite") - var ssc: StreamingContext = null - - test("flume input stream") { - testFlumeStream(testCompression = false) - } - - test("flume input compressed stream") { - testFlumeStream(testCompression = true) - } - - /** Run test on flume stream */ - private def testFlumeStream(testCompression: Boolean): Unit = { - val input = (1 to 100).map { _.toString } - val utils = new FlumeTestUtils - try { - val outputQueue = startContext(utils.getTestPort(), testCompression) - - eventually(timeout(10 seconds), interval(100 milliseconds)) { - utils.writeInput(input.asJava, testCompression) - } - - eventually(timeout(10 seconds), interval(100 milliseconds)) { - val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event } - outputEvents.foreach { - event => - event.getHeaders.get("test") should be("header") - } - val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody)) - output should be (input) - } - } finally { - if (ssc != null) { - ssc.stop() - } - utils.close() - } - } - - /** Setup and start the streaming context */ - private def startContext( - testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = { - ssc = new StreamingContext(conf, Milliseconds(200)) - val flumeStream = FlumeUtils.createStream( - ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression) - val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]] - val outputStream = new TestOutputStream(flumeStream, outputQueue) - outputStream.register() - ssc.start() - outputQueue - } - - /** Class to create socket channel with compression */ - private class CompressionChannelFactory(compressionLevel: Int) - extends NioClientSocketChannelFactory { - - override def newChannel(pipeline: ChannelPipeline): SocketChannel = { - val encoder = new ZlibEncoder(compressionLevel) - pipeline.addFirst("deflater", encoder) - pipeline.addFirst("inflater", new ZlibDecoder()) - super.newChannel(pipeline) - } - } -} |