about summary refs log tree commit diff
path: root/external/flume/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'external/flume/src/main')
-rw-r--r-- external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 28
-rw-r--r-- external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala | 10
2 files changed, 19 insertions, 19 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 34012b846e..df7605fe57 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -34,6 +34,8 @@ import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
+import org.apache.spark.Logging
+import org.apache.spark.streaming.receiver.Receiver
private[streaming]
class FlumeInputDStream[T: ClassTag](
@@ -41,9 +43,9 @@ class FlumeInputDStream[T: ClassTag](
host: String,
port: Int,
storageLevel: StorageLevel
-) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
- override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ override def getReceiver(): Receiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -115,13 +117,13 @@ private[streaming] object SparkFlumeEvent {
private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
events.foreach (event =>
- receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event)))
Status.OK
}
}
@@ -133,23 +135,21 @@ class FlumeReceiver(
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent] {
+ ) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
- lazy val blockGenerator = new BlockGenerator(storageLevel)
+ lazy val responder = new SpecificResponder(
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
- protected override def onStart() {
- val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this))
- val server = new NettyServer(responder, new InetSocketAddress(host, port))
- blockGenerator.start()
+ def onStart() {
server.start()
logInfo("Flume receiver started")
}
- protected override def onStop() {
- blockGenerator.stop()
+ def onStop() {
+ server.close()
logInfo("Flume receiver stopped")
}
- override def getLocationPreference = Some(host)
+ override def preferredLocation = Some(host)
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 654ba451e7..499f3560ef 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.flume
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaInputDStream, JavaStreamingContext, JavaDStream}
+import org.apache.spark.streaming.dstream.{ReceiverInputDStream, DStream}
object FlumeUtils {
/**
@@ -35,7 +35,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[SparkFlumeEvent] = {
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
inputStream
}
@@ -50,7 +50,7 @@ object FlumeUtils {
jssc: JavaStreamingContext,
hostname: String,
port: Int
- ): JavaDStream[SparkFlumeEvent] = {
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port)
}
@@ -65,7 +65,7 @@ object FlumeUtils {
hostname: String,
port: Int,
storageLevel: StorageLevel
- ): JavaDStream[SparkFlumeEvent] = {
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
createStream(jssc.ssc, hostname, port, storageLevel)
}
}