diff options
author | Andre Schumacher <schumach@icsi.berkeley.edu> | 2013-08-15 16:01:19 -0700 |
---|---|---|
committer | Andre Schumacher <schumach@icsi.berkeley.edu> | 2013-08-16 11:58:20 -0700 |
commit | c7e348faec45ad1d996d16639015c4bc4fc3bc92 (patch) | |
tree | 45e69b999c4b4af6bd7528e3dcc860bce264e14f /python/pyspark/context.py | |
parent | 659553b21ddd7504889ce113a816c1db4a73f167 (diff) | |
download | spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.tar.gz spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.tar.bz2 spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.zip |
Implementing SPARK-878 for PySpark: adding zip and egg files to context and passing it down to workers which add these to their sys.path
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- | python/pyspark/context.py | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c2b49ff37a..2803ce90f3 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -46,6 +46,7 @@ class SparkContext(object): _next_accum_id = 0 _active_spark_context = None _lock = Lock() + _python_includes = None # zip and egg files that need to be added to PYTHONPATH def __init__(self, master, jobName, sparkHome=None, pyFiles=None, environment=None, batchSize=1024): @@ -103,11 +104,14 @@ class SparkContext(object): # send. self._pickled_broadcast_vars = set() + SparkFiles._sc = self + root_dir = SparkFiles.getRootDirectory() + sys.path.append(root_dir) + # Deploy any code dependencies specified in the constructor + self._python_includes = list() for path in (pyFiles or []): self.addPyFile(path) - SparkFiles._sc = self - sys.path.append(SparkFiles.getRootDirectory()) # Create a temporary directory inside spark.local.dir: local_dir = self._jvm.spark.Utils.getLocalDir() @@ -257,7 +261,11 @@ class SparkContext(object): HTTP, HTTPS or FTP URI. """ self.addFile(path) - filename = path.split("/")[-1] + (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix + + if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): + self._python_includes.append(filename) + sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode def setCheckpointDir(self, dirName, useExisting=False): """ |