author    Patrick Wendell <pwendell@gmail.com>  2012-12-07 15:01:15 -0800
committer Patrick Wendell <pwendell@gmail.com>  2012-12-07 16:42:39 -0800
commit    3ff9710265d4bb518b89461cfb0fcc771e61a726 (patch)
tree      e5394ba308955c862406ceb7bb3b3ce35dd253d0 /streaming/src
parent    c36ca10241991d46f2f1513b2c0c5e369d8b34f9 (diff)
Adding Flume InputDStream
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala        | 130
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala      |   6
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala      |  13
-rw-r--r--  streaming/src/main/scala/spark/streaming/RawInputDStream.scala          |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/SocketInputDStream.scala       |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala         |  11
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala |  29
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala        |  59
8 files changed, 246 insertions(+), 6 deletions(-)
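For orientation, a minimal usage sketch of the API this patch adds (not part of the commit; the master URL, application name, host, and port below are placeholder values, and the call shape follows the new StreamingContext.flumeStream method and the FlumeEventCount example further down). A Flume agent's avro sink would then be pointed at the same host and port so that events are pushed into the receiver:

    import spark.storage.StorageLevel
    import spark.streaming.{Milliseconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "FlumeSketch", Milliseconds(2000))
    // Bind an Avro/Flume receiver on this host/port; Flume agents push events to it.
    val stream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    ssc.start()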
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
new file mode 100644
index 0000000000..9c403278c3
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
@@ -0,0 +1,130 @@
+package spark.streaming
+
+import java.io.{ObjectInput, ObjectOutput, Externalizable}
+import spark.storage.StorageLevel
+import org.apache.flume.source.avro.AvroSourceProtocol
+import org.apache.flume.source.avro.AvroFlumeEvent
+import org.apache.flume.source.avro.Status
+import org.apache.avro.ipc.specific.SpecificResponder
+import org.apache.avro.ipc.NettyServer
+import java.net.InetSocketAddress
+import collection.JavaConversions._
+import spark.Utils
+import java.nio.ByteBuffer
+
+class FlumeInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+
+ override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ new FlumeReceiver(id, host, port, storageLevel)
+ }
+}
+
+/**
+ * A wrapper class for AvroFlumeEvents with a custom serialization format.
+ *
+ * This is necessary because AvroFlumeEvent uses inner data structures
+ * which are not serializable.
+ */
+class SparkFlumeEvent() extends Externalizable {
+ var event : AvroFlumeEvent = new AvroFlumeEvent()
+
+ /* De-serialize from bytes. */
+ def readExternal(in: ObjectInput) {
+ val bodyLength = in.readInt()
+ val bodyBuff = new Array[Byte](bodyLength)
+ in.readFully(bodyBuff)
+
+ val numHeaders = in.readInt()
+ val headers = new java.util.HashMap[CharSequence, CharSequence]
+
+ for (i <- 0 until numHeaders) {
+ val keyLength = in.readInt()
+ val keyBuff = new Array[Byte](keyLength)
+ in.readFully(keyBuff)
+ val key : String = Utils.deserialize(keyBuff)
+
+ val valLength = in.readInt()
+ val valBuff = new Array[Byte](valLength)
+ in.readFully(valBuff)
+ val value : String = Utils.deserialize(valBuff)
+
+ headers.put(key, value)
+ }
+
+ event.setBody(ByteBuffer.wrap(bodyBuff))
+ event.setHeaders(headers)
+ }
+
+ /* Serialize to bytes. */
+ def writeExternal(out: ObjectOutput) {
+ val body = event.getBody.array()
+ out.writeInt(body.length)
+ out.write(body)
+
+ val numHeaders = event.getHeaders.size()
+ out.writeInt(numHeaders)
+ for ((k, v) <- event.getHeaders) {
+ val keyBuff = Utils.serialize(k.toString)
+ out.writeInt(keyBuff.length)
+ out.write(keyBuff)
+ val valBuff = Utils.serialize(v.toString)
+ out.writeInt(valBuff.length)
+ out.write(valBuff)
+ }
+ }
+}
+
+object SparkFlumeEvent {
+ def fromAvroFlumeEvent(in : AvroFlumeEvent) : SparkFlumeEvent = {
+ val event = new SparkFlumeEvent
+ event.event = in
+ event
+ }
+}
+
+/** A simple server that implements Flume's Avro protocol. */
+class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
+ override def append(event : AvroFlumeEvent) : Status = {
+ receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ Status.OK
+ }
+
+ override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
+ events.foreach (event =>
+ receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ Status.OK
+ }
+}
+
+/** A NetworkReceiver which listens for events using the
+ * Flume Avro interface.*/
+class FlumeReceiver(
+ streamId: Int,
+ host: String,
+ port: Int,
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+
+ lazy val dataHandler = new DataHandler(this, storageLevel)
+
+ protected override def onStart() {
+ val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this));
+ val server = new NettyServer(responder, new InetSocketAddress(host, port));
+ dataHandler.start()
+ server.start()
+ logInfo("Flume receiver started")
+ }
+
+ protected override def onStop() {
+ dataHandler.stop()
+ logInfo("Flume receiver stopped")
+ }
+
+ override def getLocationConstraint = Some(host)
+}
\ No newline at end of file
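The custom Externalizable format above can be exercised in isolation. A round-trip sketch (not part of the patch; event body and header values are arbitrary) using plain Java object streams:

    import java.io._
    import java.nio.ByteBuffer
    import org.apache.flume.source.avro.AvroFlumeEvent
    import collection.JavaConversions._

    // Build an AvroFlumeEvent and wrap it in the serializable wrapper.
    val avroEvent = new AvroFlumeEvent()
    avroEvent.setBody(ByteBuffer.wrap("hello".getBytes("UTF-8")))
    avroEvent.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
    val wrapped = SparkFlumeEvent.fromAvroFlumeEvent(avroEvent)

    // Serialize with writeExternal, then rebuild a fresh wrapper with readExternal.
    val bytesOut = new ByteArrayOutputStream()
    val objOut = new ObjectOutputStream(bytesOut)
    wrapped.writeExternal(objOut)
    objOut.flush()

    val objIn = new ObjectInputStream(new ByteArrayInputStream(bytesOut.toByteArray))
    val restored = new SparkFlumeEvent()
    restored.readExternal(objIn)
    // restored.event now carries the "hello" body and the "test" -> "header" entry.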
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
index d3f37b8b0e..052fc8bb74 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputDStream.scala
@@ -8,7 +8,6 @@ import spark.streaming.util.{RecurringTimer, SystemClock}
import spark.storage.StorageLevel
import java.nio.ByteBuffer
-import java.util.concurrent.ArrayBlockingQueue
import akka.actor.{Props, Actor}
import akka.pattern.ask
@@ -63,6 +62,9 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
/** This method will be called to stop receiving data. */
protected def onStop()
+ /** This method conveys a placement constraint (hostname) for this receiver. */
+ def getLocationConstraint() : Option[String] = None
+
/**
* This method starts the receiver. First it accesses all the lazy members to
* materialize them. Then it calls the user-defined onStart() method to start
@@ -151,6 +153,4 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
tracker ! DeregisterReceiver(streamId, msg)
}
}
-
}
-
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 73ba877085..56661c2615 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -98,7 +98,18 @@ class NetworkInputTracker(
def startReceivers() {
val receivers = networkInputStreams.map(_.createReceiver())
- val tempRDD = ssc.sc.makeRDD(receivers, receivers.size)
+
+ // We only honor constraints if all receivers have them
+ val hasLocationConstraints = receivers.map(_.getLocationConstraint().isDefined).reduce(_ && _)
+
+ val tempRDD =
+ if (hasLocationConstraints) {
+ val receiversWithConstraints = receivers.map(r => (r, Seq(r.getLocationConstraint().get)))
+ ssc.sc.makeLocalityConstrainedRDD[NetworkReceiver[_]](receiversWithConstraints)
+ }
+ else {
+ ssc.sc.makeRDD(receivers, receivers.size)
+ }
val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
if (!iterator.hasNext) {
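A small illustration (not part of the patch) of the all-or-nothing check above: placement constraints are honored only when every receiver reports one, so a FlumeReceiver (Some(host)) mixed with, say, a SocketReceiver (None) falls back to the unconstrained makeRDD path:

    val hints: Seq[Option[String]] = Seq(Some("flume-host"), None)
    val hasLocationConstraints = hints.map(_.isDefined).reduce(_ && _)
    // hasLocationConstraints == false, so receivers are spread without placement hints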
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
index d5db8e787d..fd51ed47a5 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
@@ -31,6 +31,8 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S
var blockPushingThread: Thread = null
+ override def getLocationConstraint = None
+
def onStart() {
// Open a socket to the target address and keep reading from it
logInfo("Connecting to " + host + ":" + port)
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
index ff99d50b76..ebbb17a39a 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
@@ -34,6 +34,8 @@ class SocketReceiver[T: ClassManifest](
lazy protected val dataHandler = new DataHandler(this, storageLevel)
+ override def getLocationConstraint = None
+
protected def onStart() {
logInfo("Connecting to " + host + ":" + port)
val socket = new Socket(host, port)
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 8153dd4567..ce47bcb2da 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -15,6 +15,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.hadoop.fs.Path
import java.util.UUID
import spark.util.MetadataCleaner
@@ -166,6 +167,16 @@ class StreamingContext private (
inputStream
}
+ def flumeStream (
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
+ graph.addInputStream(inputStream)
+ inputStream
+ }
+
+
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
diff --git a/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
new file mode 100644
index 0000000000..d76c92fdd5
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
@@ -0,0 +1,29 @@
+package spark.streaming.examples
+
+import spark.util.IntParam
+import spark.storage.StorageLevel
+import spark.streaming._
+
+object FlumeEventCount {
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ System.err.println(
+ "Usage: FlumeEventCount <master> <host> <port> <batchMillis>")
+ System.exit(1)
+ }
+
+ val Array(master, host, IntParam(port), IntParam(batchMillis)) = args
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "FlumeEventCount",
+ Milliseconds(batchMillis))
+
+ // Create a flume stream
+ val stream = ssc.flumeStream(host, port, StorageLevel.MEMORY_ONLY)
+
+ // Print out the count of events received from this server in each batch
+ stream.count().map(cnt => "Received " + cnt + " flume events.").print()
+
+ ssc.start()
+ }
+}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index e98c096725..ed9a659092 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -1,6 +1,6 @@
package spark.streaming
-import java.net.{SocketException, Socket, ServerSocket}
+import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
import java.util.concurrent.{TimeUnit, ArrayBlockingQueue}
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
@@ -10,7 +10,14 @@ import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
-
+import org.apache.flume.source.avro.AvroSourceProtocol
+import org.apache.flume.source.avro.AvroFlumeEvent
+import org.apache.flume.source.avro.Status
+import org.apache.avro.ipc.{specific, NettyTransceiver}
+import org.apache.avro.ipc.specific.SpecificRequestor
+import java.nio.ByteBuffer
+import collection.JavaConversions._
+import java.nio.charset.Charset
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -123,6 +130,54 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
ssc.stop()
}
+ test("flume input stream") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val flumeStream = ssc.flumeStream("localhost", 33333, StorageLevel.MEMORY_AND_DISK)
+ val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
+ with SynchronizedBuffer[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq(1, 2, 3, 4, 5)
+
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333));
+ val client = SpecificRequestor.getClient(
+ classOf[AvroSourceProtocol], transceiver);
+
+ for (i <- 0 until input.size) {
+ val event = new AvroFlumeEvent
+ event.setBody(ByteBuffer.wrap(input(i).toString.getBytes()))
+ event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header"))
+ client.append(event)
+ Thread.sleep(500)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+
+ val startTime = System.currentTimeMillis()
+ while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
+ Thread.sleep(100)
+ }
+ Thread.sleep(1000)
+ val timeTaken = System.currentTimeMillis() - startTime
+ assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+ logInfo("Stopping context")
+ ssc.stop()
+
+ val decoder = Charset.forName("UTF-8").newDecoder()
+
+ assert(outputBuffer.size === input.length)
+ for (i <- 0 until outputBuffer.size) {
+ assert(outputBuffer(i).size === 1)
+ val str = decoder.decode(outputBuffer(i).head.event.getBody)
+ assert(str.toString === input(i).toString)
+ assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
+ }
+ }
+
test("file input stream") {
// Create a temporary directory