diff --git a/conftest.py b/conftest.py index e1ef7311..c61de771 100644 --- a/conftest.py +++ b/conftest.py @@ -40,7 +40,6 @@ Finally the tests aren't 100% reliable as they rely on quite a bit of network traffic, it's possible that the tests fail due to network issues rather than logic errors. """ - from __future__ import annotations import base64 @@ -71,6 +70,7 @@ import typing import uuid import warnings import xmlrpc.client +from collections.abc import Iterator from contextlib import closing from dataclasses import dataclass from typing import Optional @@ -108,7 +108,7 @@ def pytest_addoption(parser): "The tunneling script should respond gracefully to SIGINT and " "SIGTERM.") -def is_manager(config): +def is_manager(config: pytest.Config) -> bool: return not hasattr(config, 'workerinput') def pytest_configure(config: pytest.Config) -> None: @@ -123,7 +123,67 @@ def pytest_configure(config: pytest.Config) -> None: "expect_log_errors(reason): allow and require tracebacks in the log", ) -def pytest_unconfigure(config): + if not config.getoption('--export-traces', None): + return + + from opentelemetry import trace + tracer = trace.get_tracer('mergebot-tests') + + # if the pytest-opentelemetry plugin is enabled hook otel into the test suite APIs + # region enable requests for github calls + from opentelemetry.instrumentation.requests import RequestsInstrumentor + RequestsInstrumentor().instrument() + # endregion + + # region hook opentelemetry into xmlrpc for Odoo RPC calls + from opentelemetry.propagate import inject + from opentelemetry.propagators.textmap import Setter + # the default setter assumes a dict, but xmlrpc uses headers lists + class ListSetter(Setter[list[tuple[str, str]]]): + def set(self, carrier: list[tuple[str, str]], key: str, value: str) -> None: + carrier.append((key, value)) + list_setter = ListSetter() + + wrapped_request = xmlrpc.client.Transport.request + @functools.wraps(wrapped_request) + def instrumented_request(self, host, handler, 
request_body, verbose=False): + m = re.search( + rb'<methodName>([^<]+)</methodName>', + request_body, + ) + if m[1] == b'authenticate': + # ignore these because we spam authenticate to know when the server + # is up (alternatively find a way to remove the span on auth error response) + return wrapped_request(self, host, handler, request_body, verbose) + if m[1] in (b'execute_kw', b'execute'): + # dumbshit OdooRPC call, actual path is the combination of args 4 (object) and 5 (method) + (_, _, _, objname, method, *_), _ = xmlrpc.client.loads( + request_body, + use_datetime=True, + use_builtin_types=True, + ) + span_name = f'{objname}.{method}()' + else: + span_name = m[1].decode() + + with tracer.start_as_current_span(span_name, kind=trace.SpanKind.CLIENT): + return wrapped_request(self, host, handler, request_body, verbose) + xmlrpc.client.Transport.request = instrumented_request + + # TODO: create a dedicated call span as the requests instrumentor does? + # + # This is more complicated though because the request gets the + # serialized body so we'd need to get the methodname back out of the + # `request_body`... 
otoh that's just `<methodName>{name}</methodName>` + wrapped_send_headers = xmlrpc.client.Transport.send_headers + @functools.wraps(wrapped_send_headers) + def instrumented_send_headers(self, connection: http.client.HTTPConnection, headers: list[tuple[str, str]]) -> None: + inject(headers, setter=list_setter) + wrapped_send_headers(self, connection, headers) + xmlrpc.client.Transport.send_headers = instrumented_send_headers + # endregion + +def pytest_unconfigure(config: pytest.Config) -> None: if not is_manager(config): return @@ -218,11 +278,11 @@ def setreviewers(partners): return _ @pytest.fixture -def users(partners, rolemap): +def users(partners, rolemap) -> dict[str, str]: return {k: v['login'] for k, v in rolemap.items()} @pytest.fixture(scope='session') -def tunnel(pytestconfig: pytest.Config, port: int): +def tunnel(pytestconfig: pytest.Config, port: int) -> Iterator[str]: """ Creates a tunnel to localhost:<port>, should yield the publicly routable address & terminate the process at the end of the session """ @@ -289,7 +349,11 @@ class DbDict(dict): return db @pytest.fixture(scope='session') -def dbcache(request, tmp_path_factory, addons_path): +def dbcache( + request: pytest.FixtureRequest, + tmp_path_factory: pytest.TempPathFactory, + addons_path: str, +) -> Iterator[DbDict]: """ Creates template DB once per run, then just duplicates it before starting odoo and running the testcase """ @@ -303,14 +367,20 @@ def dbcache(request, tmp_path_factory, addons_path): yield dbs @pytest.fixture -def db(request, module, dbcache, tmpdir): +def db( + request: pytest.FixtureRequest, + module: str, + dbcache: DbDict, + tmp_path: pathlib.Path, +) -> Iterator[str]: template_db = dbcache[module] rundb = str(uuid.uuid4()) subprocess.run(['createdb', '-T', template_db, rundb], check=True) - share = tmpdir.mkdir('share') + share = tmp_path.joinpath('share') + share.mkdir() shutil.copytree( - str(dbcache._shared_dir / f'shared-{module}'), - str(share), + dbcache._shared_dir / f'shared-{module}', + share, 
dirs_exist_ok=True, ) (share / 'Odoo' / 'filestore' / template_db).rename( @@ -324,7 +394,7 @@ def db(request, module, dbcache, tmpdir): def wait_for_hook(): time.sleep(WEBHOOK_WAIT_TIME) -def wait_for_server(db, port, proc, mod, timeout=120): +def wait_for_server(db: str, port: int, proc: subprocess.Popen, mod: str, timeout: int = 120) -> None: """ Polls for server to be response & have installed our module. Raises socket.timeout on failure @@ -369,84 +439,38 @@ def page(port): yield get @pytest.fixture(scope='session') -def dummy_addons_path(): +def dummy_addons_path(pytestconfig: pytest.Config) -> Iterator[str]: with tempfile.TemporaryDirectory() as dummy_addons_path: - mod = pathlib.Path(dummy_addons_path, 'saas_worker') - mod.mkdir(0o700) - (mod / '__init__.py').write_text('''\ -import builtins -import logging -import threading - -import psycopg2 - -import odoo -from odoo import api, fields, models - -_logger = logging.getLogger(__name__) - - -class Base(models.AbstractModel): - _inherit = 'base' - - def run_crons(self): - builtins.current_date = self.env.context.get('current_date') - builtins.forwardport_merged_before = self.env.context.get('forwardport_merged_before') - self.env['ir.cron']._process_jobs(self.env.cr.dbname) - del builtins.forwardport_merged_before - return True - - -class IrCron(models.Model): - _inherit = 'ir.cron' - - @classmethod - def _process_jobs(cls, db_name): - t = threading.current_thread() - try: - db = odoo.sql_db.db_connect(db_name) - t.dbname = db_name - with db.cursor() as cron_cr: - # FIXME: override `_get_all_ready_jobs` to directly lock the cron? 
- while jobs := next(( - job - for j in cls._get_all_ready_jobs(cron_cr) - if (job := cls._acquire_one_job(cron_cr, (j['id'],))) - ), None): - # take into account overridings of _process_job() on that database - registry = odoo.registry(db_name) - registry[cls._name]._process_job(db, cron_cr, job) - cron_cr.commit() - - except psycopg2.ProgrammingError as e: - raise - except Exception: - _logger.warning('Exception in cron:', exc_info=True) - finally: - if hasattr(t, 'dbname'): - del t.dbname -''', encoding='utf-8') - (mod / '__manifest__.py').write_text(pprint.pformat({ - 'name': 'dummy saas_worker', - 'version': '1.0', - 'license': 'BSD-0-Clause', - }), encoding='utf-8') - (mod / 'util.py').write_text("""\ -def from_role(*_, **__): - return lambda fn: fn -""", encoding='utf-8') + shutil.copytree( + pathlib.Path(__file__).parent / 'mergebot_test_utils/saas_worker', + pathlib.Path(dummy_addons_path, 'saas_worker'), + ) + shutil.copytree( + pathlib.Path(__file__).parent / 'mergebot_test_utils/saas_tracing', + pathlib.Path(dummy_addons_path, 'saas_tracing'), + ) yield dummy_addons_path @pytest.fixture(scope='session') -def addons_path(request, dummy_addons_path): +def addons_path( + request: pytest.FixtureRequest, + dummy_addons_path: str, +) -> str: return ','.join(map(str, filter(None, [ request.config.getoption('--addons-path'), dummy_addons_path, ]))) @pytest.fixture -def server(request, db, port, module, addons_path, tmpdir): +def server( + request: pytest.FixtureRequest, + db: str, + port: int, + module: str, + addons_path: str, + tmp_path: pathlib.Path, +) -> Iterator[tuple[subprocess.Popen, bytearray]]: log_handlers = [ 'odoo.modules.loading:WARNING', 'py.warnings:ERROR', @@ -482,20 +506,33 @@ def server(request, db, port, module, addons_path, tmpdir): buf.extend(r) os.close(inpt) + CACHEDIR = tmp_path / 'cache' + CACHEDIR.mkdir() + subenv = { + **os.environ, + # stop putting garbage in the user dirs, and potentially creating conflicts + # TODO: way to 
override this with macOS? + 'XDG_DATA_HOME': str(tmp_path / 'share'), + 'XDG_CACHE_HOME': str(CACHEDIR), + } + serverwide = 'base,web' + if request.config.getoption('--export-traces', None): + serverwide = 'base,web,saas_tracing' + # Inject OTEL context into subprocess env, so it can be extracted by + # the server and the server setup (registry preload) is correctly nested + # inside the test setup. + from opentelemetry.propagate import inject + inject(subenv) + p = subprocess.Popen([ *cov, 'odoo', '--http-port', str(port), '--addons-path', addons_path, '-d', db, + '--load', serverwide, '--max-cron-threads', '0', # disable cron threads (we're running crons by hand) *itertools.chain.from_iterable(('--log-handler', h) for h in log_handlers), - ], stderr=w, env={ - **os.environ, - # stop putting garbage in the user dirs, and potentially creating conflicts - # TODO: way to override this with macOS? - 'XDG_DATA_HOME': str(tmpdir / 'share'), - 'XDG_CACHE_HOME': str(tmpdir.mkdir('cache')), - }) + ], stderr=w, env=subenv) os.close(w) # start the reader thread here so `_move` can read `p` without needing # additional handholding diff --git a/mergebot_test_utils/saas_tracing/__init__.py b/mergebot_test_utils/saas_tracing/__init__.py new file mode 100644 index 00000000..f97ef352 --- /dev/null +++ b/mergebot_test_utils/saas_tracing/__init__.py @@ -0,0 +1,123 @@ +"""OpenTelemetry instrumentation + +- tries to set up the exporting of traces, metrics, and logs based on otel + semantics +- automatically instruments psycopg2 and requests to trace outbound requests +- instruments db preload, server run to create new traces (or inbound from env) +- instruments WSGI, RPC for inbound traces + +.. todo:: instrument crons +.. todo:: instrument subprocess (?) 
+""" + +import functools +import json +import os + +from opentelemetry import trace, distro +from opentelemetry.environment_variables import OTEL_LOGS_EXPORTER, OTEL_METRICS_EXPORTER, OTEL_TRACES_EXPORTER +from opentelemetry.instrumentation import psycopg2, requests, wsgi +from opentelemetry.propagate import extract +from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_VERSION + +import odoo.release +import odoo.service + +tracer = trace.get_tracer('odoo') + + +distro.OpenTelemetryDistro().configure() +configurator = distro.OpenTelemetryConfigurator() + +conf = { + "resource_attributes": { + SERVICE_NAME: odoo.release.product_name, + SERVICE_VERSION: odoo.release.major_version, + }, + # OpenTelemetryDistro only sets up trace and metrics, and not well (uses setenv) + "trace_exporter_names": [e] if (e := os.environ.get(OTEL_TRACES_EXPORTER, "otlp")) else [], + "metric_exporter_names": [e] if (e := os.environ.get(OTEL_METRICS_EXPORTER, "otlp")) else [], + "log_exporter_names": [e] if (e := os.environ.get(OTEL_LOGS_EXPORTER, "otlp")) else [], +} +# open-telemetry/opentelemetry-python#4340 changed the name (and some semantics) +# of the parameters so need to try new and fall back to old +try: + configurator.configure(setup_logging_handler=True, **conf) +except TypeError: + configurator.configure(logging_enabled=True, **conf) + +# Breaks server instrumentation when enabled: threads inherit the init context +# instead of creating a per-request / per-job trace, if we want to propagate +# tracing through to one-shot workers it should be done by hand (using +# extract/inject) +# ThreadingInstrumentor().instrument() +# adds otel trace/span information to the logrecord which is not what we care about +# LoggingInstrumentor().instrument() + +psycopg2.Psycopg2Instrumentor().instrument( + # instrumenter checks for psycopg2, but dev machines may have + # psycopg2-binary, if not present odoo literally can't run so no need to + # check + skip_dep_check=True, +) 
+requests.RequestsInstrumentor().instrument() + +# FIXME: blacklist /xmlrpc here so it's not duplicated by the instrumentation +# of execute below, the middleware currently does not support blacklisting +# (open-telemetry/opentelemetry-python-contrib#2369) +# +# FIXME: +# - servers which mount `odoo.http.root` (before patched?) +# - `lazy_property.reset_all(odoo.http.root)` +# - `patch('odoo.http.root.get_db_router', ...)` +odoo.http.root = wsgi.OpenTelemetryMiddleware(odoo.http.root) +# because there's code which accesses attributes on `odoo.http.root` +wsgi.OpenTelemetryMiddleware.__getattr__ = lambda self, attr: getattr(self.wsgi, attr) + +def wraps(obj: object, attr: str): + """ Wraps the callable ``attr`` of ``obj`` in the decorated callable + in-place (so patches ``obj``). + + The wrappee is passed as first argument to the wrapper. + """ + def decorator(fn): + wrapped = getattr(obj, attr) + @functools.wraps(wrapped) + def wrapper(*args, **kw): + return fn(wrapped, *args, **kw) + setattr(obj, attr, wrapper) + return decorator + +# instrument / span the method call side of RPC calls so we don't just have an +# opaque "POST /xmlrpc/2" +@wraps(odoo.service.model, 'execute') +def instrumented_execute(wrapped_execute, db, uid, obj, method, *args, **kw): + with tracer.start_as_current_span( f"{obj}.{method}", attributes={ + "db": db, + "uid": uid, + # while attributes can be sequences they can't be map, or nested + "args": json.dumps(args, default=str), + "kwargs": json.dumps(kw, default=str), + }): + return wrapped_execute(db, uid, obj, method, *args, **kw) + +# Instrument the server & preload so we know / can trace what happens during +# init. Server instrumentation performs context extraction from environment +# (preload is just nested inside that). 
+ +@wraps(odoo.service.server.ThreadedServer, "run") +def instrumented_threaded_start(wrapped_threaded_run, self, preload=None, stop=None): + with tracer.start_as_current_span("server.threaded.run", context=extract(os.environ)): + return wrapped_threaded_run(self, preload=preload, stop=stop) + +@wraps(odoo.service.server.PreforkServer, "run") +def instrumented_prefork_run(wrapped_prefork_run, self, preload=None, stop=None): + with tracer.start_as_current_span("server.prefork.run", context=extract(os.environ)): + return wrapped_prefork_run(self, preload=preload, stop=stop) + +@wraps(odoo.service.server, "preload_registries") +def instrumented_preload(wrapped_preload, dbnames): + with tracer.start_as_current_span("preload", attributes={ + "dbnames": dbnames, + }): + return wrapped_preload(dbnames) diff --git a/mergebot_test_utils/saas_tracing/__manifest__.py b/mergebot_test_utils/saas_tracing/__manifest__.py new file mode 100644 index 00000000..7a8feb6b --- /dev/null +++ b/mergebot_test_utils/saas_tracing/__manifest__.py @@ -0,0 +1,16 @@ +{ + "name": "OpenTelemetry instrumentation for Odoo", + 'version': '1.0', + 'license': 'BSD-0-Clause', + "category": "Hidden", + 'external_dependencies': { + 'python': [ + 'opentelemetry', + 'opentelemetry-instrumentation-logging', + 'opentelemetry-instrumentation-psycopg2', + 'opentelemetry-instrumentation-requests', + 'opentelemetry-instrumentation-wsgi', + 'opentelemetry-container-distro', + ] + }, +} diff --git a/mergebot_test_utils/saas_worker/__init__.py b/mergebot_test_utils/saas_worker/__init__.py new file mode 100644 index 00000000..4f4ee22f --- /dev/null +++ b/mergebot_test_utils/saas_worker/__init__.py @@ -0,0 +1,51 @@ +import builtins +import logging +import threading + +import psycopg2 + +import odoo +from odoo import models + +_logger = logging.getLogger(__name__) + + +class Base(models.AbstractModel): + _inherit = 'base' + + def run_crons(self): + builtins.current_date = self.env.context.get('current_date') + 
builtins.forwardport_merged_before = self.env.context.get('forwardport_merged_before') + self.env['ir.cron']._process_jobs(self.env.cr.dbname) + del builtins.forwardport_merged_before + return True + + +class IrCron(models.Model): + _inherit = 'ir.cron' + + @classmethod + def _process_jobs(cls, db_name): + t = threading.current_thread() + try: + db = odoo.sql_db.db_connect(db_name) + t.dbname = db_name + with db.cursor() as cron_cr: + # FIXME: override `_get_all_ready_jobs` to directly lock the cron? + while job := next(( + job + for j in cls._get_all_ready_jobs(cron_cr) + if (job := cls._acquire_one_job(cron_cr, (j['id'],))) + ), None): + # take into account overridings of _process_job() on that database + registry = odoo.registry(db_name) + registry[cls._name]._process_job(db, cron_cr, job) + cron_cr.commit() + + except psycopg2.ProgrammingError as e: + raise + except Exception: + _logger.warning('Exception in cron:', exc_info=True) + finally: + if hasattr(t, 'dbname'): + del t.dbname diff --git a/mergebot_test_utils/saas_worker/__manifest__.py b/mergebot_test_utils/saas_worker/__manifest__.py new file mode 100644 index 00000000..95f49d66 --- /dev/null +++ b/mergebot_test_utils/saas_worker/__manifest__.py @@ -0,0 +1,5 @@ +{ + 'name': 'dummy saas_worker', + 'version': '1.0', + 'license': 'BSD-0-Clause', +} diff --git a/mergebot_test_utils/saas_worker/util.py b/mergebot_test_utils/saas_worker/util.py new file mode 100644 index 00000000..896e33ba --- /dev/null +++ b/mergebot_test_utils/saas_worker/util.py @@ -0,0 +1,2 @@ +def from_role(*_, **__): + return lambda fn: fn diff --git a/runbot_merge/git.py b/runbot_merge/git.py index 36c5a1ab..d2f9e92e 100644 --- a/runbot_merge/git.py +++ b/runbot_merge/git.py @@ -7,7 +7,7 @@ import resource import stat import subprocess from operator import methodcaller -from typing import Optional, TypeVar, Union, Sequence, Tuple, Dict +from typing import Optional, TypeVar, Union, Sequence, Tuple, Dict, Iterator from collections.abc 
import Iterable, Mapping, Callable from odoo.tools.appdirs import user_cache_dir @@ -15,6 +15,23 @@ from .github import MergeError, PrCommit _logger = logging.getLogger(__name__) +try: + from opentelemetry import trace + from opentelemetry.propagate import inject + tracer = trace.get_tracer(__name__) + + def git_tracing_params() -> Iterator[str]: + tracing = {} + inject(tracing) + return itertools.chain.from_iterable( + ('-c', f'http.extraHeader={k}:{v}') + for k, v in tracing.items() + ) +except ImportError: + trace = tracer = inject = None + def git_tracing_params() -> Iterator[str]: + return iter(()) + def source_url(repository) -> str: return 'https://{}@github.com/{}'.format( repository.project_id.github_token, @@ -39,7 +56,10 @@ def get_local(repository, *, clone: bool = True) -> 'Optional[Repo]': return git(repo_dir) elif clone: _logger.info("Cloning out %s to %s", repository.name, repo_dir) - subprocess.run(['git', 'clone', '--bare', source_url(repository), str(repo_dir)], check=True) + subprocess.run([ + 'git', *git_tracing_params(), 'clone', '--bare', + source_url(repository), str(repo_dir) + ], check=True) # bare repos don't have fetch specs by default, and fetching *into* # them is a pain in the ass, configure fetch specs so `git fetch` # works properly @@ -72,7 +92,7 @@ def git(directory: str) -> 'Repo': Self = TypeVar("Self", bound="Repo") class Repo: - def __init__(self, directory, **config) -> None: + def __init__(self, directory: str, **config: object) -> None: self._directory = str(directory) config.setdefault('stderr', subprocess.PIPE) self._config = config @@ -84,9 +104,12 @@ class Repo: def _run(self, *args, **kwargs) -> subprocess.CompletedProcess: opts = {**self._config, **kwargs} - args = ('git', '-C', self._directory)\ - + tuple(itertools.chain.from_iterable(('-c', p) for p in self._params + ALWAYS))\ - + args + args = tuple(itertools.chain( + ('git', '-C', self._directory), + itertools.chain.from_iterable(('-c', p) for p in self._params + 
ALWAYS), + git_tracing_params(), + args, + )) try: return self.runner(args, preexec_fn=_bypass_limits, **opts) except subprocess.CalledProcessError as e: @@ -305,14 +328,21 @@ def check(p: subprocess.CompletedProcess) -> subprocess.CompletedProcess: _logger.info("rebase failed at %s\nstdout:\n%s\nstderr:\n%s", p.args, p.stdout, p.stderr) raise MergeError(p.stderr or 'merge conflict') - @dataclasses.dataclass class GitCommand: repo: Repo name: str - def __call__(self, *args, **kwargs) -> subprocess.CompletedProcess: - return self.repo._run(self.name, *args, *self._to_options(kwargs)) + if tracer: + def __call__(self, *args, **kwargs) -> subprocess.CompletedProcess: + with tracer.start_as_current_span(f"git.{self.name}", attributes={ + "http.target": None, + "http.user_agent": "git", + }): + return self.repo._run(self.name, *args, *self._to_options(kwargs)) + else: + def __call__(self, *args, **kwargs) -> subprocess.CompletedProcess: + return self.repo._run(self.name, *args, *self._to_options(kwargs)) def _to_options(self, d): for k, v in d.items():