# -*- coding: utf-8 -*-
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""This module is a collection of methods commonly used in this project."""
import collections
import functools
import json
import logging
import os
import os.path
import re
import shlex
import subprocess
import sys

# Name of the environment variable which carries the JSON encoded parameters
# from `wrapper_environment` to `compiler_wrapper`.
ENVIRONMENT_KEY = "INTERCEPT_BUILD"

# Record passed by `compiler_wrapper` to the wrapped function: the process
# id, the working directory and the command of the compiler call.
Execution = collections.namedtuple("Execution", ["pid", "cwd", "cmd"])

CtuConfig = collections.namedtuple(
    "CtuConfig", ["collect", "analyze", "dir", "extdef_map_cmd"]
)


def duplicate_check(method):
    """Predicate to detect duplicated entries.

    Unique hash method can be used to detect duplicates. Entries are
    represented as dictionaries, which have no default hash method.
    This implementation uses a set datatype to store the unique hash values.

    :param method: callable which maps an entry to a hashable value
    :return: a predicate; it returns False the first time it sees a given
        hash value and True for every later entry with the same hash value
    """

    def predicate(entry):
        entry_hash = predicate.unique(entry)
        if entry_hash not in predicate.state:
            predicate.state.add(entry_hash)
            return False
        return True

    # The hash method and the seen values are stored as attributes of the
    # returned function, so every predicate has its own independent state.
    predicate.unique = method
    predicate.state = set()
    return predicate


def run_build(command, *args, **kwargs):
    """Run and report build command execution.

    Extra positional and keyword arguments are forwarded unchanged to
    `subprocess.call`.

    :param command: array of tokens
    :return: exit code of the process
    """
    # Only used for the debug message; the process itself receives `kwargs`.
    environment = kwargs.get("env", os.environ)
    logging.debug("run build %s, in environment: %s", command, environment)
    exit_code = subprocess.call(command, *args, **kwargs)
    logging.debug("build finished with exit code: %d", exit_code)
    return exit_code


def run_command(command, cwd=None):
    """Run a given command and report the execution.

    The standard error of the command is merged into its standard output.

    :param command: array of tokens
    :param cwd: the working directory where the command will be executed
        (the current working directory is used when it is not given)
    :return: output of the command, as a list of lines
    :raise subprocess.CalledProcessError: when the command exits with a
        non-zero code; the `output` attribute of the exception is replaced
        by the decoded list of output lines
    """

    def decode_when_needed(result):
        """check_output returns bytes or string depending on python version"""
        return result.decode("utf-8") if isinstance(result, bytes) else result

    try:
        directory = os.path.abspath(cwd) if cwd else os.getcwd()
        logging.debug("exec command %s in %s", command, directory)
        output = subprocess.check_output(
            command, cwd=directory, stderr=subprocess.STDOUT
        )
        return decode_when_needed(output).splitlines()
    except subprocess.CalledProcessError as ex:
        ex.output = decode_when_needed(ex.output).splitlines()
        # Re-raise the same exception object with its traceback untouched.
        raise


def reconfigure_logging(verbose_level):
    """Reconfigure logging level and format based on the verbose flag.

    :param verbose_level: number of `-v` flags received by the command
    :return: no return value
    """
    # Exit when nothing to do.
    if verbose_level == 0:
        return

    root = logging.getLogger()
    # Tune logging level: every `-v` lowers the threshold by one level
    # (10 units), but never below zero.
    level = logging.WARNING - min(logging.WARNING, (10 * verbose_level))
    root.setLevel(level)
    # Be verbose with messages.
    if verbose_level <= 3:
        fmt_string = "%(name)s: %(levelname)s: %(message)s"
    else:
        fmt_string = "%(name)s: %(levelname)s: %(funcName)s: %(message)s"
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(fmt=fmt_string))
    # Replace (do not extend) the handlers already installed on the root.
    root.handlers = [handler]


def command_entry_point(function):
    """Decorator for command entry methods.

    The decorator initializes/shuts down logging and guards against
    programming errors (catches exceptions).

    The decorated method can have arbitrary parameters, the return value
    will be the exit code of the process: 130 on keyboard interrupt, 64 on
    any other exception, otherwise whatever the decorated method returns.
    """

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        """Do housekeeping tasks and execute the wrapped method."""

        try:
            logging.basicConfig(
                format="%(name)s: %(message)s",
                level=logging.WARNING,
                stream=sys.stdout,
            )
            # This hack to get the executable name as %(name).
            logging.getLogger().name = os.path.basename(sys.argv[0])
            return function(*args, **kwargs)
        except KeyboardInterrupt:
            logging.warning("Keyboard interrupt")
            return 130  # Signal received exit code for bash.
        except Exception:
            logging.exception("Internal error.")
            if logging.getLogger().isEnabledFor(logging.DEBUG):
                logging.error(
                    "Please report this bug and attach the output "
                    "to the bug report"
                )
            else:
                logging.error(
                    "Please run this command again and turn on "
                    "verbose mode (add '-vvvv' as argument)."
                )
            return 64  # Some non used exit code for internal errors.
        finally:
            logging.shutdown()

    return wrapper


def compiler_wrapper(function):
    """Implements compiler wrapper base functionality.

    A compiler wrapper executes the real compiler, then implements some
    functionality, then returns with the real compiler exit code.

    The parameters (verbosity and the real compiler commands) are read from
    the JSON document stored in the `ENVIRONMENT_KEY` environment variable;
    the compiler arguments are taken from `sys.argv`.

    :param function: the extra functionality what the wrapper wants to do on
        top of the compiler call. If it raises an `Exception`, it will be
        caught and logged. `KeyboardInterrupt` and `SystemExit` are not
        swallowed, they propagate to the caller.
    :return: the exit code of the real compiler.

    The :param function: will receive the following arguments:

    :param result: the exit code of the compilation.
    :param execution: the command executed by the wrapper (an `Execution`).
    """

    def is_cxx_compiler():
        """Find out was it a C++ compiler call. Compiler wrapper names
        contain the compiler type. C++ compiler wrappers end with `c++`,
        but might have `.exe` extension on windows."""

        wrapper_command = os.path.basename(sys.argv[0])
        return re.match(r"(.+)c\+\+(.*)", wrapper_command)

    def run_compiler(executable):
        """Execute compilation with the real compiler."""

        command = executable + sys.argv[1:]
        logging.debug("compilation: %s", command)
        result = subprocess.call(command)
        logging.debug("compilation exit code: %d", result)
        return result

    # Get relevant parameters from environment.
    parameters = json.loads(os.environ[ENVIRONMENT_KEY])
    reconfigure_logging(parameters["verbose"])
    # Execute the requested compilation. Do crash if anything goes wrong.
    cxx = is_cxx_compiler()
    compiler = parameters["cxx"] if cxx else parameters["cc"]
    result = run_compiler(compiler)
    # Call the wrapped method and ignore its return value.
    try:
        call = Execution(
            pid=os.getpid(),
            cwd=os.getcwd(),
            cmd=["c++" if cxx else "cc"] + sys.argv[1:],
        )
        function(result, call)
    except Exception:
        # A failure of the extra functionality must not fail the build.
        logging.exception("Compiler wrapper failed complete.")
    # Always return the real compiler exit code. (This is deliberately not
    # a `return` inside `finally`, which would also swallow interrupts.)
    return result


def wrapper_environment(args):
    """Set up environment for interpose compiler wrapper.

    :param args: object with `verbose`, `cc` and `cxx` attributes; `cc` and
        `cxx` are command line strings and are split into token lists
    :return: dictionary with a single `ENVIRONMENT_KEY` entry, whose value
        is the JSON document read back by `compiler_wrapper`
    """

    return {
        ENVIRONMENT_KEY: json.dumps(
            {
                "verbose": args.verbose,
                "cc": shlex.split(args.cc),
                "cxx": shlex.split(args.cxx),
            }
        )
    }