Diffstat (limited to 'python')
 python/pyspark/rdd.py               |  8 ++++----
 python/pyspark/sql/dataframe.py     |  6 +++---
 python/pyspark/storagelevel.py      | 31 +++++++++++++++++++++----------
 python/pyspark/streaming/context.py |  2 +-
 python/pyspark/streaming/dstream.py |  4 ++--
 python/pyspark/streaming/flume.py   |  4 ++--
 python/pyspark/streaming/kafka.py   |  2 +-
 python/pyspark/streaming/mqtt.py    |  2 +-
 8 files changed, 35 insertions(+), 24 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 00bb9a62e9..a019c05862 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -220,18 +220,18 @@ class RDD(object):
def cache(self):
"""
- Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}).
+ Persist this RDD with the default storage level (C{MEMORY_ONLY}).
"""
self.is_cached = True
- self.persist(StorageLevel.MEMORY_ONLY_SER)
+ self.persist(StorageLevel.MEMORY_ONLY)
return self
- def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
+ def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
"""
Set this RDD's storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
- If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
+ If no storage level is specified defaults to (C{MEMORY_ONLY}).
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist().is_cached
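
With this change, cache() on an RDD is shorthand for persist(StorageLevel.MEMORY_ONLY).
A minimal sketch of the new default behavior; the app name is made up for illustration:

    from pyspark import SparkContext, StorageLevel

    sc = SparkContext(appName="storage-demo")     # hypothetical app name
    rdd = sc.parallelize(["b", "a", "c"])
    rdd.cache()                                   # now persist(StorageLevel.MEMORY_ONLY)
    print(rdd.is_cached)                          # True
    rdd.unpersist()
    rdd.persist(StorageLevel.MEMORY_AND_DISK)     # opt in to disk spill explicitly
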
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 78ab475eb4..24fc291999 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -371,18 +371,18 @@ class DataFrame(object):
@since(1.3)
def cache(self):
- """ Persists with the default storage level (C{MEMORY_ONLY_SER}).
+ """ Persists with the default storage level (C{MEMORY_ONLY}).
"""
self.is_cached = True
self._jdf.cache()
return self
@since(1.3)
- def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
+ def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
"""Sets the storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
- If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
+ If no storage level is specified defaults to (C{MEMORY_ONLY}).
"""
self.is_cached = True
javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
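
The DataFrame API follows the same default. A sketch assuming a Spark 2.0 SparkSession;
the app name and column names are illustrative:

    from pyspark import StorageLevel
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("df-demo").getOrCreate()   # hypothetical app name
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    df.persist(StorageLevel.MEMORY_AND_DISK)      # an explicit level is still honored
    df.count()                                    # first action materializes the cache
    df.unpersist()
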
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
index 676aa0f714..d4f184a85d 100644
--- a/python/pyspark/storagelevel.py
+++ b/python/pyspark/storagelevel.py
@@ -23,8 +23,10 @@ class StorageLevel(object):
"""
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
- in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
- Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+ in a Java-specific serialized format, and whether to replicate the RDD partitions on multiple
+ nodes. Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+ Since the data is always serialized on the Python side, all the constants use the serialized
+ formats.
"""
def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
@@ -49,12 +51,21 @@ class StorageLevel(object):
StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
-StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True)
-StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2)
-StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False)
-StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
-StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True)
-StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
-StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False)
-StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
+StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)
+StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
+StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
+StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1)
+
+"""
+.. note:: The following four storage level constants are deprecated in 2.0, since the records \
+will always be serialized in Python.
+"""
+StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY
+""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY`` instead."""
+StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2
+""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY_2`` instead."""
+StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK
+""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK`` instead."""
+StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2
+""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK_2`` instead."""
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 1388b6d044..3deed52be0 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -258,7 +258,7 @@ class StreamingContext(object):
"""
self._jssc.checkpoint(directory)
- def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2):
"""
Create an input from TCP source hostname:port. Data is received using
a TCP socket and the received bytes are interpreted as UTF8 encoded ``\\n`` delimited
lines.
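
Streaming receivers keep their replicated default, now spelled MEMORY_AND_DISK_2; a level
can still be passed explicitly. A sketch with a made-up host and port:

    from pyspark import SparkContext, StorageLevel
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="socket-demo")      # hypothetical app name
    ssc = StreamingContext(sc, batchDuration=1)
    lines = ssc.socketTextStream("localhost", 9999,   # hypothetical endpoint
                                 storageLevel=StorageLevel.MEMORY_AND_DISK)
    lines.pprint()
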
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index b994a53bf2..adc2651740 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -208,10 +208,10 @@ class DStream(object):
def cache(self):
"""
Persist the RDDs of this DStream with the default storage level
- (C{MEMORY_ONLY_SER}).
+ (C{MEMORY_ONLY}).
"""
self.is_cached = True
- self.persist(StorageLevel.MEMORY_ONLY_SER)
+ self.persist(StorageLevel.MEMORY_ONLY)
return self
def persist(self, storageLevel):
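
DStream.cache() mirrors the RDD change. Continuing the socket sketch above, reusing lines:

    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.cache()        # persists the per-batch RDDs with MEMORY_ONLY
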
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index b3d1905365..b1fff0a5c7 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -40,7 +40,7 @@ class FlumeUtils(object):
@staticmethod
def createStream(ssc, hostname, port,
- storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
enableDecompression=False,
bodyDecoder=utf8_decoder):
"""
@@ -70,7 +70,7 @@ class FlumeUtils(object):
@staticmethod
def createPollingStream(ssc, addresses,
- storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
maxBatchSize=1000,
parallelism=5,
bodyDecoder=utf8_decoder):
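
Both Flume entry points now default to MEMORY_AND_DISK_2 as well. For example, reusing the
StreamingContext ssc from the sketch above; the agent host and port are hypothetical:

    from pyspark.streaming.flume import FlumeUtils

    flumeStream = FlumeUtils.createStream(ssc, "flume-agent.example.com", 4141)
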
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index cdf97ec73a..13f8f9578e 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -40,7 +40,7 @@ class KafkaUtils(object):
@staticmethod
def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
- storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
Create an input stream that pulls messages from a Kafka Broker.
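
The Kafka receiver gets the same default. A sketch with made-up ZooKeeper quorum, consumer
group, and topic, again reusing ssc:

    from pyspark.streaming.kafka import KafkaUtils

    kafkaStream = KafkaUtils.createStream(ssc, "zk.example.com:2181",   # hypothetical quorum
                                          "demo-group", {"demo-topic": 1})
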
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 1ce4093196..3a515ea499 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -28,7 +28,7 @@ class MQTTUtils(object):
@staticmethod
def createStream(ssc, brokerUrl, topic,
- storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+ storageLevel=StorageLevel.MEMORY_AND_DISK_2):
"""
Create an input stream that pulls messages from an MQTT broker.
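
Likewise for the MQTT receiver; the broker URL and topic below are made up:

    from pyspark.streaming.mqtt import MQTTUtils

    mqttStream = MQTTUtils.createStream(ssc, "tcp://broker.example.com:1883",
                                        "demo-topic")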