author    zsxwing <zsxwing@gmail.com>    2015-07-01 11:59:24 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>    2015-07-01 11:59:24 -0700
commit    75b9fe4c5ff6f206c6fc9100563d625b39f142ba (patch)
tree      8b7e9a8de0003a8525845f84ef548e76fc9d0729 /external/flume
parent    b8faa32875aa560cdce340266d898902a920418d (diff)
[SPARK-8378] [STREAMING] Add the Python API for Flume
Author: zsxwing <zsxwing@gmail.com>

Closes #6830 from zsxwing/flume-python and squashes the following commits:

78dfdac [zsxwing] Fix the compile error in the test code
f1bf3c0 [zsxwing] Address TD's comments
0449723 [zsxwing] Add sbt goal streaming-flume-assembly/assembly
e93736b [zsxwing] Fix the test case for determine_modules_to_test
9d5821e [zsxwing] Fix pyspark_core dependencies
f9ee681 [zsxwing] Merge branch 'master' into flume-python
7a55837 [zsxwing] Add streaming_flume_assembly to run-tests.py
b96b0de [zsxwing] Merge branch 'master' into flume-python
ce85e83 [zsxwing] Fix incompatible issues for Python 3
01cbb3d [zsxwing] Add import sys
152364c [zsxwing] Fix the issue that StringIO doesn't work in Python 3
14ba0ff [zsxwing] Add flume-assembly for sbt building
b8d5551 [zsxwing] Merge branch 'master' into flume-python
4762c34 [zsxwing] Fix the doc
0336579 [zsxwing] Refactor Flume unit tests and also add tests for Python API
9f33873 [zsxwing] Add the Python API for Flume
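The patch exposes Flume streams to Python by re-encoding each event as a pair of byte arrays (serialized headers, body). For orientation, here is a minimal Scala sketch of the push-based stream that the new FlumeUtilsPythonHelper.createStream wraps; the host, port, and transform are illustrative placeholders, not part of the patch:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumeSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("FlumeSketch")
        val ssc = new StreamingContext(conf, Seconds(1))
        // Push-based receiver: a Flume agent's Avro sink must point at this host/port
        val stream = FlumeUtils.createStream(ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK)
        stream.map(e => new String(e.event.getBody.array())).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }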
Diffstat (limited to 'external/flume')
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala          | 116
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala              |  76
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala   | 209
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 173
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala        | 106
5 files changed, 456 insertions, 224 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
new file mode 100644
index 0000000000..9d9c3b1894
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.net.{InetSocketAddress, ServerSocket}
+import java.nio.ByteBuffer
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+
+import com.google.common.base.Charsets.UTF_8
+import org.apache.avro.ipc.NettyTransceiver
+import org.apache.avro.ipc.specific.SpecificRequestor
+import org.apache.commons.lang3.RandomUtils
+import org.apache.flume.source.avro
+import org.apache.flume.source.avro.{AvroSourceProtocol, AvroFlumeEvent}
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.handler.codec.compression.{ZlibDecoder, ZlibEncoder}
+
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+
+/**
+ * Shared code for the Scala and Python unit tests
+ */
+private[flume] class FlumeTestUtils {
+
+ private var transceiver: NettyTransceiver = null
+
+ private val testPort: Int = findFreePort()
+
+ def getTestPort(): Int = testPort
+
+ /** Find a free port */
+ private def findFreePort(): Int = {
+ val candidatePort = RandomUtils.nextInt(1024, 65536)
+ Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
+ val socket = new ServerSocket(trialPort)
+ socket.close()
+ (null, trialPort)
+ }, new SparkConf())._2
+ }
+
+ /** Send data to the Flume receiver */
+ def writeInput(input: JList[String], enableCompression: Boolean): Unit = {
+ val testAddress = new InetSocketAddress("localhost", testPort)
+
+ val inputEvents = input.map { item =>
+ val event = new AvroFlumeEvent
+ event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8)))
+ event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+ event
+ }
+
+ // If a transceiver from a previous attempt is still open, close it first
+ close()
+
+ // Create transceiver
+ transceiver = {
+ if (enableCompression) {
+ new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
+ } else {
+ new NettyTransceiver(testAddress)
+ }
+ }
+
+ // Create Avro client with the transceiver
+ val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
+ if (client == null) {
+ throw new AssertionError("Cannot create client")
+ }
+
+ // Send data
+ val status = client.appendBatch(inputEvents.toList)
+ if (status != avro.Status.OK) {
+ throw new AssertionError("Sent events unsuccessfully")
+ }
+ }
+
+ def close(): Unit = {
+ if (transceiver != null) {
+ transceiver.close()
+ transceiver = null
+ }
+ }
+
+ /** Class to create socket channel with compression */
+ private class CompressionChannelFactory(compressionLevel: Int)
+ extends NioClientSocketChannelFactory {
+
+ override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+ val encoder = new ZlibEncoder(compressionLevel)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ super.newChannel(pipeline)
+ }
+ }
+
+}
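FlumeTestUtils drives an Avro client at a randomly chosen free port. A minimal usage sketch, assuming a Flume receiver is already listening on getTestPort() (as the rewritten FlumeStreamSuite below arranges); the input values are placeholders:

    import scala.collection.JavaConversions._  // converts the Seq to the java.util.List writeInput expects

    val utils = new FlumeTestUtils
    try {
      utils.writeInput(Seq("event-1", "event-2"), false)  // false: no compression
    } finally {
      utils.close()  // closes the transceiver if one was opened
    }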
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 44dec45c22..095bfb0c73 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -18,10 +18,16 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
+import java.io.{DataOutputStream, ByteArrayOutputStream}
+import java.util.{List => JList, Map => JMap}
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.java.function.PairFunction
+import org.apache.spark.api.python.PythonRDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -236,3 +242,71 @@ object FlumeUtils {
createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
}
}
+
+/**
+ * This is a helper class that wraps the methods in FlumeUtils into more Python-friendly class and
+ * function so that it can be easily instantiated and called from Python's FlumeUtils.
+ */
+private class FlumeUtilsPythonHelper {
+
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+ val dstream = FlumeUtils.createStream(jssc, hostname, port, storageLevel, enableDecompression)
+ FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+ }
+
+ def createPollingStream(
+ jssc: JavaStreamingContext,
+ hosts: JList[String],
+ ports: JList[Int],
+ storageLevel: StorageLevel,
+ maxBatchSize: Int,
+ parallelism: Int
+ ): JavaPairDStream[Array[Byte], Array[Byte]] = {
+ assert(hosts.length == ports.length)
+ val addresses = hosts.zip(ports).map {
+ case (host, port) => new InetSocketAddress(host, port)
+ }
+ val dstream = FlumeUtils.createPollingStream(
+ jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism)
+ FlumeUtilsPythonHelper.toByteArrayPairDStream(dstream)
+ }
+
+}
+
+private object FlumeUtilsPythonHelper {
+
+ private def stringMapToByteArray(map: JMap[CharSequence, CharSequence]): Array[Byte] = {
+ val byteStream = new ByteArrayOutputStream()
+ val output = new DataOutputStream(byteStream)
+ try {
+ output.writeInt(map.size)
+ map.foreach { kv =>
+ PythonRDD.writeUTF(kv._1.toString, output)
+ PythonRDD.writeUTF(kv._2.toString, output)
+ }
+ byteStream.toByteArray
+ }
+ finally {
+ output.close()
+ }
+ }
+
+ private def toByteArrayPairDStream(dstream: JavaReceiverInputDStream[SparkFlumeEvent]):
+ JavaPairDStream[Array[Byte], Array[Byte]] = {
+ dstream.mapToPair(new PairFunction[SparkFlumeEvent, Array[Byte], Array[Byte]] {
+ override def call(sparkEvent: SparkFlumeEvent): (Array[Byte], Array[Byte]) = {
+ val event = sparkEvent.event
+ val byteBuffer = event.getBody
+ val body = new Array[Byte](byteBuffer.remaining())
+ byteBuffer.get(body)
+ (stringMapToByteArray(event.getHeaders), body)
+ }
+ })
+ }
+}
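FlumeUtilsPythonHelper serializes each event's header map as an int count followed by key/value pairs written with PythonRDD.writeUTF. Assuming writeUTF emits a 4-byte big-endian length prefix followed by UTF-8 bytes, a JVM-side consumer could decode the layout like this (a sketch for illustration, not part of the patch):

    import java.io.{ByteArrayInputStream, DataInputStream}
    import com.google.common.base.Charsets.UTF_8

    def readHeaders(bytes: Array[Byte]): Map[String, String] = {
      val in = new DataInputStream(new ByteArrayInputStream(bytes))
      def readString(): String = {
        val len = in.readInt()            // length prefix written by writeUTF (assumed)
        val buf = new Array[Byte](len)
        in.readFully(buf)
        new String(buf, UTF_8)
      }
      val count = in.readInt()            // map size is written first
      (0 until count).map(_ => readString() -> readString()).toMap
    }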
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
new file mode 100644
index 0000000000..91d63d49db
--- /dev/null
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.flume
+
+import java.util.concurrent._
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import com.google.common.base.Charsets.UTF_8
+import org.apache.flume.event.EventBuilder
+import org.apache.flume.Context
+import org.apache.flume.channel.MemoryChannel
+import org.apache.flume.conf.Configurables
+
+import org.apache.spark.streaming.flume.sink.{SparkSinkConfig, SparkSink}
+
+/**
+ * Shared code for the Scala and Python unit tests
+ */
+private[flume] class PollingFlumeTestUtils {
+
+ private val batchCount = 5
+ val eventsPerBatch = 100
+ private val totalEventsPerChannel = batchCount * eventsPerBatch
+ private val channelCapacity = 5000
+
+ def getTotalEvents: Int = totalEventsPerChannel * channels.size
+
+ private val channels = new ArrayBuffer[MemoryChannel]
+ private val sinks = new ArrayBuffer[SparkSink]
+
+ /**
+ * Start a sink and return the port of this sink
+ */
+ def startSingleSink(): Int = {
+ channels.clear()
+ sinks.clear()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ channels += (channel)
+ sinks += sink
+
+ sink.getPort()
+ }
+
+ /**
+ * Start 2 sinks and return the ports
+ */
+ def startMultipleSinks(): JList[Int] = {
+ channels.clear()
+ sinks.clear()
+
+ // Start the channel and sink.
+ val context = new Context()
+ context.put("capacity", channelCapacity.toString)
+ context.put("transactionCapacity", "1000")
+ context.put("keep-alive", "0")
+ val channel = new MemoryChannel()
+ Configurables.configure(channel, context)
+
+ val channel2 = new MemoryChannel()
+ Configurables.configure(channel2, context)
+
+ val sink = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink, context)
+ sink.setChannel(channel)
+ sink.start()
+
+ val sink2 = new SparkSink()
+ context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
+ context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
+ Configurables.configure(sink2, context)
+ sink2.setChannel(channel2)
+ sink2.start()
+
+ sinks += sink
+ sinks += sink2
+ channels += channel
+ channels += channel2
+
+ sinks.map(_.getPort())
+ }
+
+ /**
+ * Send data and wait until all data has been received
+ */
+ def sendDataAndEnsureAllDataHasBeenReceived(): Unit = {
+ val executor = Executors.newCachedThreadPool()
+ val executorCompletion = new ExecutorCompletionService[Void](executor)
+
+ val latch = new CountDownLatch(batchCount * channels.size)
+ sinks.foreach(_.countdownWhenBatchReceived(latch))
+
+ channels.foreach(channel => {
+ executorCompletion.submit(new TxnSubmitter(channel))
+ })
+
+ for (i <- 0 until channels.size) {
+ executorCompletion.take()
+ }
+
+ latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
+ }
+
+ /**
+ * A Python-friendly method to assert the output
+ */
+ def assertOutput(
+ outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = {
+ require(outputHeaders.size == outputBodies.size)
+ val eventSize = outputHeaders.size
+ if (eventSize != totalEventsPerChannel * channels.size) {
+ throw new AssertionError(
+ s"Expected ${totalEventsPerChannel * channels.size} events, but was $eventSize")
+ }
+ var counter = 0
+ for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
+ val eventBodyToVerify = s"${channels(k).getName}-$i"
+ val eventHeaderToVerify: JMap[String, String] = Map[String, String](s"test-$i" -> "header")
+ var found = false
+ var j = 0
+ while (j < eventSize && !found) {
+ if (eventBodyToVerify == outputBodies.get(j) &&
+ eventHeaderToVerify == outputHeaders.get(j)) {
+ found = true
+ counter += 1
+ }
+ j += 1
+ }
+ }
+ if (counter != totalEventsPerChannel * channels.size) {
+ throw new AssertionError(
+ s"111 Expected ${totalEventsPerChannel * channels.size} events, but was $counter")
+ }
+ }
+
+ def assertChannelsAreEmpty(): Unit = {
+ channels.foreach(assertChannelIsEmpty)
+ }
+
+ private def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
+ val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
+ queueRemaining.setAccessible(true)
+ val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+ if (m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] != channelCapacity) {
+ throw new AssertionError(s"Channel ${channel.getName} is not empty")
+ }
+ }
+
+ def close(): Unit = {
+ sinks.foreach(_.stop())
+ sinks.clear()
+ channels.foreach(_.stop())
+ channels.clear()
+ }
+
+ private class TxnSubmitter(channel: MemoryChannel) extends Callable[Void] {
+ override def call(): Void = {
+ var t = 0
+ for (i <- 0 until batchCount) {
+ val tx = channel.getTransaction
+ tx.begin()
+ for (j <- 0 until eventsPerBatch) {
+ channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8),
+ Map[String, String](s"test-$t" -> "header")))
+ t += 1
+ }
+ tx.commit()
+ tx.close()
+ Thread.sleep(500) // Allow some time for the events to reach the sink
+ }
+ null
+ }
+ }
+
+}
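A sketch of the intended lifecycle, mirroring how the rewritten FlumePollingStreamSuite below drives the class; the polling-stream setup in the middle is elided:

    val utils = new PollingFlumeTestUtils
    try {
      val port = utils.startSingleSink()
      // ... create a polling stream against ("localhost", port), start the
      // StreamingContext, and collect output headers/bodies ...
      utils.sendDataAndEnsureAllDataHasBeenReceived()
      utils.assertChannelsAreEmpty()
    } finally {
      utils.close()
    }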
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index d772b9ca9b..d5f9a0aa38 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,47 +18,33 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
-import java.util.concurrent._
import scala.collection.JavaConversions._
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
-import org.apache.flume.Context
-import org.apache.flume.channel.MemoryChannel
-import org.apache.flume.conf.Configurables
-import org.apache.flume.event.EventBuilder
-import org.scalatest.concurrent.Eventually._
-
+import com.google.common.base.Charsets.UTF_8
import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
-import org.apache.spark.streaming.flume.sink._
import org.apache.spark.util.{ManualClock, Utils}
class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Logging {
- val batchCount = 5
- val eventsPerBatch = 100
- val totalEventsPerChannel = batchCount * eventsPerBatch
- val channelCapacity = 5000
val maxAttempts = 5
val batchDuration = Seconds(1)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
- def beforeFunction() {
- logInfo("Using manual clock")
- conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
- }
-
- before(beforeFunction())
+ val utils = new PollingFlumeTestUtils
test("flume polling test") {
testMultipleTimes(testFlumePolling)
@@ -89,146 +75,55 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
}
private def testFlumePolling(): Unit = {
- // Start the channel and sink.
- val context = new Context()
- context.put("capacity", channelCapacity.toString)
- context.put("transactionCapacity", "1000")
- context.put("keep-alive", "0")
- val channel = new MemoryChannel()
- Configurables.configure(channel, context)
-
- val sink = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink, context)
- sink.setChannel(channel)
- sink.start()
-
- writeAndVerify(Seq(sink), Seq(channel))
- assertChannelIsEmpty(channel)
- sink.stop()
- channel.stop()
+ try {
+ val port = utils.startSingleSink()
+
+ writeAndVerify(Seq(port))
+ utils.assertChannelsAreEmpty()
+ } finally {
+ utils.close()
+ }
}
private def testFlumePollingMultipleHost(): Unit = {
- // Start the channel and sink.
- val context = new Context()
- context.put("capacity", channelCapacity.toString)
- context.put("transactionCapacity", "1000")
- context.put("keep-alive", "0")
- val channel = new MemoryChannel()
- Configurables.configure(channel, context)
-
- val channel2 = new MemoryChannel()
- Configurables.configure(channel2, context)
-
- val sink = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink, context)
- sink.setChannel(channel)
- sink.start()
-
- val sink2 = new SparkSink()
- context.put(SparkSinkConfig.CONF_HOSTNAME, "localhost")
- context.put(SparkSinkConfig.CONF_PORT, String.valueOf(0))
- Configurables.configure(sink2, context)
- sink2.setChannel(channel2)
- sink2.start()
try {
- writeAndVerify(Seq(sink, sink2), Seq(channel, channel2))
- assertChannelIsEmpty(channel)
- assertChannelIsEmpty(channel2)
+ val ports = utils.startMultipleSinks()
+ writeAndVerify(ports)
+ utils.assertChannelsAreEmpty()
} finally {
- sink.stop()
- sink2.stop()
- channel.stop()
- channel2.stop()
+ utils.close()
}
}
- def writeAndVerify(sinks: Seq[SparkSink], channels: Seq[MemoryChannel]) {
+ def writeAndVerify(sinkPorts: Seq[Int]): Unit = {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
- val addresses = sinks.map(sink => new InetSocketAddress("localhost", sink.getPort()))
+ val addresses = sinkPorts.map(port => new InetSocketAddress("localhost", port))
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
- eventsPerBatch, 5)
+ utils.eventsPerBatch, 5)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val executor = Executors.newCachedThreadPool()
- val executorCompletion = new ExecutorCompletionService[Void](executor)
-
- val latch = new CountDownLatch(batchCount * channels.size)
- sinks.foreach(_.countdownWhenBatchReceived(latch))
-
- channels.foreach(channel => {
- executorCompletion.submit(new TxnSubmitter(channel, clock))
- })
-
- for (i <- 0 until channels.size) {
- executorCompletion.take()
- }
-
- latch.await(15, TimeUnit.SECONDS) // Ensure all data has been received.
- clock.advance(batchDuration.milliseconds)
-
- // The eventually is required to ensure that all data in the batch has been processed.
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenedBuffer = outputBuffer.flatten
- assert(flattenedBuffer.size === totalEventsPerChannel * channels.size)
- var counter = 0
- for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) {
- val eventToVerify = EventBuilder.withBody((channels(k).getName + " - " +
- String.valueOf(i)).getBytes("utf-8"),
- Map[String, String]("test-" + i.toString -> "header"))
- var found = false
- var j = 0
- while (j < flattenedBuffer.size && !found) {
- val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
- if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
- eventToVerify.getHeaders.get("test-" + i.toString)
- .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
- found = true
- counter += 1
- }
- j += 1
- }
- }
- assert(counter === totalEventsPerChannel * channels.size)
- }
- ssc.stop()
- }
-
- def assertChannelIsEmpty(channel: MemoryChannel): Unit = {
- val queueRemaining = channel.getClass.getDeclaredField("queueRemaining")
- queueRemaining.setAccessible(true)
- val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
- assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
- }
-
- private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
- override def call(): Void = {
- var t = 0
- for (i <- 0 until batchCount) {
- val tx = channel.getTransaction
- tx.begin()
- for (j <- 0 until eventsPerBatch) {
- channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
- "utf-8"),
- Map[String, String]("test-" + t.toString -> "header")))
- t += 1
- }
- tx.commit()
- tx.close()
- Thread.sleep(500) // Allow some time for the events to reach
+ try {
+ utils.sendDataAndEnsureAllDataHasBeenReceived()
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(batchDuration.milliseconds)
+
+ // The eventually is required to ensure that all data in the batch has been processed.
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val flattenOutputBuffer = outputBuffer.flatten
+ val headers = flattenOutputBuffer.map(_.event.getHeaders.map {
+ case kv => (kv._1.toString, kv._2.toString)
+ }).map(mapAsJavaMap)
+ val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8))
+ utils.assertOutput(headers, bodies)
}
- null
+ } finally {
+ ssc.stop()
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index c926359987..5bc4cdf653 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,20 +17,12 @@
package org.apache.spark.streaming.flume
-import java.net.{InetSocketAddress, ServerSocket}
-import java.nio.ByteBuffer
-
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
import com.google.common.base.Charsets
-import org.apache.avro.ipc.NettyTransceiver
-import org.apache.avro.ipc.specific.SpecificRequestor
-import org.apache.commons.lang3.RandomUtils
-import org.apache.flume.source.avro
-import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@@ -41,22 +33,10 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
-import org.apache.spark.util.Utils
class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
val conf = new SparkConf().setMaster("local[4]").setAppName("FlumeStreamSuite")
-
var ssc: StreamingContext = null
- var transceiver: NettyTransceiver = null
-
- after {
- if (ssc != null) {
- ssc.stop()
- }
- if (transceiver != null) {
- transceiver.close()
- }
- }
test("flume input stream") {
testFlumeStream(testCompression = false)
@@ -69,19 +49,29 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
/** Run test on flume stream */
private def testFlumeStream(testCompression: Boolean): Unit = {
val input = (1 to 100).map { _.toString }
- val testPort = findFreePort()
- val outputBuffer = startContext(testPort, testCompression)
- writeAndVerify(input, testPort, outputBuffer, testCompression)
- }
+ val utils = new FlumeTestUtils
+ try {
+ val outputBuffer = startContext(utils.getTestPort(), testCompression)
- /** Find a free port */
- private def findFreePort(): Int = {
- val candidatePort = RandomUtils.nextInt(1024, 65536)
- Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
- val socket = new ServerSocket(trialPort)
- socket.close()
- (null, trialPort)
- }, conf)._2
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ utils.writeInput(input, testCompression)
+ }
+
+ eventually(timeout(10 seconds), interval(100 milliseconds)) {
+ val outputEvents = outputBuffer.flatten.map { _.event }
+ outputEvents.foreach {
+ event =>
+ event.getHeaders.get("test") should be("header")
+ }
+ val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
+ output should be (input)
+ }
+ } finally {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ utils.close()
+ }
}
/** Setup and start the streaming context */
@@ -98,58 +88,6 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
outputBuffer
}
- /** Send data to the flume receiver and verify whether the data was received */
- private def writeAndVerify(
- input: Seq[String],
- testPort: Int,
- outputBuffer: ArrayBuffer[Seq[SparkFlumeEvent]],
- enableCompression: Boolean
- ) {
- val testAddress = new InetSocketAddress("localhost", testPort)
-
- val inputEvents = input.map { item =>
- val event = new AvroFlumeEvent
- event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8)))
- event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
- event
- }
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- // if last attempted transceiver had succeeded, close it
- if (transceiver != null) {
- transceiver.close()
- transceiver = null
- }
-
- // Create transceiver
- transceiver = {
- if (enableCompression) {
- new NettyTransceiver(testAddress, new CompressionChannelFactory(6))
- } else {
- new NettyTransceiver(testAddress)
- }
- }
-
- // Create Avro client with the transceiver
- val client = SpecificRequestor.getClient(classOf[AvroSourceProtocol], transceiver)
- client should not be null
-
- // Send data
- val status = client.appendBatch(inputEvents.toList)
- status should be (avro.Status.OK)
- }
-
- eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputBuffer.flatten.map { _.event }
- outputEvents.foreach {
- event =>
- event.getHeaders.get("test") should be("header")
- }
- val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
- output should be (input)
- }
- }
-
/** Class to create socket channel with compression */
private class CompressionChannelFactory(compressionLevel: Int)
extends NioClientSocketChannelFactory {