Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/streaming/tests.py   16
-rw-r--r--  python/pyspark/tests.py              3
-rwxr-xr-x  python/run-tests                   164
-rwxr-xr-x  python/run-tests.py                132
4 files changed, 153 insertions, 162 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 57049beea4..91ce681fbe 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -15,6 +15,7 @@
# limitations under the License.
#

+import glob
import os
import sys
from itertools import chain
@@ -677,4 +678,19 @@ class KafkaStreamTests(PySparkStreamingTestCase):
        self._validateRddResult(sendData, rdd)

if __name__ == "__main__":
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    kafka_assembly_dir = os.path.join(SPARK_HOME, "external/kafka-assembly")
+    jars = glob.glob(
+        os.path.join(kafka_assembly_dir, "target/scala-*/spark-streaming-kafka-assembly-*.jar"))
+    if not jars:
+        raise Exception(
+            ("Failed to find Spark Streaming Kafka assembly JAR in %s. " % kafka_assembly_dir) +
+            "You need to build Spark with "
+            "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or "
+            "'build/mvn package' before running this test")
+    elif len(jars) > 1:
+        raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please "
+                         "remove all but one") % kafka_assembly_dir)
+    else:
+        os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars[0]
    unittest.main()
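
Note: the hunk above makes the streaming suite self-locating: it finds the
Kafka assembly JAR itself instead of relying on the shell runner to export
PYSPARK_SUBMIT_ARGS. A minimal standalone sketch of the same discovery logic
(the path pattern is taken from the hunk; the "." fallback for SPARK_HOME is
an illustrative assumption):

    import glob
    import os

    # Mirror of the discovery logic added above; "." fallback is for
    # illustration only.
    spark_home = os.environ.get("SPARK_HOME", ".")
    pattern = os.path.join(
        spark_home,
        "external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar")
    jars = glob.glob(pattern)
    # The glob spans every scala-* build directory, so a tree built against
    # two Scala versions leaves two matches behind; that is why the test
    # insists on exactly one JAR before wiring it into PYSPARK_SUBMIT_ARGS.
    print("matched %d JAR(s): %s" % (len(jars), jars))
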
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7826542368..17256dfc95 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1421,7 +1421,8 @@ class DaemonTests(unittest.TestCase):
        # start daemon
        daemon_path = os.path.join(os.path.dirname(__file__), "daemon.py")
-        daemon = Popen([sys.executable, daemon_path], stdin=PIPE, stdout=PIPE)
+        python_exec = sys.executable or os.environ.get("PYSPARK_PYTHON")
+        daemon = Popen([python_exec, daemon_path], stdin=PIPE, stdout=PIPE)
        # read the port number
        port = read_int(daemon.stdout)
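
Note: sys.executable can be an empty string when Python is started through an
embedded interpreter or an unusual launcher, so the hunk above falls back to
whatever PYSPARK_PYTHON names. A minimal sketch of the same precedence (the
trailing "python" default is an illustrative assumption, not part of the
patch):

    import os
    import sys

    # Prefer the running interpreter; fall back to PYSPARK_PYTHON, then to a
    # bare "python" (the last step is illustrative only).
    python_exec = sys.executable or os.environ.get("PYSPARK_PYTHON") or "python"
    print("daemon interpreter: %s" % python_exec)
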
diff --git a/python/run-tests b/python/run-tests
index 4468fdb3f2..24949657ed 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -18,165 +18,7 @@
#
-# Figure out where the Spark framework is installed
-FWDIR="$(cd "`dirname "$0"`"; cd ../; pwd)"
+FWDIR="$(cd "`dirname $0`"/..; pwd)"
+cd "$FWDIR"
-. "$FWDIR"/bin/load-spark-env.sh
-
-# CD into the python directory to find things on the right path
-cd "$FWDIR/python"
-
-FAILED=0
-LOG_FILE=unit-tests.log
-START=$(date +"%s")
-
-rm -f $LOG_FILE
-
-# Remove the metastore and warehouse directory created by the HiveContext tests in Spark SQL
-rm -rf metastore warehouse
-
-function run_test() {
- echo -en "Running test: $1 ... " | tee -a $LOG_FILE
- start=$(date +"%s")
- SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1
-
- FAILED=$((PIPESTATUS[0]||$FAILED))
-
- # Fail and exit on the first test failure.
- if [[ $FAILED != 0 ]]; then
- cat $LOG_FILE | grep -v "^[0-9][0-9]*" # filter all lines starting with a number.
- echo -en "\033[31m" # Red
- echo "Had test failures; see logs."
- echo -en "\033[0m" # No color
- exit -1
- else
- now=$(date +"%s")
- echo "ok ($(($now - $start))s)"
- fi
-}
-
-function run_core_tests() {
- echo "Run core tests ..."
- run_test "pyspark.rdd"
- run_test "pyspark.context"
- run_test "pyspark.conf"
- run_test "pyspark.broadcast"
- run_test "pyspark.accumulators"
- run_test "pyspark.serializers"
- run_test "pyspark.profiler"
- run_test "pyspark.shuffle"
- run_test "pyspark.tests"
-}
-
-function run_sql_tests() {
- echo "Run sql tests ..."
- run_test "pyspark.sql.types"
- run_test "pyspark.sql.context"
- run_test "pyspark.sql.column"
- run_test "pyspark.sql.dataframe"
- run_test "pyspark.sql.group"
- run_test "pyspark.sql.functions"
- run_test "pyspark.sql.readwriter"
- run_test "pyspark.sql.window"
- run_test "pyspark.sql.tests"
-}
-
-function run_mllib_tests() {
- echo "Run mllib tests ..."
- run_test "pyspark.mllib.classification"
- run_test "pyspark.mllib.clustering"
- run_test "pyspark.mllib.evaluation"
- run_test "pyspark.mllib.feature"
- run_test "pyspark.mllib.fpm"
- run_test "pyspark.mllib.linalg"
- run_test "pyspark.mllib.random"
- run_test "pyspark.mllib.recommendation"
- run_test "pyspark.mllib.regression"
- run_test "pyspark.mllib.stat._statistics"
- run_test "pyspark.mllib.stat.KernelDensity"
- run_test "pyspark.mllib.tree"
- run_test "pyspark.mllib.util"
- run_test "pyspark.mllib.tests"
-}
-
-function run_ml_tests() {
- echo "Run ml tests ..."
- run_test "pyspark.ml.feature"
- run_test "pyspark.ml.classification"
- run_test "pyspark.ml.recommendation"
- run_test "pyspark.ml.regression"
- run_test "pyspark.ml.tuning"
- run_test "pyspark.ml.tests"
- run_test "pyspark.ml.evaluation"
-}
-
-function run_streaming_tests() {
- echo "Run streaming tests ..."
-
- KAFKA_ASSEMBLY_DIR="$FWDIR"/external/kafka-assembly
- JAR_PATH="${KAFKA_ASSEMBLY_DIR}/target/scala-${SPARK_SCALA_VERSION}"
- for f in "${JAR_PATH}"/spark-streaming-kafka-assembly-*.jar; do
- if [[ ! -e "$f" ]]; then
- echo "Failed to find Spark Streaming Kafka assembly jar in $KAFKA_ASSEMBLY_DIR" 1>&2
- echo "You need to build Spark with " \
- "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or" \
- "'build/mvn package' before running this program" 1>&2
- exit 1
- fi
- KAFKA_ASSEMBLY_JAR="$f"
- done
-
- export PYSPARK_SUBMIT_ARGS="--jars ${KAFKA_ASSEMBLY_JAR} pyspark-shell"
- run_test "pyspark.streaming.util"
- run_test "pyspark.streaming.tests"
-}
-
-echo "Running PySpark tests. Output is in python/$LOG_FILE."
-
-export PYSPARK_PYTHON="python"
-
-# Try to test with Python 2.6, since that's the minimum version that we support:
-if [ $(which python2.6) ]; then
- export PYSPARK_PYTHON="python2.6"
-fi
-
-echo "Testing with Python version:"
-$PYSPARK_PYTHON --version
-
-run_core_tests
-run_sql_tests
-run_mllib_tests
-run_ml_tests
-run_streaming_tests
-
-# Try to test with Python 3
-if [ $(which python3.4) ]; then
- export PYSPARK_PYTHON="python3.4"
- echo "Testing with Python3.4 version:"
- $PYSPARK_PYTHON --version
-
- run_core_tests
- run_sql_tests
- run_mllib_tests
- run_ml_tests
- run_streaming_tests
-fi
-
-# Try to test with PyPy
-if [ $(which pypy) ]; then
- export PYSPARK_PYTHON="pypy"
- echo "Testing with PyPy version:"
- $PYSPARK_PYTHON --version
-
- run_core_tests
- run_sql_tests
- run_streaming_tests
-fi
-
-if [[ $FAILED == 0 ]]; then
- now=$(date +"%s")
- echo -e "\033[32mTests passed \033[0min $(($now - $START)) seconds"
-fi
-
-# TODO: in the long-run, it would be nice to use a test runner like `nose`.
-# The doctest fixtures are the current barrier to doing this.
+exec python -u ./python/run-tests.py "$@"
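
Note: the 160-odd lines of bash above collapse into a two-line wrapper. exec
hands the shell process over to the Python runner so its exit status reaches
callers unchanged, -u keeps Python's stdout unbuffered so per-test progress
appears in CI logs as it happens, and "$@" forwards all command-line flags to
run-tests.py below.
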
diff --git a/python/run-tests.py b/python/run-tests.py
new file mode 100755
index 0000000000..7d485b500e
--- /dev/null
+++ b/python/run-tests.py
@@ -0,0 +1,132 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import print_function
+from optparse import OptionParser
+import os
+import re
+import subprocess
+import sys
+import time
+
+
+# Append `SPARK_HOME/dev` to the Python path so that we can import the sparktestsupport module
+sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../dev/"))
+
+
+from sparktestsupport import SPARK_HOME  # noqa (suppress pep8 warnings)
+from sparktestsupport.shellutils import which  # noqa
+from sparktestsupport.modules import all_modules  # noqa
+
+
+python_modules = dict((m.name, m) for m in all_modules if m.python_test_goals if m.name != 'root')
+
+
+def print_red(text):
+    print('\033[31m' + text + '\033[0m')
+
+
+LOG_FILE = os.path.join(SPARK_HOME, "python/unit-tests.log")
+
+
+def run_individual_python_test(test_name, pyspark_python):
+    env = dict(os.environ, SPARK_TESTING='1', PYSPARK_PYTHON=which(pyspark_python))
+    print("    Running test: %s ..." % test_name, end='')
+    start_time = time.time()
+    with open(LOG_FILE, 'a') as log_file:
+        retcode = subprocess.call(
+            [os.path.join(SPARK_HOME, "bin/pyspark"), test_name],
+            stderr=log_file, stdout=log_file, env=env)
+    duration = time.time() - start_time
+    # Exit on the first failure.
+    if retcode != 0:
+        with open(LOG_FILE, 'r') as log_file:
+            for line in log_file:
+                if not re.match('[0-9]+', line):
+                    print(line, end='')
+        print_red("\nHad test failures in %s; see logs." % test_name)
+        sys.exit(-1)
+    else:
+        print("ok (%is)" % duration)
+
+
+def get_default_python_executables():
+    python_execs = [x for x in ["python2.6", "python3.4", "pypy"] if which(x)]
+    if "python2.6" not in python_execs:
+        print("WARNING: Not testing against `python2.6` because it could not be found; falling"
+              " back to `python` instead")
+        python_execs.insert(0, "python")
+    return python_execs
+
+
+def parse_opts():
+    parser = OptionParser(
+        prog="run-tests"
+    )
+    parser.add_option(
+        "--python-executables", type="string", default=','.join(get_default_python_executables()),
+        help="A comma-separated list of Python executables to test against (default: %default)"
+    )
+    parser.add_option(
+        "--modules", type="string",
+        default=",".join(sorted(python_modules.keys())),
+        help="A comma-separated list of Python modules to test (default: %default)"
+    )
+
+    (opts, args) = parser.parse_args()
+    if args:
+        parser.error("Unsupported arguments: %s" % ' '.join(args))
+    return opts
+
+
+def main():
+    opts = parse_opts()
+    print("Running PySpark tests. Output is in %s" % LOG_FILE)
+    if os.path.exists(LOG_FILE):
+        os.remove(LOG_FILE)
+    python_execs = opts.python_executables.split(',')
+    modules_to_test = []
+    for module_name in opts.modules.split(','):
+        if module_name in python_modules:
+            modules_to_test.append(python_modules[module_name])
+        else:
+            print("Error: unrecognized module %s" % module_name)
+            sys.exit(-1)
+    print("Will test against the following Python executables: %s" % python_execs)
+    print("Will test the following Python modules: %s" % [x.name for x in modules_to_test])
+
+    start_time = time.time()
+    for python_exec in python_execs:
+        python_implementation = subprocess.check_output(
+            [python_exec, "-c", "import platform; print(platform.python_implementation())"],
+            universal_newlines=True).strip()
+        print("Testing with `%s`: " % python_exec, end='')
+        subprocess.call([python_exec, "--version"])
+
+        for module in modules_to_test:
+            if python_implementation not in module.blacklisted_python_implementations:
+                print("Running %s tests ..." % module.name)
+                for test_goal in module.python_test_goals:
+                    run_individual_python_test(test_goal, python_exec)
+    total_duration = time.time() - start_time
+    print("Tests passed in %i seconds" % total_duration)
+
+
+if __name__ == "__main__":
+    main()
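
Note: with the new runner, the interpreters and modules under test are
selectable from the command line, e.g.
./python/run-tests --modules=pyspark-sql --python-executables=python2.6,pypy
(the module name pyspark-sql is an assumption here; the accepted names come
from sparktestsupport.modules, and an unrecognized name aborts the run before
any test starts).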