author    Denny <dennybritz@gmail.com>  2012-11-19 09:42:35 -0800
committer Denny <dennybritz@gmail.com>  2012-11-19 09:42:35 -0800
commit    6757ed6a40121ee97a15506af8717bb8d97cf1ec (patch)
tree      167a48141109d12424dc5d18e2898d339a64f3a7 /streaming/src
parent    f56befa9141378071ceb5d6e8c52fe50b79e4432 (diff)
Comment out code for fault-tolerance.
Diffstat (limited to 'streaming/src')
-rw-r--r-- streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala | 35
1 file changed, 18 insertions, 17 deletions
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index 318537532c..3685d6c666 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -15,8 +15,10 @@ import spark.storage.StorageLevel
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
+// NOT USED - Originally intended for fault-tolerance
// Metadata for a Kafka Stream that is sent to the Master
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
+// NOT USED - Originally intended for fault-tolerance
// Checkpoint data specific to a KafkaInputDstream
case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
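
For context on the types in the hunk above: the fault-tolerance path being disabled tracked how far each Kafka partition had been consumed, keyed by KafkaPartitionKey, and reported that to the master as KafkaInputDStreamMetadata. A minimal, self-contained sketch of assembling such a record (the two case classes are copied from the hunk; the broker, topic, group, and offset values are made up for illustration):

// Sketch only: the sample values below are hypothetical.
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])

object KafkaMetadataSketch {
  def main(args: Array[String]): Unit = {
    // Offsets consumed so far for two partitions of a hypothetical "events" topic.
    val offsets = Map(
      KafkaPartitionKey(brokerId = 0, topic = "events", groupId = "group1", partId = 0) -> 42L,
      KafkaPartitionKey(brokerId = 0, topic = "events", groupId = "group1", partId = 1) -> 17L
    )
    // The metadata a receiver would have reported back to the master.
    println(KafkaInputDStreamMetadata(System.currentTimeMillis, offsets))
  }
}
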
@@ -45,9 +47,13 @@ class KafkaInputDStream[T: ClassManifest](
// Metadata that keeps track of which messages have already been consumed.
var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
+
+ /* NOT USED - Originally intended for fault-tolerance
+
// In case of a failure, the offsets for a particular timestamp will be restored.
@transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
+
override protected[streaming] def addMetadata(metadata: Any) {
metadata match {
case x : KafkaInputDStreamMetadata =>
@@ -80,18 +86,11 @@ class KafkaInputDStream[T: ClassManifest](
restoredOffsets = x.savedOffsets
logInfo("Restored KafkaDStream offsets: " + savedOffsets)
}
- }
+ } */
def createReceiver(): NetworkReceiver[T] = {
- // We have restored from a checkpoint, use the restored offsets
- if (restoredOffsets != null) {
- new KafkaReceiver(id, host, port, groupId, topics, restoredOffsets, storageLevel)
- .asInstanceOf[NetworkReceiver[T]]
- } else {
- new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
+ new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
- }
-
}
}
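
The net effect of this hunk is that createReceiver no longer consults restoredOffsets: the receiver is always built from the initialOffsets the stream was constructed with. A small sketch of that before/after offset-selection logic, using stand-in types (CreateReceiverSketch and its helpers are hypothetical, and the null check mirrors the diff's nullable restoredOffsets field):

// Sketch only: stand-in types; only the offset-selection logic mirrors the diff.
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)

object CreateReceiverSketch {
  // Before this commit: prefer offsets restored from a checkpoint, if any.
  def startOffsetsBefore(initial: Map[KafkaPartitionKey, Long],
                         restored: Map[KafkaPartitionKey, Long]): Map[KafkaPartitionKey, Long] =
    if (restored != null) restored else initial

  // After this commit: the restore branch is commented out, so the
  // receiver always starts from its initial offsets.
  def startOffsetsAfter(initial: Map[KafkaPartitionKey, Long],
                        restored: Map[KafkaPartitionKey, Long]): Map[KafkaPartitionKey, Long] =
    initial

  def main(args: Array[String]): Unit = {
    val initial  = Map(KafkaPartitionKey(0, "events", "group1", 0) -> 0L)
    val restored = Map(KafkaPartitionKey(0, "events", "group1", 0) -> 42L)
    println(startOffsetsBefore(initial, restored)) // resumes from the checkpointed offset (42)
    println(startOffsetsAfter(initial, restored))  // always starts from the initial offset (0)
  }
}
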
@@ -103,7 +102,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
val ZK_TIMEOUT = 10000
// Handles pushing data into the BlockManager
- lazy protected val dataHandler = new KafkaDataHandler(this, storageLevel)
+ lazy protected val dataHandler = new DataHandler(this, storageLevel)
// Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
lazy val offsets = HashMap[KafkaPartitionKey, Long]()
// Connection to Kafka
@@ -181,13 +180,15 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
}
}
- class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
- extends DataHandler[Any](receiver, storageLevel) {
+ // NOT USED - Originally intended for fault-tolerance
+ // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
+ // extends DataHandler[Any](receiver, storageLevel) {
- override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
- // Creates a new Block with Kafka-specific Metadata
- new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
- }
+ // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
+ // // Creates a new Block with Kafka-specific Metadata
+ // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
+ // }
- }
+ // }
+
}
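
For reference, the KafkaDataHandler commented out in the last hunk existed to tag every stored block with the current offset map so that consumption state could be rebuilt after a failure; with it gone, the generic DataHandler produces blocks without Kafka-specific metadata. A rough, self-contained sketch of that pattern with simplified stand-ins (Block and DataHandler here are hypothetical placeholders, not Spark's actual classes):

import scala.collection.mutable.HashMap

// Hypothetical simplified stand-ins; not Spark's actual Block/DataHandler classes.
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
case class Block(id: String, iterator: Iterator[Any], metadata: Any = null)

class DataHandler {
  // Generic handler: blocks carry no extra metadata.
  def createBlock(blockId: String, iterator: Iterator[Any]): Block = Block(blockId, iterator)
}

// What the commented-out KafkaDataHandler did: attach the current offsets
// to every block, which is what the fault-tolerance path relied on.
class KafkaDataHandlerSketch(offsets: HashMap[KafkaPartitionKey, Long]) extends DataHandler {
  override def createBlock(blockId: String, iterator: Iterator[Any]): Block =
    Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
}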