aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/context.py
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-09-07 14:41:31 -0700
committerAaron Davidson <aaron@databricks.com>2013-09-07 14:41:31 -0700
commitc1cc8c4da239965e8ad478089b27e9c694088978 (patch)
tree4fb6bd7a08f28121e1d899cf47558e6957bbed9c /python/pyspark/context.py
parent8001687af597056f630fb81f1edbcaf354c5388a (diff)
downloadspark-c1cc8c4da239965e8ad478089b27e9c694088978.tar.gz
spark-c1cc8c4da239965e8ad478089b27e9c694088978.tar.bz2
spark-c1cc8c4da239965e8ad478089b27e9c694088978.zip
Export StorageLevel and refactor
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--python/pyspark/context.py35
1 file changed, 12 insertions(+), 23 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 4c48cd3f37..efd7828df6 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -27,6 +27,7 @@ from pyspark.broadcast import Broadcast
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import dump_pickle, write_with_length, batched
+from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from py4j.java_collections import ListConverter
@@ -119,29 +120,6 @@ class SparkContext(object):
self._temp_dir = \
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
- self._initStorageLevel()
-
- def _initStorageLevel(self):
- """
- Initializes the StorageLevel object, which mimics the behavior of the scala object
- by the same name. e.g., StorageLevel.DISK_ONLY returns the equivalent Java StorageLevel.
- """
- newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
- levels = {
- 'NONE': newStorageLevel(False, False, False, 1),
- 'DISK_ONLY': newStorageLevel(True, False, False, 1),
- 'DISK_ONLY_2': newStorageLevel(True, False, False, 2),
- 'MEMORY_ONLY': newStorageLevel(False, True, True, 1),
- 'MEMORY_ONLY_2': newStorageLevel(False, True, True, 2),
- 'MEMORY_ONLY_SER': newStorageLevel(False, True, False, 1),
- 'MEMORY_ONLY_SER_2': newStorageLevel(False, True, False, 2),
- 'MEMORY_AND_DISK': newStorageLevel(True, True, True, 1),
- 'MEMORY_AND_DISK_2': newStorageLevel(True, True, True, 2),
- 'MEMORY_AND_DISK_SER': newStorageLevel(True, True, False, 1),
- 'MEMORY_AND_DISK_SER_2': newStorageLevel(True, True, False, 2),
- }
- self.StorageLevel = type('StorageLevel', (), levels)
-
@property
def defaultParallelism(self):
"""
@@ -303,6 +281,17 @@ class SparkContext(object):
"""
self._jsc.sc().setCheckpointDir(dirName, useExisting)
+ def _getJavaStorageLevel(self, storageLevel):
+ """
+ Returns a Java StorageLevel based on a pyspark.StorageLevel.
+ """
+ if not isinstance(storageLevel, StorageLevel):
+ raise Exception("storageLevel must be of type pyspark.StorageLevel")
+
+ newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
+ return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
+ storageLevel.deserialized, storageLevel.replication)
+
def _test():
import atexit
import doctest