diff options
Diffstat (limited to 'external/flume/src/main')
-rw-r--r-- | external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 28 | ||||
-rw-r--r-- | external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala | 10 |
2 files changed, 19 insertions, 19 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 34012b846e..df7605fe57 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -34,6 +34,8 @@ import org.apache.spark.util.Utils import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ +import org.apache.spark.Logging +import org.apache.spark.streaming.receiver.Receiver private[streaming] class FlumeInputDStream[T: ClassTag]( @@ -41,9 +43,9 @@ class FlumeInputDStream[T: ClassTag]( host: String, port: Int, storageLevel: StorageLevel -) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { +) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) { - override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = { + override def getReceiver(): Receiver[SparkFlumeEvent] = { new FlumeReceiver(host, port, storageLevel) } } @@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent { private[streaming] class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { override def append(event : AvroFlumeEvent) : Status = { - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event) + receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)) Status.OK } override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { events.foreach (event => - receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)) + receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))) Status.OK } } @@ -133,23 +135,21 @@ class FlumeReceiver( host: String, port: Int, storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent] { + ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging { - lazy val blockGenerator = new BlockGenerator(storageLevel) + lazy val responder = new SpecificResponder( + classOf[AvroSourceProtocol], new FlumeEventServer(this)) + lazy val server = new NettyServer(responder, new InetSocketAddress(host, port)) - protected override def onStart() { - val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)) - val server = new NettyServer(responder, new InetSocketAddress(host, port)) - blockGenerator.start() + def onStart() { server.start() logInfo("Flume receiver started") } - protected override def onStop() { - blockGenerator.stop() + def onStop() { + server.close() logInfo("Flume receiver stopped") } - override def getLocationPreference = Some(host) + override def preferredLocation = Some(host) } diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 654ba451e7..499f3560ef 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream} +import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream} object FlumeUtils { /** @@ -35,7 +35,7 @@ object FlumeUtils { hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[SparkFlumeEvent] = { + ): ReceiverInputDStream[SparkFlumeEvent] = { val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel) inputStream } @@ -50,7 +50,7 @@ object FlumeUtils { jssc: JavaStreamingContext, hostname: String, port: Int - ): JavaDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = { createStream(jssc.ssc, hostname, port) } @@ -65,7 +65,7 @@ object FlumeUtils { hostname: String, port: Int, storageLevel: StorageLevel - ): JavaDStream[SparkFlumeEvent] = { + ): JavaReceiverInputDStream[SparkFlumeEvent] = { createStream(jssc.ssc, hostname, port, storageLevel) } } |