aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorPrashant Sharma <scrapcodes@gmail.com>2013-09-15 10:55:12 +0530
committerPrashant Sharma <scrapcodes@gmail.com>2013-09-15 10:55:12 +0530
commit383e151fd7138cc6a143b3a38037cc3038c2a8b9 (patch)
tree0dbb2c0d8fdeff4c37a577eb96acb87ee7838a16 /python
parent20c65bc334091d8d05fb680551155aa182d98f7d (diff)
parentc4c1db2dd5b2ec0a8182369ecdb0e14f4e199822 (diff)
downloadspark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.tar.gz
spark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.tar.bz2
spark-383e151fd7138cc6a143b3a38037cc3038c2a8b9.zip
Merge branch 'master' of git://github.com/mesos/spark into scala-2.10
Conflicts: core/src/main/scala/org/apache/spark/SparkContext.scala project/SparkBuild.scala
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/__init__.py5
-rw-r--r--python/pyspark/context.py11
-rw-r--r--python/pyspark/rdd.py19
-rw-r--r--python/pyspark/shell.py1
-rw-r--r--python/pyspark/storagelevel.py43
5 files changed, 78 insertions, 1 deletions
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index fd5972d381..1f35f6f939 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -30,6 +30,8 @@ Public classes:
An "add-only" shared variable that tasks can only add values to.
- L{SparkFiles<pyspark.files.SparkFiles>}
Access files shipped with jobs.
+ - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
+ Finer-grained cache persistence levels.
"""
import sys
import os
@@ -39,6 +41,7 @@ sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.eg
from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
+from pyspark.storagelevel import StorageLevel
-__all__ = ["SparkContext", "RDD", "SparkFiles"]
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 8fbf296509..597110321a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -27,6 +27,7 @@ from pyspark.broadcast import Broadcast
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import dump_pickle, write_with_length, batched
+from pyspark.storagelevel import StorageLevel
from pyspark.rdd import RDD
from py4j.java_collections import ListConverter
@@ -279,6 +280,16 @@ class SparkContext(object):
"""
self._jsc.sc().setCheckpointDir(dirName, useExisting)
+ def _getJavaStorageLevel(self, storageLevel):
+ """
+ Returns a Java StorageLevel based on a pyspark.StorageLevel.
+ """
+ if not isinstance(storageLevel, StorageLevel):
+ raise Exception("storageLevel must be of type pyspark.StorageLevel")
+
+ newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
+ return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
+ storageLevel.deserialized, storageLevel.replication)
def _test():
import atexit
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1831435b33..7611b13e82 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -70,6 +70,25 @@ class RDD(object):
self._jrdd.cache()
return self
+ def persist(self, storageLevel):
+ """
+ Set this RDD's storage level to persist its values across operations after the first time
+ it is computed. This can only be used to assign a new storage level if the RDD does not
+ have a storage level set yet.
+ """
+ self.is_cached = True
+ javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
+ self._jrdd.persist(javaStorageLevel)
+ return self
+
+ def unpersist(self):
+ """
+ Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ """
+ self.is_cached = False
+ self._jrdd.unpersist()
+ return self
+
def checkpoint(self):
"""
Mark this RDD for checkpointing. It will be saved to a file inside the
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 54823f8037..dc205b306f 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -24,6 +24,7 @@ import os
import platform
import pyspark
from pyspark.context import SparkContext
+from pyspark.storagelevel import StorageLevel
# this is the equivalent of ADD_JARS
add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES") != None else None
diff --git a/python/pyspark/storagelevel.py b/python/pyspark/storagelevel.py
new file mode 100644
index 0000000000..b31f4762e6
--- /dev/null
+++ b/python/pyspark/storagelevel.py
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["StorageLevel"]
+
+class StorageLevel:
+ """
+ Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
+ whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
+ in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
+ Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+ """
+
+ def __init__(self, useDisk, useMemory, deserialized, replication = 1):
+ self.useDisk = useDisk
+ self.useMemory = useMemory
+ self.deserialized = deserialized
+ self.replication = replication
+
+StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
+StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
+StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
+StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, True, 2)
+StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False)
+StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, 2)
+StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, True)
+StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, True, 2)
+StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False)
+StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, 2)