path: root/python/pyspark/context.py
author     Josh Rosen <joshrosen@eecs.berkeley.edu>   2013-01-23 10:36:18 -0800
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>   2013-01-23 10:58:50 -0800
commit     ae2ed2947d43860c74a8d40767e289ca78073977 (patch)
tree       c72a585db233832ae53e52c4e015b8d52813643c /python/pyspark/context.py
parent     35168d9c89904f0dc0bb470c1799f5ca3b04221f (diff)
Allow PySpark's SparkFiles to be used from driver
Fix minor documentation formatting issues.
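As a quick driver-side illustration of what this commit enables, here is a minimal sketch (the file /tmp/example.txt is hypothetical, and it assumes SparkContext.addFile is available in this version of PySpark): once __init__ sets SparkFiles._sc and adds the files root directory to sys.path, SparkFiles.get() and SparkFiles.getRootDirectory() can be called from the driver process, not only inside tasks.

    from pyspark import SparkContext
    from pyspark.files import SparkFiles

    sc = SparkContext("local", "SparkFilesDemo")
    sc.addFile("/tmp/example.txt")  # hypothetical file shipped to workers

    # Driver-side resolution, now possible because __init__ sets SparkFiles._sc:
    print(SparkFiles.getRootDirectory())
    print(SparkFiles.get("example.txt"))

    # Inside a task, the same call resolves to the worker-local copy of the file.
    def first_line(_):
        with open(SparkFiles.get("example.txt")) as f:
            return f.readline()

    print(sc.parallelize([1]).map(first_line).collect())
    sc.stop()

The diff below also makes SparkContext refuse to start while another context is active in the same process: constructing a second SparkContext before calling stop() raises ValueError("Cannot run multiple SparkContexts at once").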
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py  27
1 file changed, 21 insertions, 6 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index b8d7dc05af..3e33776af0 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1,12 +1,15 @@
 import os
 import atexit
 import shutil
+import sys
 import tempfile
+from threading import Lock
 from tempfile import NamedTemporaryFile
 
 from pyspark import accumulators
 from pyspark.accumulators import Accumulator
 from pyspark.broadcast import Broadcast
+from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import dump_pickle, write_with_length, batched
 from pyspark.rdd import RDD
@@ -27,6 +30,8 @@ class SparkContext(object):
     _writeIteratorToPickleFile = jvm.PythonRDD.writeIteratorToPickleFile
     _takePartition = jvm.PythonRDD.takePartition
     _next_accum_id = 0
+    _active_spark_context = None
+    _lock = Lock()
 
     def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
         environment=None, batchSize=1024):
@@ -46,6 +51,11 @@ class SparkContext(object):
         Java object. Set 1 to disable batching or -1 to use an
         unlimited batch size.
         """
+        with SparkContext._lock:
+            if SparkContext._active_spark_context:
+                raise ValueError("Cannot run multiple SparkContexts at once")
+            else:
+                SparkContext._active_spark_context = self
         self.master = master
         self.jobName = jobName
         self.sparkHome = sparkHome or None # None becomes null in Py4J
@@ -75,6 +85,8 @@ class SparkContext(object):
         # Deploy any code dependencies specified in the constructor
         for path in (pyFiles or []):
             self.addPyFile(path)
+        SparkFiles._sc = self
+        sys.path.append(SparkFiles.getRootDirectory())
 
     @property
     def defaultParallelism(self):
@@ -85,17 +97,20 @@
         return self._jsc.sc().defaultParallelism()
 
     def __del__(self):
-        if self._jsc:
-            self._jsc.stop()
-        if self._accumulatorServer:
-            self._accumulatorServer.shutdown()
+        self.stop()
 
     def stop(self):
         """
         Shut down the SparkContext.
         """
-        self._jsc.stop()
-        self._jsc = None
+        if self._jsc:
+            self._jsc.stop()
+            self._jsc = None
+        if self._accumulatorServer:
+            self._accumulatorServer.shutdown()
+            self._accumulatorServer = None
+        with SparkContext._lock:
+            SparkContext._active_spark_context = None
 
     def parallelize(self, c, numSlices=None):
         """