aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-01-21 11:39:30 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-01-21 11:39:30 +0530
commit43bfd7bb21e6f8a9d083686a83bcd309a84f937e (patch)
tree89e15f087b3067118c3118b08077294b9d20b715 /streaming
parent86057ec7c868262763d1e31b3f3c94bd43eeafb3 (diff)
downloadspark-43bfd7bb21e6f8a9d083686a83bcd309a84f937e.tar.gz
spark-43bfd7bb21e6f8a9d083686a83bcd309a84f937e.tar.bz2
spark-43bfd7bb21e6f8a9d083686a83bcd309a84f937e.zip
Changed method name of createReceiver to getReceiver as it is not intended to be a factory.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala6
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala2
6 files changed, 8 insertions, 8 deletions
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index e4152f3a61..665842a683 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -113,7 +113,7 @@ class NetworkInputTracker(
*/
def startReceivers() {
val receivers = networkInputStreams.map(nis => {
- val rcvr = nis.createReceiver()
+ val rcvr = nis.getReceiver()
rcvr.setStreamId(nis.id)
rcvr
})
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index efc7058480..83e9e59cad 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -25,7 +25,7 @@ class FlumeInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
- override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 2b4740bdf7..0eca400033 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -95,7 +95,7 @@ class KafkaInputDStream[T: ClassManifest](
}
} */
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index aa6be95f30..e74c2aa448 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue
/**
* Abstract class for defining any InputDStream that has to start a receiver on worker
* nodes to receive external data. Specific implementations of NetworkInputDStream must
- * define the createReceiver() function that creates the receiver object of type
+ * define the getReceiver() function that gets the receiver object of type
* [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
* data.
* @param ssc_ Streaming context that will execute this input stream
@@ -34,11 +34,11 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
val id = ssc.getNewNetworkStreamId()
/**
- * Creates the receiver object that will be sent to the worker nodes
+ * Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
* of a NetworkInputDStream.
*/
- def createReceiver(): NetworkReceiver[T]
+ def getReceiver(): NetworkReceiver[T]
// Nothing to start or stop as both taken care of by the NetworkInputTracker.
def start() {}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 290fab1ce0..74ffa1c2a2 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -25,7 +25,7 @@ class RawInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index d42027092b..4af839ad7f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -15,7 +15,7 @@ class SocketInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) {
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}