about | summary | refs | log | tree | commit | diff
path: root/python/pyspark/context.py
diff options
context:
space:
mode:
author: Andre Schumacher <schumach@icsi.berkeley.edu> 2013-08-15 16:01:19 -0700
committer: Andre Schumacher <schumach@icsi.berkeley.edu> 2013-08-16 11:58:20 -0700
commit: c7e348faec45ad1d996d16639015c4bc4fc3bc92 (patch)
tree: 45e69b999c4b4af6bd7528e3dcc860bce264e14f /python/pyspark/context.py
parent: 659553b21ddd7504889ce113a816c1db4a73f167 (diff)
download: spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.tar.gz
spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.tar.bz2
spark-c7e348faec45ad1d996d16639015c4bc4fc3bc92.zip
Implementing SPARK-878 for PySpark: adding zip and egg files to the context and passing them down to the workers, which add them to their sys.path
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r-- python/pyspark/context.py 14
1 files changed, 11 insertions, 3 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c2b49ff37a..2803ce90f3 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -46,6 +46,7 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
+ _python_includes = None # zip and egg files that need to be added to PYTHONPATH
def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024):
@@ -103,11 +104,14 @@ class SparkContext(object):
# send.
self._pickled_broadcast_vars = set()
+ SparkFiles._sc = self
+ root_dir = SparkFiles.getRootDirectory()
+ sys.path.append(root_dir)
+
# Deploy any code dependencies specified in the constructor
+ self._python_includes = list()
for path in (pyFiles or []):
self.addPyFile(path)
- SparkFiles._sc = self
- sys.path.append(SparkFiles.getRootDirectory())
# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.spark.Utils.getLocalDir()
@@ -257,7 +261,11 @@ class SparkContext(object):
HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
- filename = path.split("/")[-1]
+ (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
+
+ if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
+ self._python_includes.append(filename)
+ sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
def setCheckpointDir(self, dirName, useExisting=False):
"""