diff options
author | Prashant Sharma <scrapcodes@gmail.com> | 2013-09-15 10:55:12 +0530 |
---|---|---|
committer | Prashant Sharma <scrapcodes@gmail.com> | 2013-09-15 10:55:12 +0530 |
commit | 383e151fd7138cc6a143b3a38037cc3038c2a8b9 (patch) | |
tree | 0dbb2c0d8fdeff4c37a577eb96acb87ee7838a16 /python/pyspark | |
parent | 20c65bc334091d8d05fb680551155aa182d98f7d (diff) | |
parent | c4c1db2dd5b2ec0a8182369ecdb0e14f4e199822 (diff) | |
download | spark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.tar.gz spark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.tar.bz2 spark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.zip |
Merge branch 'master' of git://github.com/mesos/spark into scala-2.10
Conflicts:
core/src/main/scala/org/apache/spark/SparkContext.scala
project/SparkBuild.scala
Diffstat (limited to 'python/pyspark')
-rw-r--r-- | python/pyspark/__init__.py | 5 | ||||
-rw-r--r-- | python/pyspark/context.py | 11 | ||||
-rw-r--r-- | python/pyspark/rdd.py | 19 | ||||
-rw-r--r-- | python/pyspark/shell.py | 1 | ||||
-rw-r--r-- | python/pyspark/storagelevel.py | 43 |
5 files changed, 78 insertions, 1 deletion
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index fd5972d381..1f35f6f939 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -30,6 +30,8 @@ Public classes: An "add-only" shared variable that tasks can only add values to. - L{SparkFiles<pyspark.files.SparkFiles>} Access files shipped with jobs. + - L{StorageLevel<pyspark.storagelevel.StorageLevel>} + Finer-grained cache persistence levels. """ import sys import os @@ -39,6 +41,7 @@ sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.eg from pyspark.context import SparkContext from pyspark.rdd import RDD from pyspark.files import SparkFiles +from pyspark.storagelevel import StorageLevel -__all__ = ["SparkContext", "RDD", "SparkFiles"] +__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"] diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 8fbf296509..597110321a 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -27,6 +27,7 @@ from pyspark.broadcast import Broadcast from pyspark.files import SparkFiles from pyspark.java_gateway import launch_gateway from pyspark.serializers import dump_pickle, write_with_length, batched +from pyspark.storagelevel import StorageLevel from pyspark.rdd import RDD from py4j.java_collections import ListConverter @@ -279,6 +280,16 @@ class SparkContext(object): """ self._jsc.sc().setCheckpointDir(dirName, useExisting) + def _getJavaStorageLevel(self, storageLevel): + """ + Returns a Java StorageLevel based on a pyspark.StorageLevel. 
+ """ + if not isinstance(storageLevel, StorageLevel): + raise Exception("storageLevel must be of type pyspark.StorageLevel") + + newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel + return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory, + storageLevel.deserialized, storageLevel.replication) def _test(): import atexit diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1831435b33..7611b13e82 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -70,6 +70,25 @@ class RDD(object): self._jrdd.cache() return self + def persist(self, storageLevel): + """ + Set this RDD's storage level to persist its values across operations after the first time + it is computed. This can only be used to assign a new storage level if the RDD does not + have a storage level set yet. + """ + self.is_cached = True + javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) + self._jrdd.persist(javaStorageLevel) + return self + + def unpersist(self): + """ + Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. + """ + self.is_cached = False + self._jrdd.unpersist() + return self + def checkpoint(self): """ Mark this RDD for checkpointing. 
It will be saved to a file inside the diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py index 54823f8037..dc205b306f 100644 --- a/python/pyspark/shell.py +++ b/python/pyspark/shell.py @@ -24,6 +24,7 @@ import os import platform import pyspark from pyspark.context import SparkContext +from pyspark.storagelevel import StorageLevel # this is the equivalent of ADD_JARS add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py new file mode 100644 index 0000000000..b31f4762e6 --- /dev/null +++ b/python/pyspark/storagelevel.py @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = ["StorageLevel"] + +class StorageLevel: + """ + Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, + whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory + in a serialized format, and whether to replicate the RDD partitions on multiple nodes. + Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY. 
+ """ + + def __init__(self, useDisk, useMemory, deserialized, replication = 1): + self.useDisk = useDisk + self.useMemory = useMemory + self.deserialized = deserialized + self.replication = replication + +StorageLevel.DISK_ONLY = StorageLevel(True, False, False) +StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2) +StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True) +StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2) +StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False) +StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2) +StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True) +StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2) +StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False) +StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2) |