| field | value | date |
|---|---|---|
| author | Prashant Sharma <prashant.iiith@gmail.com> | 2013-01-16 14:34:58 +0530 |
| committer | Prashant Sharma <prashant.iiith@gmail.com> | 2013-01-19 22:04:07 +0530 |
| commit | bb6ab92e31b7aad464cf8262bc3567fdeb4c14c4 | |
| tree | 86222418c39416b4f841b89240380cf900709af0 /streaming/src/main | |
| parent | 11bbe231408a3223d110c89519a70184d58408af | |
Changed method name of createReceiver to getReceiver as it is not intended to be a factory.
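In other words, a `create`-prefixed name reads as a factory method that manufactures a fresh object on every call, whereas this method only hands back the receiver that will be shipped to a worker. A minimal sketch of the renamed contract, condensed from the diff below (class bodies elided; illustrative, not the full Spark source):

```scala
// Condensed sketch of the abstract contract after this commit.
abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
  extends InputDStream[T](ssc_) {

  // Before: def createReceiver(): NetworkReceiver[T]
  //   -- the "create" prefix implies a factory contract.

  // After: the "get" prefix only promises to return the receiver object
  // that the NetworkInputTracker will serialize and start on a worker.
  def getReceiver(): NetworkReceiver[T]
}
```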
Diffstat (limited to 'streaming/src/main')
6 files changed, 28 insertions, 28 deletions
```diff
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index e4152f3a61..4ddd0f8680 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -23,7 +23,7 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) ext
  */
 private[streaming]
 class NetworkInputTracker(
-    @transient ssc: StreamingContext, 
+    @transient ssc: StreamingContext,
     @transient networkInputStreams: Array[NetworkInputDStream[_]])
   extends Logging {
@@ -65,12 +65,12 @@ class NetworkInputTracker(
     def receive = {
       case RegisterReceiver(streamId, receiverActor) => {
         if (!networkInputStreamMap.contains(streamId)) {
-          throw new Exception("Register received for unexpected id " + streamId) 
+          throw new Exception("Register received for unexpected id " + streamId)
         }
         receiverInfo += ((streamId, receiverActor))
         logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
         sender ! true
-      } 
+      }
       case AddBlocks(streamId, blockIds, metadata) => {
         val tmp = receivedBlockIds.synchronized {
           if (!receivedBlockIds.contains(streamId)) {
@@ -95,8 +95,8 @@ class NetworkInputTracker(
   /** This thread class runs all the receivers on the cluster. */
   class ReceiverExecutor extends Thread {
     val env = ssc.env
-    
-    override def run() { 
+
+    override def run() {
       try {
         SparkEnv.set(env)
         startReceivers()
@@ -113,7 +113,7 @@ class NetworkInputTracker(
      */
     def startReceivers() {
       val receivers = networkInputStreams.map(nis => {
-        val rcvr = nis.createReceiver()
+        val rcvr = nis.getReceiver()
         rcvr.setStreamId(nis.id)
         rcvr
       })
@@ -141,7 +141,7 @@ class NetworkInputTracker(
       // Distribute the receivers and start them
       ssc.sc.runJob(tempRDD, startReceiver)
     }
-    
+
     /** Stops the receivers. */
     def stopReceivers() {
       // Signal the receivers to stop
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index efc7058480..c9644b3a83 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -25,7 +25,7 @@ class FlumeInputDStream[T: ClassManifest](
   storageLevel: StorageLevel
 ) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
 
-  override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+  override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
     new FlumeReceiver(host, port, storageLevel)
   }
 }
@@ -134,4 +134,4 @@ class FlumeReceiver(
   }
 
   override def getLocationPreference = Some(host)
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 2b4740bdf7..682cb82709 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -31,7 +31,7 @@ case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
- * 
+ *
  * @param host Zookeper hostname.
  * @param port Zookeper port.
  * @param groupId The group id for this consumer.
@@ -54,13 +54,13 @@ class KafkaInputDStream[T: ClassManifest](
 
   // Metadata that keeps track of which messages have already been consumed.
   var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
-  
+
   /* NOT USED - Originally intended for fault-tolerance 
-  
+
   // In case of a failure, the offets for a particular timestamp will be restored.
   @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
-  
+
   override protected[streaming] def addMetadata(metadata: Any) {
     metadata match {
       case x : KafkaInputDStreamMetadata =>
@@ -88,14 +88,14 @@ class KafkaInputDStream[T: ClassManifest](
   override protected[streaming] def restoreCheckpointData() {
     super.restoreCheckpointData()
     logInfo("Restoring KafkaDStream checkpoint data.")
-    checkpointData match { 
-      case x : KafkaDStreamCheckpointData => 
+    checkpointData match {
+      case x : KafkaDStreamCheckpointData =>
         restoredOffsets = x.savedOffsets
         logInfo("Restored KafkaDStream offsets: " + savedOffsets)
     }
   } */
 
-  def createReceiver(): NetworkReceiver[T] = {
+  def getReceiver(): NetworkReceiver[T] = {
     new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
       .asInstanceOf[NetworkReceiver[T]]
   }
@@ -103,7 +103,7 @@ class KafkaInputDStream[T: ClassManifest](
 
 private[streaming]
 class KafkaReceiver(host: String, port: Int, groupId: String,
-  topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], 
+  topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
   storageLevel: StorageLevel) extends NetworkReceiver[Any] {
 
   // Timeout for establishing a connection to Zookeper in ms.
@@ -130,7 +130,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
     val zooKeeperEndPoint = host + ":" + port
     logInfo("Starting Kafka Consumer Stream with group: " + groupId)
     logInfo("Initial offsets: " + initialOffsets.toString)
-    
+
     // Zookeper connection properties
     val props = new Properties()
     props.put("zk.connect", zooKeeperEndPoint)
@@ -161,7 +161,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
     offsets.foreach { case(key, offset) =>
       val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
       val partitionName = key.brokerId + "-" + key.partId
-      updatePersistentPath(consumerConnector.zkClient, 
+      updatePersistentPath(consumerConnector.zkClient,
         topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
     }
   }
@@ -174,7 +174,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
       blockGenerator += msgAndMetadata.message
 
       // Updating the offet. The key is (broker, topic, group, partition). 
-      val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, 
+      val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
         groupId, msgAndMetadata.topicInfo.partition.partId)
       val offset = msgAndMetadata.topicInfo.getConsumeOffset
       offsets.put(key, offset)
@@ -182,12 +182,12 @@ class KafkaReceiver(host: String, port: Int, groupId: String,
 
       // Keep on handling messages
       true
-    } 
+    }
   }
 }
 
 // NOT USED - Originally intended for fault-tolerance 
-// class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) 
+// class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
 //   extends BufferingBlockCreator[Any](receiver, storageLevel) {
 
 //   override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index aa6be95f30..9142deb9ed 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue
 /**
  * Abstract class for defining any InputDStream that has to start a receiver on worker
  * nodes to receive external data. Specific implementations of NetworkInputDStream must
- * define the createReceiver() function that creates the receiver object of type
+ * define the getReceiver() function that gets the receiver object of type
  * [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
  * data.
  * @param ssc_ Streaming context that will execute this input stream
@@ -34,11 +34,11 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
   val id = ssc.getNewNetworkStreamId()
 
   /**
-   * Creates the receiver object that will be sent to the worker nodes
+   * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
    * of a NetworkInputDStream.
    */
-  def createReceiver(): NetworkReceiver[T]
+  def getReceiver(): NetworkReceiver[T]
 
   // Nothing to start or stop as both taken care of by the NetworkInputTracker.
   def start() {}
@@ -46,7 +46,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
   def stop() {}
 
   override def compute(validTime: Time): Option[RDD[T]] = {
-    val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime) 
+    val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
     Some(new BlockRDD[T](ssc.sc, blockIds))
   }
 }
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 290fab1ce0..74ffa1c2a2 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -25,7 +25,7 @@ class RawInputDStream[T: ClassManifest](
     storageLevel: StorageLevel
   ) extends NetworkInputDStream[T](ssc_ ) with Logging {
 
-  def createReceiver(): NetworkReceiver[T] = {
+  def getReceiver(): NetworkReceiver[T] = {
     new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
   }
 }
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index d42027092b..4af839ad7f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -15,7 +15,7 @@ class SocketInputDStream[T: ClassManifest](
     storageLevel: StorageLevel
   ) extends NetworkInputDStream[T](ssc_) {
 
-  def createReceiver(): NetworkReceiver[T] = {
+  def getReceiver(): NetworkReceiver[T] = {
     new SocketReceiver(host, port, bytesToObjects, storageLevel)
   }
 }
```
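For downstream code, the rename is a source-incompatible change: any custom NetworkInputDStream subclass must rename its override, just as the Flume, Kafka, raw, and socket streams do above. A hedged sketch of what such an implementation might look like against the renamed API; `CustomInputDStream`, `CustomReceiver`, and the `onStart()`/`onStop()` hooks are illustrative assumptions about this era's NetworkReceiver API, not part of this commit:

```scala
import spark.storage.StorageLevel
import spark.streaming.StreamingContext
import spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}

// Hypothetical user-defined stream, shown only to illustrate the renamed
// contract; the receiver body is a stub.
class CustomInputDStream(
    ssc_ : StreamingContext,
    storageLevel: StorageLevel
  ) extends NetworkInputDStream[String](ssc_) {

  // Post-rename: implement getReceiver() instead of createReceiver().
  def getReceiver(): NetworkReceiver[String] = {
    new CustomReceiver(storageLevel)
  }
}

class CustomReceiver(storageLevel: StorageLevel)
  extends NetworkReceiver[String] {

  // Assumed lifecycle hooks from this era's NetworkReceiver API.
  protected def onStart() {
    // Connect to the source and push incoming records into Spark's block
    // store at `storageLevel` (e.g. via the receiver's block generator).
  }

  protected def onStop() {
    // Tear down connections/threads started in onStart().
  }
}
```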