Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py  42
1 file changed, 28 insertions(+), 14 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 78dccc4047..1dc2fec0ae 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import os
import shutil
import sys
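
For context, not part of the diff itself: importing print_function from __future__ makes print a function on Python 2 as well, so doctests further down that call print(result) behave the same on Python 2 and 3. A minimal illustration:

    from __future__ import print_function  # no-op on Python 3

    result = "Cancelled"
    print(result)   # identical output on Python 2.6+ and Python 3
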
@@ -32,11 +34,14 @@ from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, AutoBatchedSerializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
-from pyspark.rdd import RDD, _load_from_socket
+from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
from pyspark.profiler import ProfilerCollector, BasicProfiler
+if sys.version > '3':
+ xrange = range
+
__all__ = ['SparkContext']
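
A note on the shim above: xrange was removed in Python 3, where the builtin range is already a lazy sequence, so the alias keeps the Python-2-era call sites (such as the range() helper further down) unchanged. The comparison sys.version > '3' works because the interpreter's version string, e.g. '3.4.0 ...', sorts after the bare string '3'. Illustration only:

    import sys

    if sys.version > '3':
        xrange = range        # same shim as in the diff
    r = xrange(10**6)         # lazy on Python 3: no list is materialized
    print(r[5], len(r))       # 5 1000000
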
@@ -133,7 +138,7 @@ class SparkContext(object):
if sparkHome:
self._conf.setSparkHome(sparkHome)
if environment:
- for key, value in environment.iteritems():
+ for key, value in environment.items():
self._conf.setExecutorEnv(key, value)
for key, value in DEFAULT_CONFIGS.items():
self._conf.setIfMissing(key, value)
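
For context: dict.iteritems() no longer exists on Python 3, while items() is available on both lines (a list of pairs on Python 2, a view on Python 3, both iterable), which is why this hunk and the _dictToJavaMap hunk below switch to it. Illustration only, with a made-up environment dict:

    environment = {"SPARK_LOCAL_DIRS": "/tmp"}   # hypothetical executor env
    for key, value in environment.items():       # works on Python 2 and 3
        print(key, value)
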
@@ -153,6 +158,10 @@ class SparkContext(object):
if k.startswith("spark.executorEnv."):
varName = k[len("spark.executorEnv."):]
self.environment[varName] = v
+ if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
+ # disable randomness of hash of string in worker, if this is not
+ # launched by spark-submit
+ self.environment["PYTHONHASHSEED"] = "0"
# Create the Java SparkContext through Py4J
self._jsc = jsc or self._initialize_context(self._conf._jconf)
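
For context on the PYTHONHASHSEED hunk: since Python 3.3, string hashing is salted per process by default, so the same key would hash, and therefore hash-partition, differently on different workers, breaking operations such as reduceByKey and distinct. Pinning PYTHONHASHSEED to 0 in the executor environment makes hashing deterministic when the context was not launched through spark-submit (which, per the comment, handles this itself). An illustration-only way to see the difference:

    import os, subprocess, sys

    cmd = [sys.executable, "-c", "print(hash('spark'))"]
    # Salted (Python 3.3+ default): the value changes from process to process.
    print(subprocess.check_output(cmd, env=dict(os.environ, PYTHONHASHSEED="random")))
    # Deterministic: every worker process agrees on the same value.
    print(subprocess.check_output(cmd, env=dict(os.environ, PYTHONHASHSEED="0")))
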
@@ -323,7 +332,7 @@ class SparkContext(object):
start0 = c[0]
def getStart(split):
- return start0 + (split * size / numSlices) * step
+ return start0 + int((split * size / numSlices)) * step
def f(split, iterator):
return xrange(getStart(split), getStart(split + 1), step)
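
For context on the int(...) cast: under Python 3, / is true division and returns a float, and range()/xrange() rejects float arguments; on Python 2 the operands are ints, so / already floored. Illustration only, with made-up values:

    start0, split, size, numSlices, step = 0, 1, 10, 4, 1
    print(start0 + split * size / numSlices * step)        # 2.5 on Python 3 (float)
    print(start0 + int(split * size / numSlices) * step)   # 2 -- a valid range() bound on both
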
@@ -357,6 +366,7 @@ class SparkContext(object):
minPartitions = minPartitions or self.defaultMinPartitions
return RDD(self._jsc.objectFile(name, minPartitions), self)
+ @ignore_unicode_prefix
def textFile(self, name, minPartitions=None, use_unicode=True):
"""
Read a text file from HDFS, a local file system (available on all
@@ -369,7 +379,7 @@ class SparkContext(object):
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
- ... testFile.write("Hello world!")
+ ... _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello world!']
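
Two Python 3 details drive this hunk and the similar doctest edits below. First, TextIO.write() returns the number of characters written on Python 3 (Python 2's file.write() returned None), so binding the result to _ keeps the count out of the expected doctest output. Second, the added ignore_unicode_prefix decorator (imported from pyspark.rdd above) appears to normalize the u'...' prefixes in the expected output, so lines like [u'Hello world!'] still match when the test runs on Python 3. Illustration only for the write() return value:

    import tempfile

    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
        n = f.write("Hello world!")
    print(n)   # 12 on Python 3; a bare write() call in a doctest would emit this
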
@@ -378,6 +388,7 @@ class SparkContext(object):
return RDD(self._jsc.textFile(name, minPartitions), self,
UTF8Deserializer(use_unicode))
+ @ignore_unicode_prefix
def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
"""
Read a directory of text files from HDFS, a local file system
@@ -411,9 +422,9 @@ class SparkContext(object):
>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
- ... file1.write("1")
+ ... _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
- ... file2.write("2")
+ ... _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
@@ -456,7 +467,7 @@ class SparkContext(object):
jm = self._jvm.java.util.HashMap()
if not d:
d = {}
- for k, v in d.iteritems():
+ for k, v in d.items():
jm[k] = v
return jm
@@ -608,6 +619,7 @@ class SparkContext(object):
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)
+ @ignore_unicode_prefix
def union(self, rdds):
"""
Build the union of a list of RDDs.
@@ -618,7 +630,7 @@ class SparkContext(object):
>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
- ... testFile.write("Hello")
+ ... _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello']
@@ -677,7 +689,7 @@ class SparkContext(object):
>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
- ... testFile.write("100")
+ ... _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
... with open(SparkFiles.get("test.txt")) as testFile:
@@ -705,11 +717,13 @@ class SparkContext(object):
"""
self.addFile(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
-
if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
# for tests in local mode
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+ if sys.version > '3':
+ import importlib
+ importlib.invalidate_caches()
def setCheckpointDir(self, dirName):
"""
@@ -744,7 +758,7 @@ class SparkContext(object):
The application can use L{SparkContext.cancelJobGroup} to cancel all
running jobs in this group.
- >>> import thread, threading
+ >>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
@@ -763,10 +777,10 @@ class SparkContext(object):
... sleep(5)
... sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
- >>> supress = thread.start_new_thread(start_job, (10,))
- >>> supress = thread.start_new_thread(stop_job, tuple())
+ >>> supress = threading.Thread(target=start_job, args=(10,)).start()
+ >>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
- >>> print result
+ >>> print(result)
Cancelled
If interruptOnCancel is set to true for the job group, then job cancellation will result