author     gatorsmile <gatorsmile@gmail.com>  2015-12-18 20:06:05 -0800
committer  Davies Liu <davies.liu@gmail.com>  2015-12-18 20:06:05 -0800
commit     499ac3e69a102f9b10a1d7e14382fa191516f7b5 (patch)
tree       dcb84cda0ddb75094a39946dfd09f0ed29dd058c /python/pyspark/streaming
parent     a78a91f4d7239c14bd5d0b18cdc87d55594a8d8a (diff)
[SPARK-12091] [PYSPARK] Deprecate the JAVA-specific deserialized storage levels
The current default storage level of the Python persist API is MEMORY_ONLY_SER. This differs from the default level MEMORY_ONLY in the official documentation and the RDD APIs.

davies Is this inconsistency intentional? Thanks!

Updates: Since the data is always serialized on the Python side, the JAVA-specific deserialized storage levels, such as MEMORY_ONLY, are kept rather than removed.

Updates: Based on the reviewers' feedback. In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2` and `OFF_HEAP`.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #10092 from gatorsmile/persistStorageLevel.
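For illustration only (not part of this commit): a minimal PySpark sketch of the batch persist API after this change. The app name and data are placeholders; since Python objects are pickled before storage, MEMORY_ONLY already stores serialized bytes.

    from pyspark import SparkContext, StorageLevel

    sc = SparkContext(appName="persist-example")  # placeholder app name
    rdd = sc.parallelize(range(100))
    rdd.persist(StorageLevel.MEMORY_ONLY)         # the new Python default level
    print(rdd.count())                            # forces evaluation; prints 100
    sc.stop()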
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r--  python/pyspark/streaming/context.py  2
-rw-r--r--  python/pyspark/streaming/dstream.py  4
-rw-r--r--  python/pyspark/streaming/flume.py    4
-rw-r--r--  python/pyspark/streaming/kafka.py    2
-rw-r--r--  python/pyspark/streaming/mqtt.py     2
5 files changed, 7 insertions, 7 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 1388b6d044..3deed52be0 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -258,7 +258,7 @@ class StreamingContext(object):
"""
self._jssc.checkpoint(directory)
- def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2):
"""
Create an input from TCP source hostname:port. Data is received using
a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited
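A hedged usage sketch for the new default above; the host and port are placeholders, and any of the Python storage levels listed in the commit message can be passed instead:

    from pyspark import SparkContext, StorageLevel
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="socket-example")   # placeholder app name
    ssc = StreamingContext(sc, 1)                 # 1-second batch interval
    lines = ssc.socketTextStream("localhost", 9999,
                                 storageLevel=StorageLevel.MEMORY_AND_DISK_2)
    lines.pprint()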
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index b994a53bf2..adc2651740 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -208,10 +208,10 @@ class DStream(object):
def cache(self):
"""
Persist the RDDs of this DStream with the default storage level
- (C{MEMORY_ONLY_SER}).
+ (C{MEMORY_ONLY}).
"""
self.is_cached = True
- self.persist(StorageLevel.MEMORY_ONLY_SER)
+ self.persist(StorageLevel.MEMORY_ONLY)
return self
def persist(self, storageLevel):
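After this change, cache() on a DStream is shorthand for persist(StorageLevel.MEMORY_ONLY). A small sketch, reusing the ssc and lines stream from the socket example above:

    lines.cache()  # equivalent to lines.persist(StorageLevel.MEMORY_ONLY)
    # For replication across two nodes, pass an explicit level instead:
    # lines.persist(StorageLevel.MEMORY_AND_DISK_2)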
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index b3d1905365..b1fff0a5c7 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -40,7 +40,7 @@ class FlumeUtils(object):
@staticmethod
def createStream(ssc, hostname, port,
- storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
enableDecompression=False,
bodyDecoder=utf8_decoder):
"""
@@ -70,7 +70,7 @@ class FlumeUtils(object):
@staticmethod
def createPollingStream(ssc, addresses,
- storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
maxBatchSize=1000,
parallelism=5,
bodyDecoder=utf8_decoder):
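Illustrative only, assuming the ssc from the socket sketch above, placeholder addresses, and the matching spark-streaming-flume assembly jar on the classpath; both receivers now default to MEMORY_AND_DISK_2:

    from pyspark.streaming.flume import FlumeUtils

    # Push-based receiver: Flume is configured to push events to this host:port.
    events = FlumeUtils.createStream(ssc, "localhost", 41414)
    # Pull-based receiver: addresses is a list of (host, port) pairs of Spark sinks.
    polled = FlumeUtils.createPollingStream(ssc, [("localhost", 41415)])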
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index cdf97ec73a..13f8f9578e 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -40,7 +40,7 @@ class KafkaUtils(object):
@staticmethod
def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
- storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
Create an input stream that pulls messages from a Kafka Broker.
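A sketch under the same assumptions (ssc from above; the ZooKeeper quorum, group id, and topic map are placeholders; the dict maps each topic to its consumer-thread count):

    from pyspark.streaming.kafka import KafkaUtils

    kafka_stream = KafkaUtils.createStream(ssc, "zkhost:2181", "example-group",
                                           {"example-topic": 1})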
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 1ce4093196..3a515ea499 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -28,7 +28,7 @@ class MQTTUtils(object):
@staticmethod
def createStream(ssc, brokerUrl, topic,
- storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2):
"""
Create an input stream that pulls messages from a Mqtt Broker.
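And the MQTT counterpart, again assuming the ssc from above with a placeholder broker URL and topic; the storage level argument can be omitted to take the new MEMORY_AND_DISK_2 default:

    from pyspark.streaming.mqtt import MQTTUtils

    messages = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "example/topic")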