diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-03-25 17:37:16 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-03-25 17:37:16 -0700 |
commit | 24587ce433aa30f30a5d1ed6566365f24c222a27 (patch) | |
tree | b1ff8ffa17b643d3e833be33debecf209d20ff6d /external/flume-sink/src/test/scala/org/apache | |
parent | 54d13bed87fcf2968f77e1f1153e85184ec91d78 (diff) | |
download | spark-24587ce433aa30f30a5d1ed6566365f24c222a27.tar.gz spark-24587ce433aa30f30a5d1ed6566365f24c222a27.tar.bz2 spark-24587ce433aa30f30a5d1ed6566365f24c222a27.zip |
[SPARK-14073][STREAMING][TEST-MAVEN] Move flume back to Spark
## What changes were proposed in this pull request?
This PR moves flume back to Spark as per the discussion in the dev mail-list.
## How was this patch tested?
Existing Jenkins tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #11895 from zsxwing/move-flume-back.
Diffstat (limited to 'external/flume-sink/src/test/scala/org/apache')
-rw-r--r-- | external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala new file mode 100644 index 0000000000..e8ca1e7163 --- /dev/null +++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.streaming.flume.sink + +import java.net.InetSocketAddress +import java.nio.charset.StandardCharsets +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +import org.apache.avro.ipc.NettyTransceiver +import org.apache.avro.ipc.specific.SpecificRequestor +import org.apache.flume.Context +import org.apache.flume.channel.MemoryChannel +import org.apache.flume.event.EventBuilder +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory + +// Due to MNG-1378, there is not a way to include test dependencies transitively. +// We cannot include Spark core tests as a dependency here because it depends on +// Spark core main, which has too many dependencies to require here manually. +// For this reason, we continue to use FunSuite and ignore the scalastyle checks +// that fail if this is detected. +// scalastyle:off +import org.scalatest.FunSuite + +class SparkSinkSuite extends FunSuite { +// scalastyle:on + + val eventsPerBatch = 1000 + val channelCapacity = 5000 + + test("Success with ack") { + val (channel, sink, latch) = initializeChannelAndSink() + channel.start() + sink.start() + + putEvents(channel, eventsPerBatch) + + val port = sink.getPort + val address = new InetSocketAddress("0.0.0.0", port) + + val (transceiver, client) = getTransceiverAndClient(address, 1)(0) + val events = client.getEventBatch(1000) + client.ack(events.getSequenceNumber) + assert(events.getEvents.size() === 1000) + latch.await(1, TimeUnit.SECONDS) + assertChannelIsEmpty(channel) + sink.stop() + channel.stop() + transceiver.close() + } + + test("Failure with nack") { + val (channel, sink, latch) = initializeChannelAndSink() + channel.start() + sink.start() + putEvents(channel, eventsPerBatch) + + val port = sink.getPort + val address = new InetSocketAddress("0.0.0.0", port) + + val (transceiver, client) = getTransceiverAndClient(address, 1)(0) + val events = client.getEventBatch(1000) + assert(events.getEvents.size() === 1000) + client.nack(events.getSequenceNumber) + latch.await(1, TimeUnit.SECONDS) + assert(availableChannelSlots(channel) === 4000) + sink.stop() + channel.stop() + transceiver.close() + } + + test("Failure with timeout") { + val (channel, sink, latch) = initializeChannelAndSink(Map(SparkSinkConfig + .CONF_TRANSACTION_TIMEOUT -> 1.toString)) + channel.start() + sink.start() + putEvents(channel, eventsPerBatch) + val port = sink.getPort + val address = new InetSocketAddress("0.0.0.0", port) + + val (transceiver, client) = getTransceiverAndClient(address, 1)(0) + val events = client.getEventBatch(1000) + assert(events.getEvents.size() === 1000) + latch.await(1, TimeUnit.SECONDS) + assert(availableChannelSlots(channel) === 4000) + sink.stop() + channel.stop() + transceiver.close() + } + + test("Multiple consumers") { + testMultipleConsumers(failSome = false) + } + + test("Multiple consumers with some failures") { + testMultipleConsumers(failSome = true) + } + + def testMultipleConsumers(failSome: Boolean): Unit = { + implicit val executorContext = ExecutionContext + .fromExecutorService(Executors.newFixedThreadPool(5)) + val (channel, sink, latch) = initializeChannelAndSink(Map.empty, 5) + channel.start() + sink.start() + (1 to 5).foreach(_ => putEvents(channel, eventsPerBatch)) + val port = sink.getPort + val address = new InetSocketAddress("0.0.0.0", port) + val transceiversAndClients = getTransceiverAndClient(address, 5) + val batchCounter = new CountDownLatch(5) + val counter = new AtomicInteger(0) + transceiversAndClients.foreach(x => { + Future { + val client = x._2 + val events = client.getEventBatch(1000) + if (!failSome || counter.getAndIncrement() % 2 == 0) { + client.ack(events.getSequenceNumber) + } else { + client.nack(events.getSequenceNumber) + throw new RuntimeException("Sending NACK for failure!") + } + events + }.onComplete { + case Success(events) => + assert(events.getEvents.size() === 1000) + batchCounter.countDown() + case Failure(t) => + // Don't re-throw the exception, causes a nasty unnecessary stack trace on stdout + batchCounter.countDown() + } + }) + batchCounter.await() + latch.await(1, TimeUnit.SECONDS) + executorContext.shutdown() + if(failSome) { + assert(availableChannelSlots(channel) === 3000) + } else { + assertChannelIsEmpty(channel) + } + sink.stop() + channel.stop() + transceiversAndClients.foreach(x => x._1.close()) + } + + private def initializeChannelAndSink(overrides: Map[String, String] = Map.empty, + batchCounter: Int = 1): (MemoryChannel, SparkSink, CountDownLatch) = { + val channel = new MemoryChannel() + val channelContext = new Context() + + channelContext.put("capacity", channelCapacity.toString) + channelContext.put("transactionCapacity", 1000.toString) + channelContext.put("keep-alive", 0.toString) + channelContext.putAll(overrides.asJava) + channel.setName(scala.util.Random.nextString(10)) + channel.configure(channelContext) + + val sink = new SparkSink() + val sinkContext = new Context() + sinkContext.put(SparkSinkConfig.CONF_HOSTNAME, "0.0.0.0") + sinkContext.put(SparkSinkConfig.CONF_PORT, 0.toString) + sink.configure(sinkContext) + sink.setChannel(channel) + val latch = new CountDownLatch(batchCounter) + sink.countdownWhenBatchReceived(latch) + (channel, sink, latch) + } + + private def putEvents(ch: MemoryChannel, count: Int): Unit = { + val tx = ch.getTransaction + tx.begin() + (1 to count).foreach(x => + ch.put(EventBuilder.withBody(x.toString.getBytes(StandardCharsets.UTF_8)))) + tx.commit() + tx.close() + } + + private def getTransceiverAndClient(address: InetSocketAddress, + count: Int): Seq[(NettyTransceiver, SparkFlumeProtocol.Callback)] = { + + (1 to count).map(_ => { + lazy val channelFactoryExecutor = Executors.newCachedThreadPool( + new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d")) + lazy val channelFactory = + new NioClientSocketChannelFactory(channelFactoryExecutor, channelFactoryExecutor) + val transceiver = new NettyTransceiver(address, channelFactory) + val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver) + (transceiver, client) + }) + } + + private def assertChannelIsEmpty(channel: MemoryChannel): Unit = { + assert(availableChannelSlots(channel) === channelCapacity) + } + + private def availableChannelSlots(channel: MemoryChannel): Int = { + val queueRemaining = channel.getClass.getDeclaredField("queueRemaining") + queueRemaining.setAccessible(true) + val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits") + m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] + } +} |