#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
from optparse import OptionParser
import os
import re
import subprocess
import sys
import time
# Append `SPARK_HOME/dev` to the Python path so that we can import the sparktestsupport module
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "../dev/"))
from sparktestsupport import SPARK_HOME # noqa (suppress pep8 warnings)
from sparktestsupport.shellutils import which # noqa
from sparktestsupport.modules import all_modules # noqa
python_modules = dict((m.name, m) for m in all_modules if m.python_test_goals if m.name != 'root')
def print_red(text):
print('\033[31m' + text + '\033[0m')
LOG_FILE = os.path.join(SPARK_HOME, "python/unit-tests.log")
def run_individual_python_test(test_name, pyspark_python):
env = {'SPARK_TESTING': '1', 'PYSPARK_PYTHON': which(pyspark_python)}
print(" Running test: %s ..." % test_name, end='')
start_time = time.time()
with open(LOG_FILE, 'a') as log_file:
retcode = subprocess.call(
[os.path.join(SPARK_HOME, "bin/pyspark"), test_name],
stderr=log_file, stdout=log_file, env=env)
duration = time.time() - start_time
# Exit on the first failure.
if retcode != 0:
with open(LOG_FILE, 'r') as log_file:
for line in log_file:
if not re.match('[0-9]+', line):
print(line, end='')
print_red("\nHad test failures in %s; see logs." % test_name)
exit(-1)
else:
print("ok (%is)" % duration)
def get_default_python_executables():
python_execs = [x for x in ["python2.6", "python3.4", "pypy"] if which(x)]
if "python2.6" not in python_execs:
print("WARNING: Not testing against `python2.6` because it could not be found; falling"
" back to `python` instead")
python_execs.insert(0, "python")
return python_execs
def parse_opts():
parser = OptionParser(
prog="run-tests"
)
parser.add_option(
"--python-executables", type="string", default=','.join(get_default_python_executables()),
help="A comma-separated list of Python executables to test against (default: %default)"
)
parser.add_option(
"--modules", type="string",
default=",".join(sorted(python_modules.keys())),
help="A comma-separated list of Python modules to test (default: %default)"
)
(opts, args) = parser.parse_args()
if args:
parser.error("Unsupported arguments: %s" % ' '.join(args))
return opts
def main():
opts = parse_opts()
print("Running PySpark tests. Output is in python/%s" % LOG_FILE)
if os.path.exists(LOG_FILE):
os.remove(LOG_FILE)
python_execs = opts.python_executables.split(',')
modules_to_test = []
for module_name in opts.modules.split(','):
if module_name in python_modules:
modules_to_test.append(python_modules[module_name])
else:
print("Error: unrecognized module %s" % module_name)
sys.exit(-1)
print("Will test against the following Python executables: %s" % python_execs)
print("Will test the following Python modules: %s" % [x.name for x in modules_to_test])
start_time = time.time()
for python_exec in python_execs:
python_implementation = subprocess.check_output(
[python_exec, "-c", "import platform; print(platform.python_implementation())"],
universal_newlines=True).strip()
print("Testing with `%s`: " % python_exec, end='')
subprocess.call([python_exec, "--version"])
for module in modules_to_test:
if python_implementation not in module.blacklisted_python_implementations:
print("Running %s tests ..." % module.name)
for test_goal in module.python_test_goals:
run_individual_python_test(test_goal, python_exec)
total_duration = time.time() - start_time
print("Tests passed in %i seconds" % total_duration)
if __name__ == "__main__":
main()