#!/usr/bin/env python2

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import itertools
import os
import re
import sys
import shutil
import subprocess
from collections import namedtuple

SPARK_HOME = os.path.join(os.path.dirname(os.path.realpath(__file__)), "..")
USER_HOME = os.environ.get("HOME")


# -------------------------------------------------------------------------------------------------
# Test module definitions and functions for traversing module dependency graph
# -------------------------------------------------------------------------------------------------


all_modules = []


class Module(object):
    """
    A module is the basic abstraction in our test runner script. Each module consists of a set
    of source files, a set of test commands, and a set of dependencies on other modules. We use
    modules to define a dependency graph that lets us determine which tests to run based on which
    files have changed.
    """

    def __init__(self, name, dependencies, source_file_regexes, build_profile_flags=(),
                 sbt_test_goals=(), should_run_python_tests=False, should_run_r_tests=False):
        """
        Define a new module.

        :param name: A short module name, for display in logging and error messages.
        :param dependencies: A set of dependencies for this module. This should only include direct
            dependencies; transitive dependencies are resolved automatically.
        :param source_file_regexes: a set of regexes that match source files belonging to this
            module. These regexes are applied by attempting to match at the beginning of the
            filename strings.
        :param build_profile_flags: A set of profile flags that should be passed to Maven or SBT in
            order to build and test this module (e.g. '-PprofileName').
        :param sbt_test_goals: A set of SBT test goals for testing this module.
        :param should_run_python_tests: If true, changes in this module will trigger Python tests.
            For now, this has the effect of causing _all_ Python tests to be run, although in the
            future this should be changed to run only a subset of the Python tests that depend on
            this module.
        :param should_run_r_tests: If true, changes in this module will trigger all R tests.
        """
        self.name = name
        self.dependencies = dependencies
        self.source_file_prefixes = source_file_regexes
        self.sbt_test_goals = sbt_test_goals
        self.build_profile_flags = build_profile_flags
        self.should_run_python_tests = should_run_python_tests
        self.should_run_r_tests = should_run_r_tests

        self.dependent_modules = set()
        for dep in dependencies:
            dep.dependent_modules.add(self)
        all_modules.append(self)

    def contains_file(self, filename):
        return any(re.match(p, filename) for p in self.source_file_prefixes)


sql = Module(
    name="sql",
    dependencies=[],
    source_file_regexes=[
        "sql/(?!hive-thriftserver)",
        "bin/spark-sql",
    ],
    build_profile_flags=[
        "-Phive",
    ],
    sbt_test_goals=[
        "catalyst/test",
        "sql/test",
        "hive/test",
    ]
)


hive_thriftserver = Module(
    name="hive-thriftserver",
    dependencies=[sql],
    source_file_regexes=[
        "sql/hive-thriftserver",
        "sbin/start-thriftserver.sh",
    ],
    build_profile_flags=[
        "-Phive-thriftserver",
    ],
    sbt_test_goals=[
        "hive-thriftserver/test",
    ]
)


graphx = Module(
    name="graphx",
    dependencies=[],
    source_file_regexes=[
        "graphx/",
    ],
    sbt_test_goals=[
        "graphx/test"
    ]
)


streaming = Module(
    name="streaming",
    dependencies=[],
    source_file_regexes=[
        "streaming",
    ],
    sbt_test_goals=[
        "streaming/test",
    ]
)


streaming_kinesis_asl = Module(
    name="kinesis-asl",
    dependencies=[streaming],
    source_file_regexes=[
        "extras/kinesis-asl/",
    ],
    build_profile_flags=[
        "-Pkinesis-asl",
    ],
    sbt_test_goals=[
        "kinesis-asl/test",
    ]
)


streaming_zeromq = Module(
    name="streaming-zeromq",
    dependencies=[streaming],
    source_file_regexes=[
        "external/zeromq",
    ],
    sbt_test_goals=[
        "streaming-zeromq/test",
    ]
)


streaming_twitter = Module(
    name="streaming-twitter",
    dependencies=[streaming],
    source_file_regexes=[
        "external/twitter",
    ],
    sbt_test_goals=[
        "streaming-twitter/test",
    ]
)


streaming_mqtt = Module(
    name="streaming-mqtt",
    dependencies=[streaming],
    source_file_regexes=[
        "external/mqtt",
    ],
    sbt_test_goals=[
        "streaming-mqtt/test",
    ]
)


streaming_kafka = Module(
    name="streaming-kafka",
    dependencies=[streaming],
    source_file_regexes=[
        "external/kafka",
        "external/kafka-assembly",
    ],
    sbt_test_goals=[
        "streaming-kafka/test",
    ]
)


streaming_flume_sink = Module(
    name="streaming-flume-sink",
    dependencies=[streaming],
    source_file_regexes=[
        "external/flume-sink",
    ],
    sbt_test_goals=[
        "streaming-flume-sink/test",
    ]
)


streaming_flume = Module(
    name="streaming_flume",
    dependencies=[streaming],
    source_file_regexes=[
        "external/flume",
    ],
    sbt_test_goals=[
        "streaming-flume/test",
    ]
)


mllib = Module(
    name="mllib",
    dependencies=[streaming, sql],
    source_file_regexes=[
        "data/mllib/",
        "mllib/",
    ],
    sbt_test_goals=[
        "mllib/test",
    ]
)


examples = Module(
    name="examples",
    dependencies=[graphx, mllib, streaming, sql],
    source_file_regexes=[
        "examples/",
    ],
    sbt_test_goals=[
        "examples/test",
    ]
)


pyspark = Module(
    name="pyspark",
    dependencies=[mllib, streaming, streaming_kafka, sql],
    source_file_regexes=[
        "python/"
    ],
    should_run_python_tests=True
)


sparkr = Module(
    name="sparkr",
    dependencies=[sql, mllib],
    source_file_regexes=[
        "R/",
    ],
    should_run_r_tests=True
)


docs = Module(
    name="docs",
    dependencies=[],
    source_file_regexes=[
        "docs/",
    ]
)


ec2 = Module(
    name="ec2",
    dependencies=[],
    source_file_regexes=[
        "ec2/",
    ]
)


# The root module is a dummy module which is used to run all of the tests.
# No other modules should directly depend on this module.
root = Module(
    name="root",
    dependencies=[],
    source_file_regexes=[],
    # In order to run all of the tests, enable every test profile:
    build_profile_flags=list(
        set(itertools.chain.from_iterable(m.build_profile_flags for m in all_modules))),
    sbt_test_goals=[
        "test",
    ],
    should_run_python_tests=True,
    should_run_r_tests=True
)
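
# For reference, a hypothetical additional (non-root) module would be declared with the same
# pattern as the definitions above; the "streaming-akka" name, source path, and test goal here
# are illustrative only and do not refer to a real Spark module:
#
# streaming_akka = Module(
#     name="streaming-akka",
#     dependencies=[streaming],
#     source_file_regexes=[
#         "external/akka",
#     ],
#     sbt_test_goals=[
#         "streaming-akka/test",
#     ]
# )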

def determine_modules_for_files(filenames):
    """
    Given a list of filenames, return the set of modules that contain those files.
    If a file is not associated with a more specific submodule, then this method will consider
    that file to belong to the 'root' module.

    >>> sorted(x.name for x in determine_modules_for_files(["python/pyspark/a.py", "sql/test/foo"]))
    ['pyspark', 'sql']
    >>> [x.name for x in determine_modules_for_files(["file_not_matched_by_any_subproject"])]
    ['root']
    """
    changed_modules = set()
    for filename in filenames:
        matched_at_least_one_module = False
        for module in all_modules:
            if module.contains_file(filename):
                changed_modules.add(module)
                matched_at_least_one_module = True
        if not matched_at_least_one_module:
            changed_modules.add(root)
    return changed_modules


def identify_changed_files_from_git_commits(patch_sha, target_branch=None, target_ref=None):
    """
    Given a git commit and target ref, use the set of files changed in the diff in order to
    determine which modules' tests should be run.

    >>> [x.name for x in determine_modules_for_files( \
            identify_changed_files_from_git_commits("fc0a1475ef", target_ref="5da21f07"))]
    ['graphx']
    >>> 'root' in [x.name for x in determine_modules_for_files( \
         identify_changed_files_from_git_commits("50a0496a43", target_ref="6765ef9"))]
    True
    """
    if target_branch is None and target_ref is None:
        raise AttributeError("must specify either target_branch or target_ref")
    elif target_branch is not None and target_ref is not None:
        raise AttributeError("must specify either target_branch or target_ref, not both")
    if target_branch is not None:
        diff_target = target_branch
        run_cmd(['git', 'fetch', 'origin', str(target_branch+':'+target_branch)])
    else:
        diff_target = target_ref
    raw_output = subprocess.check_output(['git', 'diff', '--name-only', patch_sha, diff_target])
    # Remove any empty strings
    return [f for f in raw_output.split('\n') if f]


def determine_modules_to_test(changed_modules):
    """
    Given a set of modules that have changed, compute the transitive closure of those modules'
    dependent modules in order to determine the set of modules that should be tested.

    >>> sorted(x.name for x in determine_modules_to_test([root]))
    ['root']
    >>> sorted(x.name for x in determine_modules_to_test([graphx]))
    ['examples', 'graphx']
    >>> sorted(x.name for x in determine_modules_to_test([sql]))
    ['examples', 'hive-thriftserver', 'mllib', 'pyspark', 'sparkr', 'sql']
    """
    # If we're going to have to run all of the tests, then we can just short-circuit
    # and return 'root'. No module depends on root, so if it appears then it will be
    # in changed_modules.
    if root in changed_modules:
        return [root]
    modules_to_test = set()
    for module in changed_modules:
        modules_to_test = modules_to_test.union(determine_modules_to_test(module.dependent_modules))
    return modules_to_test.union(set(changed_modules))
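
# A sketch of how the three helpers above chain together (the "master" branch name is only an
# example); main() below follows the same flow when run against a pull request:
#
#   changed_files = identify_changed_files_from_git_commits("HEAD", target_branch="master")
#   changed_modules = determine_modules_for_files(changed_files)
#   test_modules = determine_modules_to_test(changed_modules)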

# -------------------------------------------------------------------------------------------------
# Functions for working with subprocesses and shell tools
# -------------------------------------------------------------------------------------------------


def get_error_codes(err_code_file):
    """Function to retrieve all block numbers from the `run-tests-codes.sh` file
    to maintain backwards compatibility with the `run-tests-jenkins` script"""

    with open(err_code_file, 'r') as f:
        err_codes = [e.split()[1].strip().split('=')
                     for e in f if e.startswith("readonly")]
        return dict(err_codes)


ERROR_CODES = get_error_codes(os.path.join(SPARK_HOME, "dev/run-tests-codes.sh"))


def exit_from_command_with_retcode(cmd, retcode):
    print "[error] running", ' '.join(cmd), "; received return code", retcode
    sys.exit(int(os.environ.get("CURRENT_BLOCK", 255)))


def rm_r(path):
    """Given an arbitrary path, properly remove it with the correct Python construct if it
    exists - from: http://stackoverflow.com/a/9559881"""

    if os.path.isdir(path):
        shutil.rmtree(path)
    elif os.path.exists(path):
        os.remove(path)


def run_cmd(cmd):
    """Given a command as a string or a list of arguments, attempt to execute it and,
    on failure, print an error message and exit with the current block's error code"""

    if not isinstance(cmd, list):
        cmd = cmd.split()
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError as e:
        exit_from_command_with_retcode(e.cmd, e.returncode)


def is_exe(path):
    """Check if a given path is an executable file
    - from: http://stackoverflow.com/a/377028"""

    return os.path.isfile(path) and os.access(path, os.X_OK)


def which(program):
    """Find and return the given program by its absolute path or 'None'
    - from: http://stackoverflow.com/a/377028"""

    fpath = os.path.split(program)[0]

    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ.get("PATH").split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    return None


def determine_java_executable():
    """Will return the path of the java executable that will be used by Spark's
    tests or `None`"""

    # Any changes in the way that Spark's build detects java must be reflected
    # here. Currently the build looks for $JAVA_HOME/bin/java then falls back to
    # the `java` executable on the path

    java_home = os.environ.get("JAVA_HOME")

    # check if there is an executable at $JAVA_HOME/bin/java
    java_exe = which(os.path.join(java_home, "bin", "java")) if java_home else None
    # if the java_exe wasn't set, check for a `java` version on the $PATH
    return java_exe if java_exe else which("java")


JavaVersion = namedtuple('JavaVersion', ['major', 'minor', 'patch', 'update'])


def determine_java_version(java_exe):
    """Given a valid java executable will return its version in named tuple format
    with accessors '.major', '.minor', '.patch', '.update'"""

    raw_output = subprocess.check_output([java_exe, "-version"],
                                         stderr=subprocess.STDOUT)
    raw_output_lines = raw_output.split('\n')

    # find raw version string, eg 'java version "1.8.0_25"'
    raw_version_str = next(x for x in raw_output_lines if " version " in x)

    version_str = raw_version_str.split()[-1].strip('"')  # eg '1.8.0_25'
    version, update = version_str.split('_')  # eg ['1.8.0', '25']

    # map over the values and convert them to integers
    version_info = [int(x) for x in version.split('.') + [update]]

    return JavaVersion(major=version_info[0],
                       minor=version_info[1],
                       patch=version_info[2],
                       update=version_info[3])
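
# For example, a `java -version` line such as
#   java version "1.8.0_25"
# (version shown for illustration) is parsed by determine_java_version() into
# JavaVersion(major=1, minor=8, patch=0, update=25).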

# -------------------------------------------------------------------------------------------------
# Functions for running the other build and test scripts
# -------------------------------------------------------------------------------------------------


def set_title_and_block(title, err_block):
    os.environ["CURRENT_BLOCK"] = ERROR_CODES[err_block]
    line_str = '=' * 72

    print
    print line_str
    print title
    print line_str


def run_apache_rat_checks():
    set_title_and_block("Running Apache RAT checks", "BLOCK_RAT")
    run_cmd([os.path.join(SPARK_HOME, "dev", "check-license")])


def run_scala_style_checks():
    set_title_and_block("Running Scala style checks", "BLOCK_SCALA_STYLE")
    run_cmd([os.path.join(SPARK_HOME, "dev", "lint-scala")])


def run_python_style_checks():
    set_title_and_block("Running Python style checks", "BLOCK_PYTHON_STYLE")
    run_cmd([os.path.join(SPARK_HOME, "dev", "lint-python")])


def build_spark_documentation():
    set_title_and_block("Building Spark Documentation", "BLOCK_DOCUMENTATION")
    os.environ["PRODUCTION"] = "1"

    os.chdir(os.path.join(SPARK_HOME, "docs"))

    jekyll_bin = which("jekyll")

    if not jekyll_bin:
        print "[error] Cannot find a version of `jekyll` on the system; please",
        print "install one and retry building the documentation."
        sys.exit(int(os.environ.get("CURRENT_BLOCK", 255)))
    else:
        run_cmd([jekyll_bin, "build"])

    os.chdir(SPARK_HOME)


def exec_maven(mvn_args=()):
    """Will call Maven in the current directory with the list of mvn_args passed
    in and exit on failure with the current block's error code"""

    run_cmd([os.path.join(SPARK_HOME, "build", "mvn")] + mvn_args)


def exec_sbt(sbt_args=()):
    """Will call SBT in the current directory with the list of sbt_args passed
    in, filtering noisy dependency-resolution output, and exit on failure with
    the current block's error code"""

    sbt_cmd = [os.path.join(SPARK_HOME, "build", "sbt")] + sbt_args

    sbt_output_filter = re.compile(r"^.*\[info\].*Resolving" + "|" +
                                   r"^.*\[warn\].*Merging" + "|" +
                                   r"^.*\[info\].*Including")

    # NOTE: echo "q" is needed because sbt, on encountering a build file with failure
    # (either resolution or compilation), prompts the user for input (q, r, etc.) to
    # quit or retry. This echo is there so the build does not block on that prompt.
    echo_proc = subprocess.Popen(["echo", "\"q\n\""], stdout=subprocess.PIPE)
    sbt_proc = subprocess.Popen(sbt_cmd,
                                stdin=echo_proc.stdout,
                                stdout=subprocess.PIPE)
    echo_proc.wait()
    for line in iter(sbt_proc.stdout.readline, ''):
        if not sbt_output_filter.match(line):
            print line,
    retcode = sbt_proc.wait()

    if retcode > 0:
        exit_from_command_with_retcode(sbt_cmd, retcode)


def get_hadoop_profiles(hadoop_version):
    """
    For the given Hadoop version tag, return a list of SBT profile flags for building
    and testing against that Hadoop version.
    """

    sbt_maven_hadoop_profiles = {
        "hadoop1.0": ["-Phadoop-1", "-Dhadoop.version=1.0.4"],
        "hadoop2.0": ["-Phadoop-1", "-Dhadoop.version=2.0.0-mr1-cdh4.1.1"],
        "hadoop2.2": ["-Pyarn", "-Phadoop-2.2"],
        "hadoop2.3": ["-Pyarn", "-Phadoop-2.3", "-Dhadoop.version=2.3.0"],
    }

    if hadoop_version in sbt_maven_hadoop_profiles:
        return sbt_maven_hadoop_profiles[hadoop_version]
    else:
        print "[error] Could not find", hadoop_version, "in the list. Valid options",
        print "are", sbt_maven_hadoop_profiles.keys()
        sys.exit(int(os.environ.get("CURRENT_BLOCK", 255)))
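
# For example, get_hadoop_profiles("hadoop2.3") returns
# ["-Pyarn", "-Phadoop-2.3", "-Dhadoop.version=2.3.0"], which corresponds to the default
# Hadoop profile used by main() below.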

def build_spark_maven(hadoop_version):
    # Enable all of the profiles for the build:
    build_profiles = get_hadoop_profiles(hadoop_version) + root.build_profile_flags
    mvn_goals = ["clean", "package", "-DskipTests"]
    profiles_and_goals = build_profiles + mvn_goals

    print "[info] Building Spark (w/Hive 0.13.1) using Maven with these arguments:",
    print " ".join(profiles_and_goals)

    exec_maven(profiles_and_goals)


def build_spark_sbt(hadoop_version):
    # Enable all of the profiles for the build:
    build_profiles = get_hadoop_profiles(hadoop_version) + root.build_profile_flags
    sbt_goals = ["package",
                 "assembly/assembly",
                 "streaming-kafka-assembly/assembly"]
    profiles_and_goals = build_profiles + sbt_goals

    print "[info] Building Spark (w/Hive 0.13.1) using SBT with these arguments:",
    print " ".join(profiles_and_goals)

    exec_sbt(profiles_and_goals)


def build_apache_spark(build_tool, hadoop_version):
    """Will build Spark against Hive v0.13.1 given the passed in build tool (either `sbt` or
    `maven`). Defaults to using `sbt`."""

    set_title_and_block("Building Spark", "BLOCK_BUILD")

    rm_r("lib_managed")

    if build_tool == "maven":
        build_spark_maven(hadoop_version)
    else:
        build_spark_sbt(hadoop_version)


def detect_binary_inop_with_mima():
    set_title_and_block("Detecting binary incompatibilities with MiMa", "BLOCK_MIMA")
    run_cmd([os.path.join(SPARK_HOME, "dev", "mima")])


def run_scala_tests_maven(test_profiles):
    mvn_test_goals = ["test", "--fail-at-end"]
    profiles_and_goals = test_profiles + mvn_test_goals

    print "[info] Running Spark tests using Maven with these arguments:",
    print " ".join(profiles_and_goals)

    exec_maven(profiles_and_goals)


def run_scala_tests_sbt(test_modules, test_profiles):
    sbt_test_goals = set(itertools.chain.from_iterable(m.sbt_test_goals for m in test_modules))

    if not sbt_test_goals:
        return

    profiles_and_goals = test_profiles + list(sbt_test_goals)

    print "[info] Running Spark tests using SBT with these arguments:",
    print " ".join(profiles_and_goals)

    exec_sbt(profiles_and_goals)


def run_scala_tests(build_tool, hadoop_version, test_modules):
    """Function to properly execute all tests passed in as a set from the
    `determine_modules_to_test` function"""

    set_title_and_block("Running Spark unit tests", "BLOCK_SPARK_UNIT_TESTS")

    test_modules = set(test_modules)

    test_profiles = get_hadoop_profiles(hadoop_version) + \
        list(set(itertools.chain.from_iterable(m.build_profile_flags for m in test_modules)))
    if build_tool == "maven":
        run_scala_tests_maven(test_profiles)
    else:
        run_scala_tests_sbt(test_modules, test_profiles)
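
# Illustrative example (module set assumed): if test_modules were {graphx, examples}, the SBT
# goals selected above would be "graphx/test" and "examples/test", combined with the profile
# flags for the chosen Hadoop version; neither of those modules adds extra build profile flags.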

def run_python_tests():
    set_title_and_block("Running PySpark tests", "BLOCK_PYSPARK_UNIT_TESTS")

    run_cmd([os.path.join(SPARK_HOME, "python", "run-tests")])


def run_sparkr_tests():
    set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS")

    if which("R"):
        run_cmd([os.path.join(SPARK_HOME, "R", "install-dev.sh")])
        run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")])
    else:
        print "Ignoring SparkR tests as R was not found in PATH"


def main():
    # Ensure the user home directory (HOME) is valid and is an absolute directory
    if not USER_HOME or not os.path.isabs(USER_HOME):
        print "[error] Cannot determine your home directory as an absolute path;",
        print "ensure the $HOME environment variable is set properly."
        sys.exit(1)

    os.chdir(SPARK_HOME)

    rm_r(os.path.join(SPARK_HOME, "work"))
    rm_r(os.path.join(USER_HOME, ".ivy2", "local", "org.apache.spark"))
    rm_r(os.path.join(USER_HOME, ".ivy2", "cache", "org.apache.spark"))

    os.environ["CURRENT_BLOCK"] = ERROR_CODES["BLOCK_GENERAL"]

    java_exe = determine_java_executable()

    if not java_exe:
        print "[error] Cannot find a version of `java` on the system; please",
        print "install one and retry."
        sys.exit(2)

    java_version = determine_java_version(java_exe)

    if java_version.minor < 8:
        print "[warn] Java 8 tests will not run because JDK version is < 1.8."

    if os.environ.get("AMPLAB_JENKINS"):
        # if we're on the Amplab Jenkins build servers, set up variables
        # to reflect the environment settings
        build_tool = os.environ.get("AMPLAB_JENKINS_BUILD_TOOL", "sbt")
        hadoop_version = os.environ.get("AMPLAB_JENKINS_BUILD_PROFILE", "hadoop2.3")
        test_env = "amplab_jenkins"
        # add path for Python 3 in Jenkins if we're calling from a Jenkins machine
        os.environ["PATH"] = "/home/anaconda/envs/py3k/bin:" + os.environ.get("PATH")
    else:
        # else we're running locally and can use local settings
        build_tool = "sbt"
        hadoop_version = "hadoop2.3"
        test_env = "local"

    print "[info] Using build tool", build_tool, "with Hadoop profile", hadoop_version,
    print "under environment", test_env

    changed_modules = None
    changed_files = None
    if test_env == "amplab_jenkins" and os.environ.get("AMP_JENKINS_PRB"):
        target_branch = os.environ["ghprbTargetBranch"]
        changed_files = identify_changed_files_from_git_commits("HEAD", target_branch=target_branch)
        changed_modules = determine_modules_for_files(changed_files)
    if not changed_modules:
        changed_modules = [root]
    print "[info] Found the following changed modules:", ", ".join(x.name for x in changed_modules)

    test_modules = determine_modules_to_test(changed_modules)

    # license checks
    run_apache_rat_checks()

    # style checks
    if not changed_files or any(f.endswith(".scala") for f in changed_files):
        run_scala_style_checks()
    if not changed_files or any(f.endswith(".py") for f in changed_files):
        run_python_style_checks()

    # determine if docs were changed and if we're inside the amplab environment
    # note - the below commented out until *all* Jenkins workers can get `jekyll` installed
    # if "DOCS" in changed_modules and test_env == "amplab_jenkins":
    #    build_spark_documentation()

    # spark build
    build_apache_spark(build_tool, hadoop_version)

    # backwards compatibility checks
    detect_binary_inop_with_mima()

    # run the test suites
    run_scala_tests(build_tool, hadoop_version, test_modules)

    if any(m.should_run_python_tests for m in test_modules):
        run_python_tests()
    if any(m.should_run_r_tests for m in test_modules):
        run_sparkr_tests()


def _test():
    import doctest
    failure_count = doctest.testmod()[0]
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()
    main()