From a63d4c7dc2970900b116f7287e3d6b302d9d5698 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Thu, 5 Sep 2013 23:36:27 -0700
Subject: SPARK-660: Add StorageLevel support in Python

It uses reflection... I am not proud of that fact, but it at least
ensures compatibility (sans refactoring of the StorageLevel stuff).
---
 python/pyspark/context.py | 14 ++++++++++++++
 python/pyspark/rdd.py     | 18 ++++++++++++++++++
 python/pyspark/shell.py   |  3 ++-
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 8fbf296509..49f9b4610d 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -279,6 +279,20 @@ class SparkContext(object):
         """
         self._jsc.sc().setCheckpointDir(dirName, useExisting)

+class StorageLevelReader:
+    """
+    Mimics the Scala StorageLevel by directing all attribute requests
+    (e.g., StorageLevel.DISK_ONLY) to the JVM for reflection.
+    """
+
+    def __init__(self, sc):
+        self.sc = sc
+
+    def __getattr__(self, name):
+        try:
+            return self.sc._jvm.PythonRDD.getStorageLevel(name)
+        except:
+            print "Failed to find StorageLevel:", name

 def _test():
     import atexit

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 914118ccdd..332258f5d1 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -70,6 +70,24 @@ class RDD(object):
         self._jrdd.cache()
         return self

+    def persist(self, storageLevel):
+        """
+        Set this RDD's storage level to persist its values across operations
+        after the first time it is computed. This can only be used to assign
+        a new storage level if the RDD does not have a storage level set yet.
+        """
+        self.is_cached = True
+        self._jrdd.persist(storageLevel)
+        return self
+
+    def unpersist(self):
+        """
+        Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+        """
+        self.is_cached = False
+        self._jrdd.unpersist()
+        return self
+
     def checkpoint(self):
         """
         Mark this RDD for checkpointing. It will be saved to a file inside the

diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 54823f8037..9acc176d55 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -23,12 +23,13 @@ This file is designed to be launched as a PYTHONSTARTUP script.
 import os
 import platform
 import pyspark
-from pyspark.context import SparkContext
+from pyspark.context import SparkContext, StorageLevelReader

 # this is the equivalent of ADD_JARS
 add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None

 sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkShell", pyFiles=add_files)
+StorageLevel = StorageLevelReader(sc)

 print """Welcome to
       ____              __
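
For context, a minimal usage sketch of the API this patch adds, as a session
in the patched PySpark shell might look (a hypothetical Python 2 session, not
part of the patch; `sc` and `StorageLevel` are bound by shell.py above, and
DISK_ONLY is one of the Scala StorageLevel fields resolved through
PythonRDD.getStorageLevel):

    # Hypothetical PySpark shell session (Python 2 era)
    rdd = sc.parallelize(range(1000))

    # Attribute access on StorageLevel goes through StorageLevelReader,
    # which asks the JVM for the matching Scala StorageLevel by name.
    rdd.persist(StorageLevel.DISK_ONLY)

    print rdd.count()   # first action computes the RDD and persists it to disk
    print rdd.count()   # later actions reuse the persisted blocks

    # Drop the persisted blocks from memory and disk.
    rdd.unpersist()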
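
One caveat worth flagging in StorageLevelReader.__getattr__ above: the bare
`except:` prints a message and then implicitly returns None, so a misspelled
level name would silently hand None to persist(). A slightly more defensive
variant (a sketch only, not what this patch commits) would surface the failure
as a normal attribute error instead:

    def __getattr__(self, name):
        try:
            return self.sc._jvm.PythonRDD.getStorageLevel(name)
        except Exception:
            # Fail loudly instead of printing and returning None.
            raise AttributeError("no StorageLevel named %r" % name)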