 examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala |  2 +-
 streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala                 | 14 +++++++-------
 streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala           |  4 ++--
 streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala           | 26 +++++++++++++-------------
 streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala         |  8 ++++----
 streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala             |  2 +-
 streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala          |  2 +-
 7 files changed, 29 insertions(+), 29 deletions(-)
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
index 99ed4cdc1c..2532f27d1a 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
@@ -22,7 +22,7 @@ class TwitterInputDStream(
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
- override def createReceiver(): NetworkReceiver[Status] = {
+ override def getReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(username, password, filters, storageLevel)
}
}
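
For context, a hedged usage sketch of the renamed method's role (the StreamingContext constructor shape and the filter type are assumptions from this era of the API, not shown in this diff): application code only constructs the stream; the NetworkInputTracker, not user code, later calls getReceiver() when it ships receivers to the workers.

    // Sketch only: constructor shape inferred from the hunk above.
    val ssc = new StreamingContext("local[2]", "TwitterExample", Seconds(1))
    val tweets = new TwitterInputDStream(
      ssc, "username", "password",
      Seq("spark"),                         // assumed: keyword filters
      StorageLevel.MEMORY_AND_DISK_SER_2)   // storage level for received data
    ssc.registerInputStream(tweets)         // assumed registration call of this era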
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index e4152f3a61..4ddd0f8680 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -23,7 +23,7 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) ext
*/
private[streaming]
class NetworkInputTracker(
- @transient ssc: StreamingContext,
+ @transient ssc: StreamingContext,
@transient networkInputStreams: Array[NetworkInputDStream[_]])
extends Logging {
@@ -65,12 +65,12 @@ class NetworkInputTracker(
def receive = {
case RegisterReceiver(streamId, receiverActor) => {
if (!networkInputStreamMap.contains(streamId)) {
- throw new Exception("Register received for unexpected id " + streamId)
+ throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
sender ! true
- }
+ }
case AddBlocks(streamId, blockIds, metadata) => {
val tmp = receivedBlockIds.synchronized {
if (!receivedBlockIds.contains(streamId)) {
@@ -95,8 +95,8 @@ class NetworkInputTracker(
/** This thread class runs all the receivers on the cluster. */
class ReceiverExecutor extends Thread {
val env = ssc.env
-
- override def run() {
+
+ override def run() {
try {
SparkEnv.set(env)
startReceivers()
@@ -113,7 +113,7 @@ class NetworkInputTracker(
*/
def startReceivers() {
val receivers = networkInputStreams.map(nis => {
- val rcvr = nis.createReceiver()
+ val rcvr = nis.getReceiver()
rcvr.setStreamId(nis.id)
rcvr
})
@@ -141,7 +141,7 @@ class NetworkInputTracker(
// Distribute the receivers and start them
ssc.sc.runJob(tempRDD, startReceiver)
}
-
+
/** Stops the receivers. */
def stopReceivers() {
// Signal the receivers to stop
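
The startReceivers() hunk above is the caller side of this rename: the tracker materializes one receiver per input stream via getReceiver(), tags it with its stream id, and distributes the receivers as a one-partition-per-receiver RDD so each runs in its own task. A minimal sketch of that pattern (only the map and runJob calls appear in this diff; the makeRDD partitioning and the startReceiver closure body are assumptions):

    // Sketch: one receiver per network input stream, one task per receiver.
    val receivers = networkInputStreams.map { nis =>
      val rcvr = nis.getReceiver()   // renamed from createReceiver() in this commit
      rcvr.setStreamId(nis.id)
      rcvr
    }
    val tempRDD = ssc.sc.makeRDD(receivers, receivers.size)  // assumed partitioning
    val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) =>
      iterator.next().start()        // assumed entry point for a receiver
    ssc.sc.runJob(tempRDD, startReceiver)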
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index efc7058480..c9644b3a83 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -25,7 +25,7 @@ class FlumeInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
- override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -134,4 +134,4 @@ class FlumeReceiver(
}
override def getLocationPreference = Some(host)
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 2b4740bdf7..682cb82709 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -31,7 +31,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
/**
* Input stream that pulls messages from a Kafka Broker.
- *
+ *
* @param host Zookeeper hostname.
* @param port Zookeeper port.
* @param groupId The group id for this consumer.
@@ -54,13 +54,13 @@ class KafkaInputDStream[T: ClassManifest](
// Metadata that keeps track of which messages have already been consumed.
var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
-
+
/* NOT USED - Originally intended for fault-tolerance
-
+
// In case of a failure, the offsets for a particular timestamp will be restored.
@transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
-
+
override protected[streaming] def addMetadata(metadata: Any) {
metadata match {
case x : KafkaInputDStreamMetadata =>
@@ -88,14 +88,14 @@ class KafkaInputDStream[T: ClassManifest](
override protected[streaming] def restoreCheckpointData() {
super.restoreCheckpointData()
logInfo("Restoring KafkaDStream checkpoint data.")
- checkpointData match {
- case x : KafkaDStreamCheckpointData =>
+ checkpointData match {
+ case x : KafkaDStreamCheckpointData =>
restoredOffsets = x.savedOffsets
logInfo("Restored KafkaDStream offsets: " + savedOffsets)
}
} */
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
@@ -103,7 +103,7 @@ class KafkaInputDStream[T: ClassManifest](
private[streaming]
class KafkaReceiver(host: String, port: Int, groupId: String,
- topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
+ topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
// Timeout for establishing a connection to Zookeeper in ms.
@@ -130,7 +130,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
val zooKeeperEndPoint = host + ":" + port
logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
-
+
// Zookeeper connection properties
val props = new Properties()
props.put("zk.connect", zooKeeperEndPoint)
@@ -161,7 +161,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
offsets.foreach { case(key, offset) =>
val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
val partitionName = key.brokerId + "-" + key.partId
- updatePersistentPath(consumerConnector.zkClient,
+ updatePersistentPath(consumerConnector.zkClient,
topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
}
}
@@ -174,7 +174,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
blockGenerator += msgAndMetadata.message
// Updating the offset. The key is (broker, topic, group, partition).
- val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
+ val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
groupId, msgAndMetadata.topicInfo.partition.partId)
val offset = msgAndMetadata.topicInfo.getConsumeOffset
offsets.put(key, offset)
@@ -182,12 +182,12 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
// Keep on handling messages
true
- }
+ }
}
}
// NOT USED - Originally intended for fault-tolerance
- // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
+ // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
// extends BufferingBlockCreator[Any](receiver, storageLevel) {
// override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
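
The receiver reaches Kafka through Zookeeper; the hunk above sets zk.connect on a java.util.Properties object before building the consumer. A hedged sketch of that setup (only zk.connect appears in this diff; the other property keys are assumptions about the 0.7-era Kafka consumer):

    // Sketch: Zookeeper connection properties for the Kafka consumer.
    val props = new java.util.Properties()
    props.put("zk.connect", host + ":" + port)     // shown in the hunk above
    props.put("groupid", groupId)                  // assumed consumer group key
    props.put("zk.connectiontimeout.ms", "10000")  // assumed connection timeout
    // A ConsumerConfig built from these props would back consumerConnector.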
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index aa6be95f30..9142deb9ed 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue
/**
* Abstract class for defining any InputDStream that has to start a receiver on worker
* nodes to receive external data. Specific implementations of NetworkInputDStream must
- * define the createReceiver() function that creates the receiver object of type
+ * define the getReceiver() function that gets the receiver object of type
* [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
* data.
* @param ssc_ Streaming context that will execute this input stream
@@ -34,11 +34,11 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
val id = ssc.getNewNetworkStreamId()
/**
- * Creates the receiver object that will be sent to the worker nodes
+ * Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to be defined by any specific implementation
* of a NetworkInputDStream.
*/
- def createReceiver(): NetworkReceiver[T]
+ def getReceiver(): NetworkReceiver[T]
// Nothing to start or stop as both taken care of by the NetworkInputTracker.
def start() {}
@@ -46,7 +46,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
def stop() {}
override def compute(validTime: Time): Option[RDD[T]] = {
- val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
+ val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
Some(new BlockRDD[T](ssc.sc, blockIds))
}
}
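
The doc comment above states the contract this commit renames: a concrete input stream supplies its receiver by overriding getReceiver(). A minimal sketch of an implementation (MyReceiver and its internals are hypothetical; only the method signature comes from this diff):

    import spark.streaming.StreamingContext
    import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}

    class MyInputDStream[T: ClassManifest](ssc_ : StreamingContext)
      extends NetworkInputDStream[T](ssc_) {
      // Renamed in this commit from createReceiver() to getReceiver().
      override def getReceiver(): NetworkReceiver[T] =
        new MyReceiver().asInstanceOf[NetworkReceiver[T]]  // MyReceiver is hypothetical
    }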
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 290fab1ce0..74ffa1c2a2 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -25,7 +25,7 @@ class RawInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index d42027092b..4af839ad7f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -15,7 +15,7 @@ class SocketInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) {
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}