Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py  | 46
-rw-r--r--  python/pyspark/shell.py    |  4
2 files changed, 26 insertions(+), 24 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 514d56e200..4c48cd3f37 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -47,6 +47,7 @@ class SparkContext(object):
     _active_spark_context = None
     _lock = Lock()
     _python_includes = None # zip and egg files that need to be added to PYTHONPATH
+    StorageLevel = None
 
     def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
         environment=None, batchSize=1024):
@@ -118,6 +119,29 @@ class SparkContext(object):
         self._temp_dir = \
             self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()
 
+        self._initStorageLevel()
+
+    def _initStorageLevel(self):
+        """
+        Initializes the StorageLevel object, which mimics the behavior of the scala object
+        by the same name. e.g., StorageLevel.DISK_ONLY returns the equivalent Java StorageLevel.
+        """
+        newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
+        levels = {
+            'NONE': newStorageLevel(False, False, False, 1),
+            'DISK_ONLY': newStorageLevel(True, False, False, 1),
+            'DISK_ONLY_2': newStorageLevel(True, False, False, 2),
+            'MEMORY_ONLY': newStorageLevel(False, True, True, 1),
+            'MEMORY_ONLY_2': newStorageLevel(False, True, True, 2),
+            'MEMORY_ONLY_SER': newStorageLevel(False, True, False, 1),
+            'MEMORY_ONLY_SER_2': newStorageLevel(False, True, False, 2),
+            'MEMORY_AND_DISK': newStorageLevel(True, True, True, 1),
+            'MEMORY_AND_DISK_2': newStorageLevel(True, True, True, 2),
+            'MEMORY_AND_DISK_SER': newStorageLevel(True, True, False, 1),
+            'MEMORY_AND_DISK_SER_2': newStorageLevel(True, True, False, 2),
+        }
+        self.StorageLevel = type('StorageLevel', (), levels)
+
     @property
     def defaultParallelism(self):
         """
@@ -279,28 +303,6 @@ class SparkContext(object):
"""
self._jsc.sc().setCheckpointDir(dirName, useExisting)
-class StorageLevelReader:
- """
- Mimics the Scala StorageLevel by delegating all attribute requests
- (e.g., StorageLevel.DISK_ONLY) to the JVM for reflection.
- Memoizes results to reduce JVM call/memory overheads.
- """
-
- def __init__(self, sc):
- self.sc = sc
- self.memoized = {}
-
- def __getattr__(self, name):
- if name in self.memoized:
- return self.memoized[name]
-
- try:
- storageLevel = self.sc._jvm.PythonRDD.getStorageLevelByName(name)
- self.memoized[name] = storageLevel
- return storageLevel
- except:
- print "Failed to find StorageLevel:", name
-
def _test():
import atexit
import doctest
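Note: the dict-plus-type() call in _initStorageLevel() above builds an ordinary class whose attributes are the pre-constructed Java StorageLevel objects, so later attribute lookups never call back into the JVM, unlike the reflection-based StorageLevelReader removed by this patch. A minimal standalone sketch of the same pattern, with plain strings standing in for the JVM objects:

    # Illustration only: strings replace the Java StorageLevel instances.
    levels = {
        'DISK_ONLY': 'disk-only level',
        'MEMORY_ONLY': 'memory-only level',
    }
    StorageLevel = type('StorageLevel', (), levels)  # new class with those attributes

    print StorageLevel.DISK_ONLY  # plain attribute access, no __getattr__ hook or JVM call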
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 9acc176d55..e374ca4ee4 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -23,13 +23,13 @@ This file is designed to be launched as a PYTHONSTARTUP script.
 import os
 import platform
 import pyspark
-from pyspark.context import SparkContext, StorageLevelReader
+from pyspark.context import SparkContext
 
 # this is the equivalent of ADD_JARS
 add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None
 
 sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell", pyFiles=add_files)
-StorageLevel = StorageLevelReader(sc)
+StorageLevel = sc.StorageLevel # alias StorageLevel to global scope
 
 print """Welcome to
       ____              __
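With the sc.StorageLevel alias exported by the shell startup script, an interactive session can name storage levels directly. A hedged sketch, assuming RDD.persist() in this version accepts the Java-side StorageLevel object exposed here:

    >>> rdd = sc.parallelize(range(100))
    >>> rdd.persist(StorageLevel.MEMORY_AND_DISK)  # StorageLevel is the alias bound to sc.StorageLevel above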