Diffstat (limited to 'external/flume')
4 files changed, 26 insertions, 24 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 34012b846e..df7605fe57 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.Utils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
 
 private[streaming]
 class FlumeInputDStream[T: ClassTag](
@@ -41,9 +43,9 @@ class FlumeInputDStream[T: ClassTag](
   host: String,
   port: Int,
   storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
 
-  override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+  override def getReceiver(): Receiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel)
   }
 }
@@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
 private[streaming]
 class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
   override def append(event : AvroFlumeEvent) : Status = {
-    receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+    receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
     Status.OK
   }
 
   override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
     events.foreach (event =>
-      receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+      receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
     Status.OK
   }
 }
@@ -133,23 +135,21 @@ class FlumeReceiver(
     host: String,
     port: Int,
     storageLevel: StorageLevel
-  ) extends NetworkReceiver[SparkFlumeEvent] {
+  ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
 
-  lazy val blockGenerator = new BlockGenerator(storageLevel)
+  lazy val responder = new SpecificResponder(
+    classOf[AvroSourceProtocol], new FlumeEventServer(this))
+  lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
 
-  protected override def onStart() {
-    val responder = new SpecificResponder(
-      classOf[AvroSourceProtocol], new FlumeEventServer(this))
-    val server = new NettyServer(responder, new InetSocketAddress(host, port))
-    blockGenerator.start()
+  def onStart() {
     server.start()
     logInfo("Flume receiver started")
   }
 
-  protected override def onStop() {
-    blockGenerator.stop()
+  def onStop() {
+    server.close()
     logInfo("Flume receiver stopped")
   }
 
-  override def getLocationPreference = Some(host)
+  override def preferredLocation = Some(host)
 }
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 654ba451e7..499f3560ef 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
 
 object FlumeUtils {
   /**
@@ -35,7 +35,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): DStream[SparkFlumeEvent] = {
+    ): ReceiverInputDStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
     inputStream
   }
@@ -50,7 +50,7 @@ object FlumeUtils {
       jssc: JavaStreamingContext,
       hostname: String,
       port: Int
-    ): JavaDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port)
   }
 
@@ -65,7 +65,7 @@ object FlumeUtils {
       hostname: String,
       port: Int,
       storageLevel: StorageLevel
-    ): JavaDStream[SparkFlumeEvent] = {
+    ): JavaReceiverInputDStream[SparkFlumeEvent] = {
     createStream(jssc.ssc, hostname, port, storageLevel)
   }
 }
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index 733389b98d..e0ad4f1015 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -19,16 +19,16 @@ package org.apache.spark.streaming.flume;
 
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
-import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
 
 import org.junit.Test;
 
 public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testFlumeStream() {
     // tests the API, does not actually test data receiving
-    JavaDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
-    JavaDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
+    JavaReceiverInputDStream<SparkFlumeEvent> test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
+    JavaReceiverInputDStream<SparkFlumeEvent> test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
       StorageLevel.MEMORY_AND_DISK_SER_2());
   }
 }
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index 8bc43972ab..78603200d2 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -31,6 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
 import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
 
 class FlumeStreamSuite extends TestSuiteBase {
 
@@ -39,10 +40,11 @@ class FlumeStreamSuite extends TestSuiteBase {
   test("flume input stream") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream = FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+    val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
+      FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
      with SynchronizedBuffer[Seq[SparkFlumeEvent]]
-    val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+    val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
    outputStream.register()
    ssc.start()
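For context, a minimal driver sketch (not part of this commit) of how the updated FlumeUtils API is consumed from Scala after this change. The application name, batch interval, host and port are illustrative placeholders; only FlumeUtils.createStream, ReceiverInputDStream and SparkFlumeEvent come from the code above.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object FlumeEventCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeEventCountSketch")
    val ssc = new StreamingContext(conf, Seconds(2))

    // createStream now returns a ReceiverInputDStream rather than a plain DStream,
    // so the receiver-backed nature of the stream stays visible in the type.
    val stream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createStream(ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Count the events received in each batch and print the result.
    stream.count().map(count => "Received " + count + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The same reasoning drives the signature changes in FlumeUtils.scala and the Java suite: exposing ReceiverInputDStream (and JavaReceiverInputDStream on the Java side) instead of the generic DStream lets callers reach receiver-specific behaviour without casts, while the receiver itself now delegates block management to the base Receiver class via store() instead of owning a BlockGenerator.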