diff options
author | Patrick Wendell <pwendell@gmail.com> | 2012-12-07 15:01:15 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2012-12-07 16:42:39 -0800 |
commit | 3ff9710265d4bb518b89461cfb0fcc771e61a726 (patch) | |
tree | e5394ba308955c862406ceb7bb3b3ce35dd253d0 /streaming | |
parent | c36ca10241991d46f2f1513b2c0c5e369d8b34f9 (diff) | |
download | spark-3ff9710265d4bb518b89461cfb0fcc771e61a726.tar.gz spark-3ff9710265d4bb518b89461cfb0fcc771e61a726.tar.bz2 spark-3ff9710265d4bb518b89461cfb0fcc771e61a726.zip |
Adding Flume InputDStream
Diffstat (limited to 'streaming')
8 files changed, 246 insertions, 6 deletions
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala new file mode 100644 index 0000000000..9c403278c3 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala @@ -0,0 +1,130 @@ +package spark.streaming + +import java.io.{ObjectInput, ObjectOutput, Externalizable} +import spark.storage.StorageLevel +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.avro.ipc.NettyServer +import java.net.InetSocketAddress +import collection.JavaConversions._ +import spark.Utils +import java.nio.ByteBuffer + +class FlumeInputDStream[T: ClassManifest]( + @transient ssc_ : StreamingContext, + host: String, + port: Int, + storageLevel: StorageLevel +) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { + + override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = { + new FlumeReceiver(id, host, port, storageLevel) + } +} + +/** + * A wrapper class for AvroFlumeEvent's with a custom serialization format. + * + * This is necessary because AvroFlumeEvent uses inner data structures + * which are not serializable. + */ +class SparkFlumeEvent() extends Externalizable { + var event : AvroFlumeEvent = new AvroFlumeEvent() + + /* De-serialize from bytes. */ + def readExternal(in: ObjectInput) { + val bodyLength = in.readInt() + val bodyBuff = new Array[Byte](bodyLength) + in.read(bodyBuff) + + val numHeaders = in.readInt() + val headers = new java.util.HashMap[CharSequence, CharSequence] + + for (i <- 0 until numHeaders) { + val keyLength = in.readInt() + val keyBuff = new Array[Byte](keyLength) + in.read(keyBuff) + val key : String = Utils.deserialize(keyBuff) + + val valLength = in.readInt() + val valBuff = new Array[Byte](valLength) + in.read(valBuff) + val value : String = Utils.deserialize(valBuff) + + headers.put(key, value) + } + + event.setBody(ByteBuffer.wrap(bodyBuff)) + event.setHeaders(headers) + } + + /* Serialize to bytes. */ + def writeExternal(out: ObjectOutput) { + val body = event.getBody.array() + out.writeInt(body.length) + out.write(body) + + val numHeaders = event.getHeaders.size() + out.writeInt(numHeaders) + for ((k, v) <- event.getHeaders) { + val keyBuff = Utils.serialize(k.toString) + out.writeInt(keyBuff.length) + out.write(keyBuff) + val valBuff = Utils.serialize(v.toString) + out.writeInt(valBuff.length) + out.write(valBuff) + } + } +} + +object SparkFlumeEvent { + def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = { + val event = new SparkFlumeEvent + event.event = in + event + } +} + +/** A simple server that implements Flume's Avro protocol. */ +class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { + override def append(event : AvroFlumeEvent) : Status = { + receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event) + Status.OK + } + + override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { + events.foreach (event => + receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)) + Status.OK + } +} + +/** A NetworkReceiver which listens for events using the + * Flume Avro interface.*/ +class FlumeReceiver( + streamId: Int, + host: String, + port: Int, + storageLevel: StorageLevel + ) extends NetworkReceiver[SparkFlumeEvent](streamId) { + + lazy val dataHandler = new DataHandler(this, storageLevel) + + protected override def onStart() { + val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)); + val server = new NettyServer(responder, new InetSocketAddress(host, port)); + dataHandler.start() + server.start() + logInfo("Flume receiver started") + } + + protected override def onStop() { + dataHandler.stop() + logInfo("Flume receiver stopped") + } + + override def getLocationConstraint = Some(host) +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala index d3f37b8b0e..052fc8bb74 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala @@ -8,7 +8,6 @@ import spark.streaming.util.{RecurringTimer, SystemClock} import spark.storage.StorageLevel import java.nio.ByteBuffer -import java.util.concurrent.ArrayBlockingQueue import akka.actor.{Props, Actor} import akka.pattern.ask @@ -63,6 +62,9 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri /** This method will be called to stop receiving data. */ protected def onStop() + /** This method conveys a placement constraint (hostname) for this receiver. */ + def getLocationConstraint() : Option[String] = None + /** * This method starts the receiver. First is accesses all the lazy members to * materialize them. Then it calls the user-defined onStart() method to start @@ -151,6 +153,4 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri tracker ! DeregisterReceiver(streamId, msg) } } - } - diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index 73ba877085..56661c2615 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -98,7 +98,18 @@ class NetworkInputTracker( def startReceivers() { val receivers = networkInputStreams.map(_.createReceiver()) - val tempRDD = ssc.sc.makeRDD(receivers, receivers.size) + + // We only honor constraints if all receivers have them + val hasLocationConstraints = receivers.map(_.getLocationConstraint().isDefined).reduce(_ && _) + + val tempRDD = + if (hasLocationConstraints) { + val receiversWithConstraints = receivers.map(r => (r, Seq(r.getLocationConstraint().toString))) + ssc.sc.makeLocalityConstrainedRDD[NetworkReceiver[_]](receiversWithConstraints) + } + else { + ssc.sc.makeRDD(receivers, receivers.size) + } val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => { if (!iterator.hasNext) { diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala index d5db8e787d..fd51ed47a5 100644 --- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala @@ -31,6 +31,8 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S var blockPushingThread: Thread = null + override def getLocationConstraint = None + def onStart() { // Open a socket to the target address and keep reading from it logInfo("Connecting to " + host + ":" + port) diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala index ff99d50b76..ebbb17a39a 100644 --- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala @@ -34,6 +34,8 @@ class SocketReceiver[T: ClassManifest]( lazy protected val dataHandler = new DataHandler(this, storageLevel) + override def getLocationConstraint = None + protected def onStart() { logInfo("Connecting to " + host + ":" + port) val socket = new Socket(host, port) diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 8153dd4567..ce47bcb2da 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -15,6 +15,7 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat +import org.apache.flume.source.avro.AvroFlumeEvent import org.apache.hadoop.fs.Path import java.util.UUID import spark.util.MetadataCleaner @@ -166,6 +167,16 @@ class StreamingContext private ( inputStream } + def flumeStream ( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = { + val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel) + graph.addInputStream(inputStream) + inputStream + } + + def rawNetworkStream[T: ClassManifest]( hostname: String, port: Int, diff --git a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala new file mode 100644 index 0000000000..d76c92fdd5 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala @@ -0,0 +1,29 @@ +package spark.streaming.examples + +import spark.util.IntParam +import spark.storage.StorageLevel +import spark.streaming._ + +object FlumeEventCount { + def main(args: Array[String]) { + if (args.length != 4) { + System.err.println( + "Usage: FlumeEventCount <master> <host> <port> <batchMillis>") + System.exit(1) + } + + val Array(master, host, IntParam(port), IntParam(batchMillis)) = args + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "FlumeEventCount", + Milliseconds(batchMillis)) + + // Create a flume stream + val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY) + + // Print out the count of events received from this server in each batch + stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + + ssc.start() + } +} diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index e98c096725..ed9a659092 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -1,6 +1,6 @@ package spark.streaming -import java.net.{SocketException, Socket, ServerSocket} +import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} import collection.mutable.{SynchronizedBuffer, ArrayBuffer} @@ -10,7 +10,14 @@ import spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter - +import org.apache.flume.source.avro.AvroSourceProtocol +import org.apache.flume.source.avro.AvroFlumeEvent +import org.apache.flume.source.avro.Status +import org.apache.avro.ipc.{specific, NettyTransceiver} +import org.apache.avro.ipc.specific.SpecificRequestor +import java.nio.ByteBuffer +import collection.JavaConversions._ +import java.nio.charset.Charset class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { @@ -123,6 +130,54 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { ssc.stop() } + test("flume input stream") { + // Set up the streaming context and input streams + val ssc = new StreamingContext(master, framework, batchDuration) + val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK) + val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]] + with SynchronizedBuffer[Seq[SparkFlumeEvent]] + val outputStream = new TestOutputStream(flumeStream, outputBuffer) + ssc.registerOutputStream(outputStream) + ssc.start() + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + val input = Seq(1, 2, 3, 4, 5) + + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333)); + val client = SpecificRequestor.getClient( + classOf[AvroSourceProtocol], transceiver); + + for (i <- 0 until input.size) { + val event = new AvroFlumeEvent + event.setBody(ByteBuffer.wrap(input(i).toString.getBytes())) + event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) + client.append(event) + Thread.sleep(500) + clock.addToTime(batchDuration.milliseconds) + } + + val startTime = System.currentTimeMillis() + while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size) + Thread.sleep(100) + } + Thread.sleep(1000) + val timeTaken = System.currentTimeMillis() - startTime + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + logInfo("Stopping context") + ssc.stop() + + val decoder = Charset.forName("UTF-8").newDecoder() + + assert(outputBuffer.size === input.length) + for (i <- 0 until outputBuffer.size) { + assert(outputBuffer(i).size === 1) + val str = decoder.decode(outputBuffer(i).head.event.getBody) + assert(str.toString === input(i).toString) + assert(outputBuffer(i).head.event.getHeaders.get("test") === "header") + } + } + test("file input stream") { // Create a temporary directory |