author     Josh Rosen <joshrosen@eecs.berkeley.edu>   2013-01-21 16:42:24 -0800
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>   2013-01-21 17:34:17 -0800
commit     ef711902c1f42db14c8ddd524195f0a9efb56e65 (patch)
tree       e770a7439d3983c13346cbd81aa1eeeef23e2571 /python/pyspark/context.py
parent     506077c9938cd411842fe42404aa6b74b45b23a1 (diff)
Don't download files to master's working directory.
This should avoid exceptions caused by existing files with different contents. I also removed some unused code.
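
For context: after this change, task code should not assume an added file lands in the current working directory. The docstring added below points to SparkFiles.get as the supported lookup; a minimal sketch of the before/after access pattern (the file name "data.txt" is just an illustration):

    from pyspark import SparkFiles

    # Before this change, tasks could rely on the file appearing in the
    # working directory:
    #     contents = open("data.txt").read()
    # After it, the download location must be resolved explicitly:
    contents = open(SparkFiles.get("data.txt")).read()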
Diffstat (limited to 'python/pyspark/context.py')
 -rw-r--r--  python/pyspark/context.py | 40
 1 file changed, 36 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index dcbed37270..ec0cc7c2f9 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1,5 +1,7 @@
 import os
 import atexit
+import shutil
+import tempfile
 from tempfile import NamedTemporaryFile
 
 from pyspark import accumulators
@@ -173,10 +175,26 @@ class SparkContext(object):
 
     def addFile(self, path):
         """
-        Add a file to be downloaded into the working directory of this Spark
-        job on every node. The C{path} passed can be either a local file,
-        a file in HDFS (or other Hadoop-supported filesystems), or an HTTP,
-        HTTPS or FTP URI.
+        Add a file to be downloaded with this Spark job on every node.
+        The C{path} passed can be either a local file, a file in HDFS
+        (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
+        FTP URI.
+
+        To access the file in Spark jobs, use
+        L{SparkFiles.get(path)<pyspark.files.SparkFiles.get>} to find its
+        download location.
+
+        >>> from pyspark import SparkFiles
+        >>> path = os.path.join(tempdir, "test.txt")
+        >>> with open(path, "w") as testFile:
+        ...    testFile.write("100")
+        >>> sc.addFile(path)
+        >>> def func(iterator):
+        ...    with open(SparkFiles.get("test.txt")) as testFile:
+        ...        fileVal = int(testFile.readline())
+        ...    return [x * fileVal for x in iterator]
+        >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
+        [100, 200, 300, 400]
         """
         self._jsc.sc().addFile(path)
 
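One detail worth noting in the doctest above: workers resolve an added file by its base name, not by the driver-side path that was passed to addFile. A minimal sketch of that lookup pattern (resolve_added_file is a hypothetical helper, not part of this patch):

    import os
    from pyspark import SparkFiles

    def resolve_added_file(original_path):
        # addFile("/some/driver/dir/test.txt") is fetched on each node
        # under its base name, so the lookup key is "test.txt".
        return SparkFiles.get(os.path.basename(original_path))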
@@ -211,3 +229,17 @@ class SparkContext(object):
         accidental overriding of checkpoint files in the existing directory.
         """
         self._jsc.sc().setCheckpointDir(dirName, useExisting)
+
+
+def _test():
+    import doctest
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    globs['tempdir'] = tempfile.mkdtemp()
+    atexit.register(lambda: shutil.rmtree(globs['tempdir']))
+    doctest.testmod(globs=globs)
+    globs['sc'].stop()
+
+
+if __name__ == "__main__":
+    _test()
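
Running this module directly (e.g. python python/pyspark/context.py) executes the doctests against a real local[4] SparkContext, using the temp directory created above as the scratch space for the addFile example. Note that _test() as written ignores doctest failures; a common refinement (an assumption here, not part of this patch) is to propagate them as a nonzero exit status:

    failure_count, test_count = doctest.testmod(globs=globs)
    globs['sc'].stop()
    if failure_count:
        exit(-1)  # make doctest failures visible to CI / shell callers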