author    gatorsmile <gatorsmile@gmail.com>  2015-12-18 20:06:05 -0800
committer Davies Liu <davies.liu@gmail.com>  2015-12-18 20:06:05 -0800
commit    499ac3e69a102f9b10a1d7e14382fa191516f7b5
tree      dcb84cda0ddb75094a39946dfd09f0ed29dd058c /python/pyspark/streaming
parent    a78a91f4d7239c14bd5d0b18cdc87d55594a8d8a
[SPARK-12091] [PYSPARK] Deprecate the JAVA-specific deserialized storage levels
The current default storage level of the Python persist API is MEMORY_ONLY_SER. This differs from the default level MEMORY_ONLY in the official documentation and in the RDD APIs.

@davies Is this inconsistency intentional? Thanks!

Updates: Since the data is always serialized on the Python side, the JAVA-specific deserialized storage levels, such as MEMORY_ONLY, are not removed.

Updates, based on the reviewers' feedback: In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level or not. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2` and `OFF_HEAP`.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #10092 from gatorsmile/persistStorageLevel.
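For illustration, a minimal sketch of the resulting behaviour on the batch side (not part of this patch; the app name and data are hypothetical):

```python
from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "persist-demo")  # hypothetical app name
rdd = sc.parallelize(range(100))

# Only the deserialized-named levels remain in Python; the data is
# pickled regardless, so MEMORY_AND_DISK behaves like the old
# MEMORY_AND_DISK_SER did.
rdd.persist(StorageLevel.MEMORY_AND_DISK)

print(rdd.getStorageLevel())  # the level actually applied
print(rdd.count())            # 100; triggers computation and caching
```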
Diffstat (limited to 'python/pyspark/streaming')
 python/pyspark/streaming/context.py | 2 +-
 python/pyspark/streaming/dstream.py | 4 ++--
 python/pyspark/streaming/flume.py   | 4 ++--
 python/pyspark/streaming/kafka.py   | 2 +-
 python/pyspark/streaming/mqtt.py    | 2 +-
 5 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 1388b6d044..3deed52be0 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -258,7 +258,7 @@ class StreamingContext(object):
         """
         self._jssc.checkpoint(directory)
 
-    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2):
         """
         Create an input from TCP source hostname:port. Data is received using
         a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index b994a53bf2..adc2651740 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -208,10 +208,10 @@ class DStream(object):
     def cache(self):
         """
         Persist the RDDs of this DStream with the default storage level
-        (C{MEMORY_ONLY_SER}).
+        (C{MEMORY_ONLY}).
         """
         self.is_cached = True
-        self.persist(StorageLevel.MEMORY_ONLY_SER)
+        self.persist(StorageLevel.MEMORY_ONLY)
         return self
 
     def persist(self, storageLevel):
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index b3d1905365..b1fff0a5c7 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -40,7 +40,7 @@ class FlumeUtils(object):
 
     @staticmethod
     def createStream(ssc, hostname, port,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                      enableDecompression=False,
                      bodyDecoder=utf8_decoder):
         """
@@ -70,7 +70,7 @@ class FlumeUtils(object):
 
     @staticmethod
     def createPollingStream(ssc, addresses,
-                            storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                            storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                             maxBatchSize=1000,
                             parallelism=5,
                             bodyDecoder=utf8_decoder):
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index cdf97ec73a..13f8f9578e 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -40,7 +40,7 @@ class KafkaUtils(object):
 
     @staticmethod
     def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                      keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
         """
         Create an input stream that pulls messages from a Kafka Broker.
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 1ce4093196..3a515ea499 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -28,7 +28,7 @@ class MQTTUtils(object):
 
     @staticmethod
     def createStream(ssc, brokerUrl, topic,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2):
         """
         Create an input stream that pulls messages from a Mqtt Broker.
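A brief usage sketch of the changed streaming default (assumptions: the app name, host, and port are hypothetical; a process must be listening on the socket, e.g. `nc -lk 9999`):

```python
from pyspark import SparkContext, StorageLevel
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "socket-stream-demo")  # hypothetical app name
ssc = StreamingContext(sc, batchDuration=1)

# The default storageLevel is now MEMORY_AND_DISK_2 (previously the
# JAVA-specific MEMORY_AND_DISK_SER_2); an explicit level can still be
# supplied from the remaining Python levels.
lines = ssc.socketTextStream("localhost", 9999,
                             storageLevel=StorageLevel.MEMORY_AND_DISK)

lines.pprint()  # print a few received lines per batch
ssc.start()
ssc.awaitTermination()
```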