diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-12 19:02:27 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-12 19:02:27 -0800 |
commit | 034f89aaab1db95e8908432f2445d6841526efcf (patch) | |
tree | 894aebd0f08d22f6b78c4d1849e522c50b8b6730 /external | |
parent | 74d0126257838f29e3fad519b9f1a5acde88bef6 (diff) | |
download | spark-034f89aaab1db95e8908432f2445d6841526efcf.tar.gz spark-034f89aaab1db95e8908432f2445d6841526efcf.tar.bz2 spark-034f89aaab1db95e8908432f2445d6841526efcf.zip |
Fixed persistence logic of WindowedDStream, and fixed default persistence level of input streams.
Diffstat (limited to 'external')
4 files changed, 8 insertions, 1 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index a01c17ac5d..a6af53e4a6 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -43,6 +43,7 @@ object FlumeUtils { /** * Creates a input stream from a Flume source. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param hostname Hostname of the slave machine to which the flume data will be sent * @param port Port of the slave machine to which the flume data will be sent */ diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index df4ecac8d1..76f9c46657 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -78,6 +78,7 @@ object KafkaUtils { /** * Create an input stream that pulls messages form a Kafka Broker. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) * @param groupId The group id for this consumer @@ -128,7 +129,7 @@ object KafkaUtils { * see http://kafka.apache.org/08/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread - * @param storageLevel RDD storage level. Defaults to MEMORY_AND_DISK_2. + * @param storageLevel RDD storage level. */ def createStream[K, V, U <: Decoder[_], T <: Decoder[_]]( jssc: JavaStreamingContext, diff --git a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala index eacb26f6c5..caa86b27a0 100644 --- a/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala +++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTUtils.scala @@ -44,6 +44,7 @@ object MQTTUtils { /** * Create an input stream that receives messages pushed by a MQTT publisher. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param brokerUrl Url of remote MQTT publisher * @param topic Topic name to subscribe to diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala index 8ea52c4e5b..a23d685144 100644 --- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala +++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterUtils.scala @@ -51,6 +51,7 @@ object TwitterUtils { * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and * twitter4j.oauth.accessTokenSecret. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object */ def createStream(jssc: JavaStreamingContext): JavaDStream[Status] = { @@ -62,6 +63,7 @@ object TwitterUtils { * OAuth authentication; this requires the system properties twitter4j.oauth.consumerKey, * twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and * twitter4j.oauth.accessTokenSecret. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param filters Set of filter strings to get only those tweets that match them */ @@ -88,6 +90,7 @@ object TwitterUtils { /** * Create a input stream that returns tweets received from Twitter. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param twitterAuth Twitter4J Authorization */ @@ -97,6 +100,7 @@ object TwitterUtils { /** * Create a input stream that returns tweets received from Twitter. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. * @param jssc JavaStreamingContext object * @param twitterAuth Twitter4J Authorization * @param filters Set of filter strings to get only those tweets that match them |