From 1d53792a0a48695824c29274be84b74d8d6a2e6a Mon Sep 17 00:00:00 2001
From: shane-huang
Date: Mon, 23 Sep 2013 16:13:46 +0800
Subject: add scripts in bin

Signed-off-by: shane-huang
---
 python/run-tests | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'python/run-tests')

diff --git a/python/run-tests b/python/run-tests
index cbc554ea9d..8a08ae3df9 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -29,7 +29,7 @@ FAILED=0

 rm -f unit-tests.log

 function run_test() {
-    $FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log
+    $FWDIR/bin/pyspark $1 2>&1 | tee -a unit-tests.log
     FAILED=$((PIPESTATUS[0]||$FAILED))
 }
--
cgit v1.2.3


From 615fb649d66b13371927a051d249433d746c5f19 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Sun, 29 Dec 2013 14:31:45 -0500
Subject: Fix some other Python tests due to initializing JVM in a different way

The test in context.py created two different instances of the SparkContext
class by copying "globals", so that some tests can have a global "sc" object
and others can try initializing their own contexts. This led to two JVM
gateways being created since SparkConf also looked at
pyspark.context.SparkContext to get the JVM.
---
 python/pyspark/conf.py    |  5 +++--
 python/pyspark/context.py | 23 +++++++++++++++--------
 python/run-tests          |  1 +
 3 files changed, 19 insertions(+), 10 deletions(-)

(limited to 'python/run-tests')

diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index 56e615c287..eb7a6c13fe 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -50,10 +50,11 @@ u'value1'


 class SparkConf(object):
-    def __init__(self, loadDefaults=False):
+    def __init__(self, loadDefaults=True, _jvm=None):
         from pyspark.context import SparkContext
         SparkContext._ensure_initialized()
-        self._jconf = SparkContext._jvm.SparkConf(loadDefaults)
+        _jvm = _jvm or SparkContext._jvm
+        self._jconf = _jvm.SparkConf(loadDefaults)

     def set(self, key, value):
         self._jconf.set(key, value)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 97c1526afd..9d75c2b6f1 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -81,7 +81,8 @@ class SparkContext(object):
         """
         SparkContext._ensure_initialized(self)

-        self.conf = conf or SparkConf()
+        self.environment = environment or {}
+        self.conf = conf or SparkConf(_jvm=self._jvm)
         self._batchSize = batchSize  # -1 represents an unlimited batch size
         self._unbatched_serializer = serializer
         if batchSize == 1:
@@ -90,23 +91,30 @@ class SparkContext(object):
             self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                 batchSize)

-        # Set parameters passed directly on our conf; these operations will be no-ops
-        # if the parameters were None
+        # Set parameters passed directly to us on the conf; these operations will be
+        # no-ops if the parameters were None
         self.conf.setMaster(master)
         self.conf.setAppName(appName)
         self.conf.setSparkHome(sparkHome)
-        environment = environment or {}
-        for key, value in environment.iteritems():
-            self.conf.setExecutorEnv(key, value)
+        if environment:
+            for key, value in environment.iteritems():
+                self.conf.setExecutorEnv(key, value)

+        # Check that we have at least the required parameters
         if not self.conf.contains("spark.master"):
             raise Exception("A master URL must be set in your configuration")
         if not self.conf.contains("spark.appName"):
             raise Exception("An application name must be set in your configuration")

+        # Read back our properties from the conf in case we loaded some of them from
+        # the classpath or an external config file
         self.master = self.conf.get("spark.master")
         self.appName = self.conf.get("spark.appName")
         self.sparkHome = self.conf.getOrElse("spark.home", None)
+        for (k, v) in self.conf.getAll():
+            if k.startswith("spark.executorEnv."):
+                varName = k[len("spark.executorEnv."):]
+                self.environment[varName] = v

         # Create the Java SparkContext through Py4J
         self._jsc = self._jvm.JavaSparkContext(self.conf._jconf)
@@ -147,8 +155,7 @@ class SparkContext(object):
         if not SparkContext._gateway:
             SparkContext._gateway = launch_gateway()
             SparkContext._jvm = SparkContext._gateway.jvm
-            SparkContext._writeToFile = \
-                SparkContext._jvm.PythonRDD.writeToFile
+            SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

         if instance:
             if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
diff --git a/python/run-tests b/python/run-tests
index d4dad672d2..a0898b3c21 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -35,6 +35,7 @@ function run_test() {

 run_test "pyspark/rdd.py"
 run_test "pyspark/context.py"
+run_test "pyspark/conf.py"
 run_test "-m doctest pyspark/broadcast.py"
 run_test "-m doctest pyspark/accumulators.py"
 run_test "-m doctest pyspark/serializers.py"
--
cgit v1.2.3
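
The gateway fix above comes down to one rule: launch the Py4J gateway at most
once per process, and let SparkConf reuse whatever JVM handle its caller
already holds instead of reaching back into pyspark.context.SparkContext for
it. The snippet below is only a stand-alone sketch of that pattern, not code
from the patch; the Conf/Context names and the stub gateway are invented so
the example runs without a JVM.

    # Illustration of the "one gateway per process" idea -- not part of the patch.
    _gateway = None  # module-level singleton, like SparkContext._gateway


    def _ensure_gateway():
        """Create the (stub) gateway at most once per process."""
        global _gateway
        if _gateway is None:
            print("launching gateway")  # in PySpark this would spawn the JVM
            _gateway = object()         # stand-in for the real Py4J gateway
        return _gateway


    class Conf(object):
        def __init__(self, _jvm=None):
            # Reuse the caller's JVM handle when given; fall back to the shared
            # gateway only when constructed standalone.
            self._jvm = _jvm or _ensure_gateway()


    class Context(object):
        def __init__(self, conf=None):
            self._jvm = _ensure_gateway()
            self.conf = conf or Conf(_jvm=self._jvm)  # mirrors SparkConf(_jvm=self._jvm)


    if __name__ == "__main__":
        Context()  # prints "launching gateway" exactly once
        Context()  # the second context reuses the same stub gateway

Passing the handle explicitly is what keeps a second gateway from appearing
even when the SparkContext class object gets duplicated, as described in the
commit message above.
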
From eaa8a68ff08304f713f4f75d39c61c020e0e691d Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Sun, 29 Dec 2013 20:15:07 -0500
Subject: Fix some Python docs and make sure to unset SPARK_TESTING in Python
 tests so we don't get the test spark.conf on the classpath.

---
 python/epydoc.conf          |  2 +-
 python/pyspark/__init__.py  | 31 +++++++++++++++++--------------
 python/pyspark/broadcast.py | 11 +++++++++++
 python/pyspark/conf.py      | 10 +++++-----
 python/pyspark/context.py   |  3 ++-
 python/run-tests            |  2 +-
 6 files changed, 37 insertions(+), 22 deletions(-)

(limited to 'python/run-tests')

diff --git a/python/epydoc.conf b/python/epydoc.conf
index 0b42e729f8..95a6af0974 100644
--- a/python/epydoc.conf
+++ b/python/epydoc.conf
@@ -34,4 +34,4 @@ private: no

 exclude: pyspark.cloudpickle pyspark.worker pyspark.join pyspark.java_gateway
          pyspark.examples pyspark.shell pyspark.test
-         pyspark.rddsampler pyspark.daemon
+         pyspark.rddsampler pyspark.daemon pyspark.mllib._common
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index f1b95acf09..2b2c3a061a 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -20,21 +20,24 @@ PySpark is the Python API for Spark.

 Public classes:

-    - L{SparkContext} Main entry point for Spark functionality.
-    - L{RDD} A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
-    - L{Broadcast} A broadcast variable that gets reused across tasks.
-    - L{Accumulator} An "add-only" shared variable that tasks can only add values to.
-    - L{SparkConf} Access files shipped with jobs.
-    - L{StorageLevel} Finer-grained cache persistence levels.
+    - L{SparkContext}
+          Main entry point for Spark functionality.
+    - L{RDD}
+          A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+    - L{Broadcast}
+          A broadcast variable that gets reused across tasks.
+    - L{Accumulator}
+          An "add-only" shared variable that tasks can only add values to.
+    - L{SparkConf}
+          For configuring Spark.
+    - L{SparkFiles}
+          Access files shipped with jobs.
+    - L{StorageLevel}
+          Finer-grained cache persistence levels.
 """
+
+
+
 import sys
 import os
 sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index dfdaba274f..43f40f8783 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -45,7 +45,18 @@ def _from_id(bid):


 class Broadcast(object):
+    """
+    A broadcast variable created with
+    L{SparkContext.broadcast()}.
+    Access its value through C{.value}.
+    """
+
     def __init__(self, bid, value, java_broadcast=None, pickle_registry=None):
+        """
+        Should not be called directly by users -- use
+        L{SparkContext.broadcast()}
+        instead.
+        """
         self.value = value
         self.bid = bid
         self._jbroadcast = java_broadcast
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index a79f348b52..cf98b0e071 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -55,11 +55,11 @@ class SparkConf(object):
     parameters as key-value pairs.

     Most of the time, you would create a SparkConf object with
-    C{SparkConf()}, which will load values from `spark.*` Java system
-    properties and any `spark.conf` on your application's classpath.
-    In this case, system properties take priority over `spark.conf`,
-    and any parameters you set directly on the `SparkConf` object take
-    priority over both of those.
+    C{SparkConf()}, which will load values from C{spark.*} Java system
+    properties and any C{spark.conf} on your Spark classpath. In this
+    case, system properties take priority over C{spark.conf}, and any
+    parameters you set directly on the C{SparkConf} object take priority
+    over both of those.

     For unit tests, you can also call C{SparkConf(false)} to skip
     loading external settings and get the same configuration no matter
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 1244a1495f..8b028027eb 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -267,7 +267,8 @@ class SparkContext(object):

     def broadcast(self, value):
         """
-        Broadcast a read-only variable to the cluster, returning a C{Broadcast}
+        Broadcast a read-only variable to the cluster, returning a
+        L{Broadcast}
         object for reading it in distributed functions. The variable will be
         sent to each cluster only once.
         """
""" + + + import sys import os sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg")) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index dfdaba274f..43f40f8783 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -45,7 +45,18 @@ def _from_id(bid): class Broadcast(object): + """ + A broadcast variable created with + L{SparkContext.broadcast()}. + Access its value through C{.value}. + """ + def __init__(self, bid, value, java_broadcast=None, pickle_registry=None): + """ + Should not be called directly by users -- use + L{SparkContext.broadcast()} + instead. + """ self.value = value self.bid = bid self._jbroadcast = java_broadcast diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index a79f348b52..cf98b0e071 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -55,11 +55,11 @@ class SparkConf(object): parameters as key-value pairs. Most of the time, you would create a SparkConf object with - C{SparkConf()}, which will load values from `spark.*` Java system - properties and any `spark.conf` on your application's classpath. - In this case, system properties take priority over `spark.conf`, - and any parameters you set directly on the `SparkConf` object take - priority over both of those. + C{SparkConf()}, which will load values from C{spark.*} Java system + properties and any C{spark.conf} on your Spark classpath. In this + case, system properties take priority over C{spark.conf}, and any + parameters you set directly on the C{SparkConf} object take priority + over both of those. For unit tests, you can also call C{SparkConf(false)} to skip loading external settings and get the same configuration no matter diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 1244a1495f..8b028027eb 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -267,7 +267,8 @@ class SparkContext(object): def broadcast(self, value): """ - Broadcast a read-only variable to the cluster, returning a C{Broadcast} + Broadcast a read-only variable to the cluster, returning a + L{Broadcast} object for reading it in distributed functions. The variable will be sent to each cluster only once. """ diff --git a/python/run-tests b/python/run-tests index a0898b3c21..4b71fff7c1 100755 --- a/python/run-tests +++ b/python/run-tests @@ -29,7 +29,7 @@ FAILED=0 rm -f unit-tests.log function run_test() { - $FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log + SPARK_TESTING=0 $FWDIR/pyspark $1 2>&1 | tee -a unit-tests.log FAILED=$((PIPESTATUS[0]||$FAILED)) } -- cgit v1.2.3