diff --git a/odoo/service/server.py b/odoo/service/server.py index a31bd68e7..5c7a1e057 100644 --- a/odoo/service/server.py +++ b/odoo/service/server.py @@ -1,6 +1,6 @@ -#----------------------------------------------------------- +# ----------------------------------------------------------- # Threaded, Gevent and Prefork Servers -#----------------------------------------------------------- +# ----------------------------------------------------------- import datetime import errno import logging @@ -20,14 +20,16 @@ from io import BytesIO import psutil import werkzeug.serving -if os.name == 'posix': +if os.name == "posix": # Unix only for workers import fcntl import resource + try: import inotify from inotify.adapters import InotifyTrees from inotify.constants import IN_MODIFY, IN_CREATE, IN_MOVED_TO + INOTIFY_LISTEN_EVENTS = IN_MODIFY | IN_CREATE | IN_MOVED_TO except ImportError: inotify = None @@ -60,31 +62,33 @@ from odoo.tools.misc import stripped_sys_argv, dumpstacks _logger = logging.getLogger(__name__) -SLEEP_INTERVAL = 60 # 1 min +SLEEP_INTERVAL = 60 # 1 min + def memory_info(process): """ :return: the relevant memory usage according to the OS in bytes. """ # psutil < 2.0 does not have memory_info, >= 3.0 does not have get_memory_info - pmem = (getattr(process, 'memory_info', None) or process.get_memory_info)() + pmem = (getattr(process, "memory_info", None) or process.get_memory_info)() # MacOSX allocates very large vms to all processes so we only monitor the rss usage. - if platform.system() == 'Darwin': + if platform.system() == "Darwin": return pmem.rss return pmem.vms def set_limit_memory_hard(): - if platform.system() != 'Linux': + if platform.system() != "Linux": return - limit_memory_hard = config['limit_memory_hard'] - if odoo.evented and config['limit_memory_hard_gevent']: - limit_memory_hard = config['limit_memory_hard_gevent'] + limit_memory_hard = config["limit_memory_hard"] + if odoo.evented and config["limit_memory_hard_gevent"]: + limit_memory_hard = config["limit_memory_hard_gevent"] if limit_memory_hard: rlimit = resource.RLIMIT_AS soft, hard = resource.getrlimit(rlimit) resource.setrlimit(rlimit, (limit_memory_hard, hard)) + def empty_pipe(fd): try: while os.read(fd, 1): @@ -93,21 +97,26 @@ def empty_pipe(fd): if e.errno not in [errno.EAGAIN]: raise -#---------------------------------------------------------- + +# ---------------------------------------------------------- # Werkzeug WSGI servers patched -#---------------------------------------------------------- +# ---------------------------------------------------------- class LoggingBaseWSGIServerMixIn(object): def handle_error(self, request, client_address): t, e, _ = sys.exc_info() if t == socket.error and e.errno == errno.EPIPE: # broken pipe, ignore error return - _logger.exception('Exception happened during processing of request from %s', client_address) + _logger.exception( + "Exception happened during processing of request from %s", client_address + ) + class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer): - """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer + """werkzeug Base WSGI Server patched to skip socket binding. PreforkServer use this class, sets the socket and calls the process_request() manually """ + def __init__(self, app): werkzeug.serving.BaseWSGIServer.__init__(self, "127.0.0.1", 0, app) # Directly close the socket. 
It will be replaced by WorkerHTTP when processing requests @@ -122,19 +131,19 @@ class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGI class RequestHandler(werkzeug.serving.WSGIRequestHandler): def setup(self): # timeout to avoid chrome headless preconnect during tests - if config['test_enable'] or config['test_file']: + if config["test_enable"] or config["test_file"]: self.timeout = 5 # flag the current thread as handling a http request super(RequestHandler, self).setup() me = threading.current_thread() - me.name = 'odoo.service.http.request.%s' % (me.ident,) + me.name = "odoo.service.http.request.%s" % (me.ident,) def make_environ(self): environ = super().make_environ() # Add the TCP socket to environ in order for the websocket # connections to use it. - environ['socket'] = self.connection - if self.headers.get('Upgrade') == 'websocket': + environ["socket"] = self.connection + if self.headers.get("Upgrade") == "websocket": # Since the upgrade header is introduced in version 1.1, Firefox # won't accept a websocket connection if the version is set to # 1.0. @@ -144,7 +153,11 @@ class RequestHandler(werkzeug.serving.WSGIRequestHandler): def send_header(self, keyword, value): # Prevent `WSGIRequestHandler` from sending the connection close header (compatibility with werkzeug >= 2.1.1 ) # since it is incompatible with websocket. - if self.headers.get('Upgrade') == 'websocket' and keyword == 'Connection' and value == 'close': + if ( + self.headers.get("Upgrade") == "websocket" + and keyword == "Connection" + and value == "close" + ): # Do not keep processing requests. self.close_connection = True return @@ -156,15 +169,19 @@ class RequestHandler(werkzeug.serving.WSGIRequestHandler): # data. In the case of WebSocket connections, data should not be discarded. Replace the # rfile/wfile of this handler to prevent any further action (compatibility with werkzeug >= 2.3.x). # See: https://github.com/pallets/werkzeug/blob/2.3.x/src/werkzeug/serving.py#L334 - if self.headers.get('Upgrade') == 'websocket': + if self.headers.get("Upgrade") == "websocket": self.rfile = BytesIO() self.wfile = BytesIO() -class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer): - """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket + +class ThreadedWSGIServerReloadable( + LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer +): + """werkzeug Threaded WSGI Server patched to allow reusing a listen socket given by the environment, this is used by autoreload to keep the listen socket open when a reload happens. """ + def __init__(self, host, port, app): # The ODOO_MAX_HTTP_THREADS environment variable allows to limit the amount of concurrent # socket connections accepted by a threaded server, implicitly limiting the amount of @@ -177,10 +194,13 @@ class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving. # If the value can't be parsed to an integer then it's computed in an automated way to # half the size of db_maxconn because while most requests won't borrow cursors concurrently # there are some exceptions where some controllers might allocate two or more cursors. 
- self.max_http_threads = max((config['db_maxconn'] - config['max_cron_threads']) // 2, 1) + self.max_http_threads = max( + (config["db_maxconn"] - config["max_cron_threads"]) // 2, 1 + ) self.http_threads_sem = threading.Semaphore(self.max_http_threads) - super(ThreadedWSGIServerReloadable, self).__init__(host, port, app, - handler=RequestHandler) + super(ThreadedWSGIServerReloadable, self).__init__( + host, port, app, handler=RequestHandler + ) # See https://github.com/pallets/werkzeug/pull/770 # This allow the request threads to not be set as daemon @@ -189,14 +209,22 @@ class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving. def server_bind(self): SD_LISTEN_FDS_START = 3 - if os.environ.get('LISTEN_FDS') == '1' and os.environ.get('LISTEN_PID') == str(os.getpid()): + if os.environ.get("LISTEN_FDS") == "1" and os.environ.get("LISTEN_PID") == str( + os.getpid() + ): self.reload_socket = True - self.socket = socket.fromfd(SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM) - _logger.info('HTTP service (werkzeug) running through socket activation') + self.socket = socket.fromfd( + SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM + ) + _logger.info("HTTP service (werkzeug) running through socket activation") else: self.reload_socket = False super(ThreadedWSGIServerReloadable, self).server_bind() - _logger.info('HTTP service (werkzeug) running on %s:%s', self.server_name, self.server_port) + _logger.info( + "HTTP service (werkzeug) running on %s:%s", + self.server_name, + self.server_port, + ) def server_activate(self): if not self.reload_socket: @@ -209,10 +237,11 @@ class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving. to be able to get the thread object which is instantiated and set its start time as an attribute """ - t = threading.Thread(target = self.process_request_thread, - args = (request, client_address)) + t = threading.Thread( + target=self.process_request_thread, args=(request, client_address) + ) t.daemon = self.daemon_threads - t.type = 'http' + t.type = "http" t.start_time = time.time() t.start() @@ -232,22 +261,29 @@ class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving. 
self.http_threads_sem.release() super().shutdown_request(request) -#---------------------------------------------------------- + +# ---------------------------------------------------------- # FileSystem Watcher for autoreload and cache invalidation -#---------------------------------------------------------- +# ---------------------------------------------------------- class FSWatcherBase(object): def handle_file(self, path): - if path.endswith('.py') and not os.path.basename(path).startswith('.~'): + if path.endswith(".py") and not os.path.basename(path).startswith(".~"): try: - source = open(path, 'rb').read() + b'\n' - compile(source, path, 'exec') + source = open(path, "rb").read() + b"\n" + compile(source, path, "exec") except IOError: - _logger.error('autoreload: python code change detected, IOError for %s', path) + _logger.error( + "autoreload: python code change detected, IOError for %s", path + ) except SyntaxError: - _logger.error('autoreload: python code change detected, SyntaxError in %s', path) + _logger.error( + "autoreload: python code change detected, SyntaxError in %s", path + ) else: if not server_phoenix: - _logger.info('autoreload: python code updated, autoreload activated') + _logger.info( + "autoreload: python code updated, autoreload activated" + ) restart() return True @@ -256,18 +292,18 @@ class FSWatcherWatchdog(FSWatcherBase): def __init__(self): self.observer = Observer() for path in odoo.addons.__path__: - _logger.info('Watching addons folder %s', path) + _logger.info("Watching addons folder %s", path) self.observer.schedule(self, path, recursive=True) def dispatch(self, event): if isinstance(event, (FileCreatedEvent, FileModifiedEvent, FileMovedEvent)): if not event.is_directory: - path = getattr(event, 'dest_path', '') or event.src_path + path = getattr(event, "dest_path", "") or event.src_path self.handle_file(path) def start(self): self.observer.start() - _logger.info('AutoReload watcher running with watchdog') + _logger.info("AutoReload watcher running with watchdog") def stop(self): self.observer.stop() @@ -283,19 +319,21 @@ class FSWatcherInotify(FSWatcherBase): paths_to_watch = [] for path in odoo.addons.__path__: paths_to_watch.append(path) - _logger.info('Watching addons folder %s', path) - self.watcher = InotifyTrees(paths_to_watch, mask=INOTIFY_LISTEN_EVENTS, block_duration_s=.5) + _logger.info("Watching addons folder %s", path) + self.watcher = InotifyTrees( + paths_to_watch, mask=INOTIFY_LISTEN_EVENTS, block_duration_s=0.5 + ) def run(self): - _logger.info('AutoReload watcher running with inotify') - dir_creation_events = set(('IN_MOVED_TO', 'IN_CREATE')) + _logger.info("AutoReload watcher running with inotify") + dir_creation_events = set(("IN_MOVED_TO", "IN_CREATE")) while self.started: for event in self.watcher.event_gen(timeout_s=0, yield_nones=False): (_, type_names, path, filename) = event - if 'IN_ISDIR' not in type_names: + if "IN_ISDIR" not in type_names: # despite not having IN_DELETE in the watcher's mask, the # watcher sends these events when a directory is deleted. 
- if 'IN_DELETE' not in type_names: + if "IN_DELETE" not in type_names: full_path = os.path.join(path, filename) if self.handle_file(full_path): return @@ -308,7 +346,9 @@ class FSWatcherInotify(FSWatcherBase): def start(self): self.started = True - self.thread = threading.Thread(target=self.run, name="odoo.service.autoreload.watcher") + self.thread = threading.Thread( + target=self.run, name="odoo.service.autoreload.watcher" + ) self.thread.daemon = True self.thread.start() @@ -318,9 +358,10 @@ class FSWatcherInotify(FSWatcherBase): del self.watcher # ensures inotify watches are freed up before reexec -#---------------------------------------------------------- +# ---------------------------------------------------------- # Servers: Threaded, Gevented and Prefork -#---------------------------------------------------------- +# ---------------------------------------------------------- + class CommonServer(object): _on_stop_funcs = [] @@ -328,13 +369,13 @@ class CommonServer(object): def __init__(self, app): self.app = app # config - self.interface = config['http_interface'] or '0.0.0.0' - self.port = config['http_port'] + self.interface = config["http_interface"] or "0.0.0.0" + self.port = config["http_port"] # runtime self.pid = os.getpid() def close_socket(self, sock): - """ Closes a socket instance cleanly + """Closes a socket instance cleanly :param sock: the network socket to close :type sock: socket.socket """ @@ -350,13 +391,16 @@ class CommonServer(object): # of the other side (or something), see # http://bugs.python.org/issue4397 # note: stdlib fixed test, not behavior - if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']: + if e.errno != errno.ENOTCONN or platform.system() not in [ + "Darwin", + "Windows", + ]: raise sock.close() @classmethod def on_stop(cls, func): - """ Register a cleanup function to be executed when the server stops """ + """Register a cleanup function to be executed when the server stops""" cls._on_stop_funcs.append(func) def stop(self): @@ -376,7 +420,7 @@ class ThreadedServer(CommonServer): # below. This variable is monitored by ``quit_on_signals()``. self.quit_signals_received = 0 - #self.socket = None + # self.socket = None self.httpd = None self.limits_reached_threads = set() self.limit_reached_time = None @@ -391,7 +435,7 @@ class ThreadedServer(CommonServer): os._exit(0) # interrupt run() to start shutdown raise KeyboardInterrupt() - elif hasattr(signal, 'SIGXCPU') and sig == signal.SIGXCPU: + elif hasattr(signal, "SIGXCPU") and sig == signal.SIGXCPU: sys.stderr.write("CPU time limit exceeded! Shutting down immediately\n") sys.stderr.flush() os._exit(0) @@ -405,25 +449,38 @@ class ThreadedServer(CommonServer): def process_limit(self): memory = memory_info(psutil.Process(os.getpid())) - if config['limit_memory_soft'] and memory > config['limit_memory_soft']: - _logger.warning('Server memory limit (%s) reached.', memory) + if config["limit_memory_soft"] and memory > config["limit_memory_soft"]: + _logger.warning("Server memory limit (%s) reached.", memory) self.limits_reached_threads.add(threading.current_thread()) for thread in threading.enumerate(): - thread_type = getattr(thread, 'type', None) - if not thread.daemon and thread_type != 'websocket' or thread_type == 'cron': + thread_type = getattr(thread, "type", None) + if ( + not thread.daemon + and thread_type != "websocket" + or thread_type == "cron" + ): # We apply the limits on cron threads and HTTP requests, # websocket requests excluded. 
- if getattr(thread, 'start_time', None): + if getattr(thread, "start_time", None): thread_execution_time = time.time() - thread.start_time - thread_limit_time_real = config['limit_time_real'] - if (getattr(thread, 'type', None) == 'cron' and - config['limit_time_real_cron'] and config['limit_time_real_cron'] > 0): - thread_limit_time_real = config['limit_time_real_cron'] - if thread_limit_time_real and thread_execution_time > thread_limit_time_real: + thread_limit_time_real = config["limit_time_real"] + if ( + getattr(thread, "type", None) == "cron" + and config["limit_time_real_cron"] + and config["limit_time_real_cron"] > 0 + ): + thread_limit_time_real = config["limit_time_real_cron"] + if ( + thread_limit_time_real + and thread_execution_time > thread_limit_time_real + ): _logger.warning( - 'Thread %s virtual real time limit (%d/%ds) reached.', - thread, thread_execution_time, thread_limit_time_real) + "Thread %s virtual real time limit (%d/%ds) reached.", + thread, + thread_execution_time, + thread_limit_time_real, + ) self.limits_reached_threads.add(thread) # Clean-up threads that are no longer alive # e.g. threads that exceeded their real time, @@ -448,39 +505,45 @@ class ThreadedServer(CommonServer): # On NOTIFY, all workers are awaken at the same time, sleeping # just a bit prevents they all poll the database at the exact # same time. This is known as the thundering herd effect. - pass - # from odoo.addons.base.models.ir_cron import ir_cron - # conn = odoo.sql_db.db_connect('postgres') - # with conn.cursor() as cr: - # pg_conn = cr._cnx - # # LISTEN / NOTIFY doesn't work in recovery mode - # cr.execute("SELECT pg_is_in_recovery()") - # in_recovery = cr.fetchone()[0] - # if not in_recovery: - # cr.execute("LISTEN cron_trigger") - # else: - # _logger.warning("PG cluster in recovery mode, cron trigger not activated") - # cr.commit() + from odoo.addons.base.models.ir_cron import ir_cron - # while True: - # select.select([pg_conn], [], [], SLEEP_INTERVAL + number) - # time.sleep(number / 100) - # pg_conn.poll() + conn = odoo.sql_db.db_connect("postgres") + with conn.cursor() as cr: + pg_conn = cr._cnx + # LISTEN / NOTIFY doesn't work in recovery mode + cr.execute("SELECT pg_is_in_recovery()") + in_recovery = cr.fetchone()[0] + if not in_recovery: + cr.execute("LISTEN cron_trigger") + else: + _logger.warning( + "PG cluster in recovery mode, cron trigger not activated" + ) + cr.commit() - # registries = odoo.modules.registry.Registry.registries - # _logger.debug('cron%d polling for jobs', number) - # for db_name, registry in registries.d.items(): - # if registry.ready: - # thread = threading.current_thread() - # thread.start_time = time.time() - # try: - # ir_cron._process_jobs(db_name) - # except Exception: - # _logger.warning('cron%d encountered an Exception:', number, exc_info=True) - # thread.start_time = None + while True: + select.select([pg_conn], [], [], SLEEP_INTERVAL + number) + time.sleep(number / 100) + pg_conn.poll() + + registries = odoo.modules.registry.Registry.registries + _logger.debug("cron%d polling for jobs", number) + for db_name, registry in registries.d.items(): + if registry.ready: + thread = threading.current_thread() + thread.start_time = time.time() + try: + ir_cron._process_jobs(db_name) + except Exception: + _logger.warning( + "cron%d encountered an Exception:", + number, + exc_info=True, + ) + thread.start_time = None def cron_spawn(self): - """ Start the above runner function in a daemon thread. + """Start the above runner function in a daemon thread. 
The thread is a typical daemon thread: it will never quit and must be terminated when the main process exits - with no consequence (the processing @@ -490,13 +553,15 @@ class ThreadedServer(CommonServer): # Force call to strptime just before starting the cron thread # to prevent time.strptime AttributeError within the thread. # See: http://bugs.python.org/issue7980 - datetime.datetime.strptime('2012-01-01', '%Y-%m-%d') - for i in range(odoo.tools.config['max_cron_threads']): + datetime.datetime.strptime("2012-01-01", "%Y-%m-%d") + for i in range(odoo.tools.config["max_cron_threads"]): + def target(): self.cron_thread(i) + t = threading.Thread(target=target, name="odoo.service.cron.cron%d" % i) t.daemon = True - t.type = 'cron' + t.type = "cron" t.start() _logger.debug("cron%d started!" % i) @@ -511,7 +576,7 @@ class ThreadedServer(CommonServer): def start(self, stop=False): _logger.debug("Setting signal handlers") set_limit_memory_hard() - if os.name == 'posix': + if os.name == "posix": signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) signal.signal(signal.SIGCHLD, self.signal_handler) @@ -519,23 +584,27 @@ class ThreadedServer(CommonServer): signal.signal(signal.SIGXCPU, self.signal_handler) signal.signal(signal.SIGQUIT, dumpstacks) signal.signal(signal.SIGUSR1, log_ormcache_stats) - elif os.name == 'nt': + elif os.name == "nt": import win32api - win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1) - test_mode = config['test_enable'] or config['test_file'] - if test_mode or (config['http_enable'] and not stop): + win32api.SetConsoleCtrlHandler( + lambda sig: self.signal_handler(sig, None), 1 + ) + + test_mode = config["test_enable"] or config["test_file"] + if test_mode or (config["http_enable"] and not stop): # some tests need the http daemon to be available... self.http_spawn() def stop(self): - """ Shutdown the WSGI server. Wait for non daemon threads. - """ + """Shutdown the WSGI server. Wait for non daemon threads.""" if server_phoenix: _logger.info("Initiating server reload") else: _logger.info("Initiating shutdown") - _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.") + _logger.info( + "Hit CTRL-C again or send a second signal to force the shutdown." + ) stop_time = time.time() @@ -548,14 +617,18 @@ class ThreadedServer(CommonServer): # to trigger _force_quit() in case some non-daemon threads won't exit cleanly. # threading.Thread.join() should not mask signals (at least in python 2.5). me = threading.current_thread() - _logger.debug('current thread: %r', me) + _logger.debug("current thread: %r", me) for thread in threading.enumerate(): - _logger.debug('process %r (%r)', thread, thread.daemon) - if (thread != me and not thread.daemon and thread.ident != self.main_thread_id and - thread not in self.limits_reached_threads): + _logger.debug("process %r (%r)", thread, thread.daemon) + if ( + thread != me + and not thread.daemon + and thread.ident != self.main_thread_id + and thread not in self.limits_reached_threads + ): while thread.is_alive() and (time.time() - stop_time) < 1: # We wait for requests to finish, up to 1 second. - _logger.debug('join and sleep') + _logger.debug("join and sleep") # Need a busyloop here as thread.join() masks signals # and would prevent the forced shutdown. 
thread.join(0.05) @@ -563,11 +636,11 @@ class ThreadedServer(CommonServer): odoo.sql_db.close_all() - _logger.debug('--') + _logger.debug("--") logging.shutdown() def run(self, preload=None, stop=False): - """ Start the http server and the cron thread then wait for a signal. + """Start the http server and the cron thread then wait for a signal. The first SIGINT or SIGTERM signal will initiate a graceful shutdown while a second one if any will force an immediate exit. @@ -577,14 +650,17 @@ class ThreadedServer(CommonServer): rc = preload_registries(preload) if stop: - if config['test_enable']: + if config["test_enable"]: from odoo.tests.result import _logger as logger # noqa: PLC0415 + with Registry.registries._lock: for db, registry in Registry.registries.d.items(): report = registry._assertion_report - log = logger.error if not report.wasSuccessful() \ - else logger.warning if not report.testsRun \ - else logger.info + log = ( + logger.error + if not report.wasSuccessful() + else logger.warning if not report.testsRun else logger.info + ) log("%s when loading database %r", report, db) self.stop() return rc @@ -598,17 +674,25 @@ class ThreadedServer(CommonServer): self.process_limit() if self.limit_reached_time: has_other_valid_requests = any( - not t.daemon and - t not in self.limits_reached_threads + not t.daemon and t not in self.limits_reached_threads for t in threading.enumerate() - if getattr(t, 'type', None) == 'http') - if (not has_other_valid_requests or - (time.time() - self.limit_reached_time) > SLEEP_INTERVAL): + if getattr(t, "type", None) == "http" + ) + if ( + not has_other_valid_requests + or (time.time() - self.limit_reached_time) > SLEEP_INTERVAL + ): # We wait there is no processing requests # other than the ones exceeding the limits, up to 1 min, # before asking for a reload. - _logger.info('Dumping stacktrace of limit exceeding threads before reloading') - dumpstacks(thread_idents=[thread.ident for thread in self.limits_reached_threads]) + _logger.info( + "Dumping stacktrace of limit exceeding threads before reloading" + ) + dumpstacks( + thread_idents=[ + thread.ident for thread in self.limits_reached_threads + ] + ) self.reload() # `reload` increments `self.quit_signals_received` # and the loop will end after this iteration, @@ -627,10 +711,11 @@ class ThreadedServer(CommonServer): def reload(self): os.kill(self.pid, signal.SIGHUP) + class GeventServer(CommonServer): def __init__(self, app): super(GeventServer, self).__init__(app) - self.port = config['gevent_port'] + self.port = config["gevent_port"] self.httpd = None def process_limits(self): @@ -639,9 +724,11 @@ class GeventServer(CommonServer): _logger.warning("Gevent Parent changed: %s", self.pid) restart = True memory = memory_info(psutil.Process(self.pid)) - limit_memory_soft = config['limit_memory_soft_gevent'] or config['limit_memory_soft'] + limit_memory_soft = ( + config["limit_memory_soft_gevent"] or config["limit_memory_soft"] + ) if limit_memory_soft and memory > limit_memory_soft: - _logger.warning('Gevent virtual memory limit reached: %s', memory) + _logger.warning("Gevent virtual memory limit reached: %s", memory) restart = True if restart: # suicide !! 
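Note: the hunk above folds the gevent-specific soft limit into `config["limit_memory_soft_gevent"] or config["limit_memory_soft"]` and compares it against `memory_info(psutil.Process(self.pid))`. A minimal standalone sketch of the same check, assuming a hard-coded `LIMIT_BYTES` in place of the config keys (illustrative only, not part of this patch):

    import os
    import platform

    import psutil

    LIMIT_BYTES = 2 * 1024**3  # assumption: 2 GiB soft limit

    def over_soft_limit(pid=None):
        # Same policy as memory_info() above: monitor rss on macOS, vms elsewhere.
        pmem = psutil.Process(pid or os.getpid()).memory_info()
        usage = pmem.rss if platform.system() == "Darwin" else pmem.vms
        return usage > LIMIT_BYTES

When the check trips, the worker restarts itself (the "suicide" branch above) rather than trying to reclaim memory in place.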
@@ -649,6 +736,7 @@ class GeventServer(CommonServer): def watchdog(self, beat=4): import gevent + self.ppid = os.getppid() while True: self.process_limits() @@ -656,31 +744,33 @@ class GeventServer(CommonServer): def start(self): import gevent + try: from gevent.pywsgi import WSGIServer, WSGIHandler except ImportError: from gevent.wsgi import WSGIServer, WSGIHandler class ProxyHandler(WSGIHandler): - """ When logging requests, try to get the client address from + """When logging requests, try to get the client address from the environment so we get proxyfix's modifications (if any). Derived from werzeug.serving.WSGIRequestHandler.log / werzeug.serving.WSGIRequestHandler.address_string """ + def _connection_upgrade_requested(self): - if self.headers.get('Connection', '').lower() == 'upgrade': + if self.headers.get("Connection", "").lower() == "upgrade": return True - if self.headers.get('Upgrade', '').lower() == 'websocket': + if self.headers.get("Upgrade", "").lower() == "websocket": return True return False def format_request(self): old_address = self.client_address - if getattr(self, 'environ', None): - self.client_address = self.environ['REMOTE_ADDR'] + if getattr(self, "environ", None): + self.client_address = self.environ["REMOTE_ADDR"] elif not self.client_address: - self.client_address = '' + self.client_address = "" # other cases are handled inside WSGIHandler try: return super().format_request() @@ -699,37 +789,43 @@ class GeventServer(CommonServer): # Add the TCP socket to environ in order for the websocket # connections to use it. environ = super().get_environ() - environ['socket'] = self.socket + environ["socket"] = self.socket # Disable support for HTTP chunking on reads which cause # an issue when the connection is being upgraded, see # https://github.com/gevent/gevent/issues/1712 if self._connection_upgrade_requested(): - environ['wsgi.input'] = self.rfile - environ['wsgi.input_terminated'] = False + environ["wsgi.input"] = self.rfile + environ["wsgi.input_terminated"] = False return environ set_limit_memory_hard() - if os.name == 'posix': + if os.name == "posix": # Set process memory limit as an extra safeguard signal.signal(signal.SIGQUIT, dumpstacks) signal.signal(signal.SIGUSR1, log_ormcache_stats) gevent.spawn(self.watchdog) self.httpd = WSGIServer( - (self.interface, self.port), self.app, - log=logging.getLogger('longpolling'), - error_log=logging.getLogger('longpolling'), + (self.interface, self.port), + self.app, + log=logging.getLogger("longpolling"), + error_log=logging.getLogger("longpolling"), handler_class=ProxyHandler, ) - _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port) + _logger.info( + "Evented Service (longpolling) running on %s:%s", self.interface, self.port + ) try: self.httpd.serve_forever() except: - _logger.exception("Evented Service (longpolling): uncaught error during main loop") + _logger.exception( + "Evented Service (longpolling): uncaught error during main loop" + ) raise def stop(self): import gevent + self.httpd.stop() super().stop() gevent.shutdown() @@ -738,19 +834,21 @@ class GeventServer(CommonServer): self.start() self.stop() + class PreforkServer(CommonServer): - """ Multiprocessing inspired by (g)unicorn. + """Multiprocessing inspired by (g)unicorn. PreforkServer (aka Multicorn) currently uses accept(2) as dispatching method between workers but we plan to replace it by a more intelligent dispatcher to will parse the first HTTP request line. 
""" + def __init__(self, app): super().__init__(app) # config - self.population = config['workers'] - self.timeout = config['limit_time_real'] - self.limit_request = config['limit_request'] - self.cron_timeout = config['limit_time_real_cron'] or None + self.population = config["workers"] + self.timeout = config["limit_time_real"] + self.limit_request = config["limit_request"] + self.cron_timeout = config["limit_time_real_cron"] or None if self.cron_timeout == -1: self.cron_timeout = self.timeout # working vars @@ -776,7 +874,7 @@ class PreforkServer(CommonServer): def pipe_ping(self, pipe): try: - os.write(pipe[1], b'.') + os.write(pipe[1], b".") except IOError as e: if e.errno not in [errno.EAGAIN, errno.EINTR]: raise @@ -803,7 +901,7 @@ class PreforkServer(CommonServer): def long_polling_spawn(self): nargs = stripped_sys_argv() - cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:] + cmd = [sys.executable, sys.argv[0], "gevent"] + nargs[1:] popen = subprocess.Popen(cmd) self.long_polling_pid = popen.pid @@ -869,22 +967,26 @@ class PreforkServer(CommonServer): def process_timeout(self): now = time.time() - for (pid, worker) in self.workers.items(): - if worker.watchdog_timeout is not None and \ - (now - worker.watchdog_time) >= worker.watchdog_timeout: - _logger.error("%s (%s) timeout after %ss", - worker.__class__.__name__, - pid, - worker.watchdog_timeout) + for pid, worker in self.workers.items(): + if ( + worker.watchdog_timeout is not None + and (now - worker.watchdog_time) >= worker.watchdog_timeout + ): + _logger.error( + "%s (%s) timeout after %ss", + worker.__class__.__name__, + pid, + worker.watchdog_timeout, + ) self.worker_kill(pid, signal.SIGKILL) def process_spawn(self): - if config['http_enable']: + if config["http_enable"]: while len(self.workers_http) < self.population: self.worker_spawn(WorkerHTTP, self.workers_http) if not self.long_polling_pid: self.long_polling_spawn() - while len(self.workers_cron) < config['max_cron_threads']: + while len(self.workers_cron) < config["max_cron_threads"]: self.worker_spawn(WorkerCron, self.workers_cron) def sleep(self): @@ -918,11 +1020,13 @@ class PreforkServer(CommonServer): signal.signal(signal.SIGQUIT, dumpstacks) signal.signal(signal.SIGUSR1, log_ormcache_stats) - if config['http_enable']: + if config["http_enable"]: # listen to socket - _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port) + _logger.info( + "HTTP service (werkzeug) running on %s:%s", self.interface, self.port + ) family = socket.AF_INET - if ':' in self.interface: + if ":" in self.interface: family = socket.AF_INET6 self.socket = socket.socket(family, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -971,7 +1075,7 @@ class PreforkServer(CommonServer): _logger.debug("Multiprocess starting") while 1: try: - #_logger.debug("Multiprocess beat (%s)",time.time()) + # _logger.debug("Multiprocess beat (%s)",time.time()) self.process_signals() self.process_zombie() self.process_timeout() @@ -986,8 +1090,10 @@ class PreforkServer(CommonServer): self.stop(False) return -1 + class Worker(object): - """ Workers """ + """Workers""" + def __init__(self, multi): self.multi = multi self.watchdog_time = time.time() @@ -1004,7 +1110,7 @@ class Worker(object): self.request_count = 0 def setproctitle(self, title=""): - setproctitle('odoo: %s %s %s' % (self.__class__.__name__, self.pid, title)) + setproctitle("odoo: %s %s %s" % (self.__class__.__name__, self.pid, title)) def close(self): 
os.close(self.watchdog_pipe[0]) @@ -1018,13 +1124,19 @@ class Worker(object): def signal_time_expired_handler(self, n, stack): # TODO: print actual RUSAGE_SELF (since last check_limits) instead of # just repeating the config setting - _logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu']) + _logger.info( + "Worker (%d) CPU time limit (%s) reached.", + self.pid, + config["limit_time_cpu"], + ) # We dont suicide in such case - raise Exception('CPU time limit exceeded.') + raise Exception("CPU time limit exceeded.") def sleep(self): try: - select.select([self.multi.socket, self.wakeup_fd_r], [], [], self.multi.beat) + select.select( + [self.multi.socket, self.wakeup_fd_r], [], [], self.multi.beat + ) # clear wakeup pipe if we were interrupted empty_pipe(self.wakeup_fd_r) except select.error as e: @@ -1038,13 +1150,17 @@ class Worker(object): self.alive = False # check for lifetime if self.request_count >= self.request_max: - _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count) + _logger.info( + "Worker (%d) max request (%s) reached.", self.pid, self.request_count + ) self.alive = False # Reset the worker if it consumes too much memory (e.g. caused by a memory leak). memory = memory_info(psutil.Process(os.getpid())) - if config['limit_memory_soft'] and memory > config['limit_memory_soft']: - _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, memory) - self.alive = False # Commit suicide after the request. + if config["limit_memory_soft"] and memory > config["limit_memory_soft"]: + _logger.info( + "Worker (%d) virtual memory limit (%s) reached.", self.pid, memory + ) + self.alive = False # Commit suicide after the request. set_limit_memory_hard() @@ -1052,7 +1168,9 @@ class Worker(object): r = resource.getrusage(resource.RUSAGE_SELF) cpu_time = r.ru_utime + r.ru_stime soft, hard = resource.getrlimit(resource.RLIMIT_CPU) - resource.setrlimit(resource.RLIMIT_CPU, (int(cpu_time + config['limit_time_cpu']), hard)) + resource.setrlimit( + resource.RLIMIT_CPU, (int(cpu_time + config["limit_time_cpu"]), hard) + ) def process_work(self): pass @@ -1087,13 +1205,19 @@ class Worker(object): def run(self): try: self.start() - t = threading.Thread(name="Worker %s (%s) workthread" % (self.__class__.__name__, self.pid), target=self._runloop) + t = threading.Thread( + name="Worker %s (%s) workthread" % (self.__class__.__name__, self.pid), + target=self._runloop, + ) t.daemon = True t.start() t.join() - _logger.info("Worker (%s) exiting. request_count: %s, registry count: %s.", - self.pid, self.request_count, - len(odoo.modules.registry.Registry.registries)) + _logger.info( + "Worker (%s) exiting. 
request_count: %s, registry count: %s.",
+            self.pid,
+            self.request_count,
+            len(odoo.modules.registry.Registry.registries),
+        )
         self.stop()
     except Exception:
         _logger.exception("Worker (%s) Exception occurred, exiting...", self.pid)
@@ -1101,10 +1225,15 @@ class Worker(object):
         sys.exit(1)

     def _runloop(self):
-        signal.pthread_sigmask(signal.SIG_BLOCK, {
-            signal.SIGXCPU,
-            signal.SIGINT, signal.SIGQUIT, signal.SIGUSR1,
-        })
+        signal.pthread_sigmask(
+            signal.SIG_BLOCK,
+            {
+                signal.SIGXCPU,
+                signal.SIGINT,
+                signal.SIGQUIT,
+                signal.SIGUSR1,
+            },
+        )
         try:
             while self.alive:
                 self.check_limits()
@@ -1114,11 +1243,17 @@ class Worker(object):
                 break
             self.process_work()
         except:
-            _logger.exception("Worker %s (%s) Exception occurred, exiting...", self.__class__.__name__, self.pid)
+            _logger.exception(
+                "Worker %s (%s) Exception occurred, exiting...",
+                self.__class__.__name__,
+                self.pid,
+            )
             sys.exit(1)

+
 class WorkerHTTP(Worker):
-    """ HTTP Request workers """
+    """HTTP Request workers"""
+
     def __init__(self, multi):
         super(WorkerHTTP, self).__init__(multi)
@@ -1160,8 +1295,9 @@ class WorkerHTTP(Worker):
         Worker.start(self)
         self.server = BaseWSGIServerNoBind(self.multi.app)

+
 class WorkerCron(Worker):
-    """ Cron workers """
+    """Cron workers"""

     def __init__(self, multi):
         super(WorkerCron, self).__init__(multi)
@@ -1169,18 +1305,20 @@ class WorkerCron(Worker):
         # The variable db_index is keeping track of the next database to
         # process.
         self.db_index = 0
-        self.watchdog_timeout = multi.cron_timeout # Use a distinct value for CRON Worker
+        self.watchdog_timeout = (
+            multi.cron_timeout
+        )  # Use a distinct value for CRON Worker

     def sleep(self):
         # Really sleep once all the databases have been processed.
         if self.db_index == 0:
-            interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect
+            interval = SLEEP_INTERVAL + self.pid % 10  # chorus effect

             # simulate interruptible sleep with select(wakeup_fd, timeout)
             try:
                 select.select([self.wakeup_fd_r, self.dbcursor._cnx], [], [], interval)
                 # clear pg_conn/wakeup pipe if we were interrupted
-                time.sleep(self.pid / 100 % .1)
+                time.sleep(self.pid / 100 % 0.1)
                 self.dbcursor._cnx.poll()
                 empty_pipe(self.wakeup_fd_r)
             except select.error as e:
@@ -1188,8 +1326,8 @@ class WorkerCron(Worker):
                 raise

     def _db_list(self):
-        if config['db_name']:
-            db_names = config['db_name'].split(',')
+        if config["db_name"]:
+            db_names = config["db_name"].split(",")
         else:
             db_names = odoo.service.db.list_dbs(True)
         return db_names
@@ -1203,6 +1341,7 @@ class WorkerCron(Worker):
                 self.setproctitle(db_name)

                 from odoo.addons import base
+
                 base.models.ir_cron.ir_cron._process_jobs(db_name)

                 # dont keep cursors in multi database mode
@@ -1210,20 +1349,24 @@ class WorkerCron(Worker):
                     odoo.sql_db.close_db(db_name)
                 self.request_count += 1

-            if self.request_count >= self.request_max and self.request_max < len(db_names):
-                _logger.error("There are more dabatases to process than allowed "
-                              "by the `limit_request` configuration variable: %s more.",
-                              len(db_names) - self.request_max)
+            if self.request_count >= self.request_max and self.request_max < len(
+                db_names
+            ):
+                _logger.error(
+                    "There are more databases to process than allowed "
+                    "by the `limit_request` configuration variable: %s more.",
+                    len(db_names) - self.request_max,
+                )
         else:
             self.db_index = 0

     def start(self):
-        os.nice(10) # mommy always told me to be nice with others...
+        os.nice(10)  # mommy always told me to be nice with others...
Worker.start(self)
         if self.multi.socket:
             self.multi.socket.close()
-        dbconn = odoo.sql_db.db_connect('postgres')
+        dbconn = odoo.sql_db.db_connect("postgres")
         self.dbcursor = dbconn.cursor()
         # LISTEN / NOTIFY doesn't work in recovery mode
         self.dbcursor.execute("SELECT pg_is_in_recovery()")
@@ -1238,34 +1381,39 @@ class WorkerCron(Worker):
         super().stop()
         self.dbcursor.close()

-#----------------------------------------------------------
+
+# ----------------------------------------------------------
 # start/stop public api
-#----------------------------------------------------------
+# ----------------------------------------------------------

 server = None
 server_phoenix = False

+
 def load_server_wide_modules():
-    server_wide_modules = {'base', 'web'} | set(odoo.conf.server_wide_modules)
+    server_wide_modules = {"base", "web"} | set(odoo.conf.server_wide_modules)
     for m in server_wide_modules:
         try:
             odoo.modules.module.load_openerp_module(m)
         except Exception:
-            msg = ''
-            if m == 'web':
+            msg = ""
+            if m == "web":
                 msg = """
 The `web` module is provided by the addons found in the `openerp-web` project.
 Maybe you forgot to add those addons in your addons_path configuration."""
-            _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
+            _logger.exception("Failed to load server-wide module `%s`.%s", m, msg)

+
 def _reexec(updated_modules=None):
     """reexecute openerp-server process with (nearly) the same arguments"""
     if odoo.tools.osutil.is_running_as_nt_service():
-        subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
+        subprocess.call(
+            "net stop {0} && net start {0}".format(nt_service_name), shell=True
+        )
     exe = os.path.basename(sys.executable)
     args = stripped_sys_argv()
     if updated_modules:
-        args += ["-u", ','.join(updated_modules)]
+        args += ["-u", ",".join(updated_modules)]
     if not args or args[0] != exe:
         args.insert(0, exe)
     # We should keep the LISTEN_* environment variabled in order to support socket activation on reexec
@@ -1276,91 +1424,113 @@ def load_test_file_py(registry, test_file):
     # pylint: disable=import-outside-toplevel
     from odoo.tests import loader  # noqa: PLC0415
     from odoo.tests.suite import OdooSuite  # noqa: PLC0415
+
     threading.current_thread().testing = True
     try:
         test_path, _ = os.path.splitext(os.path.abspath(test_file))
-        for mod in [m for m in get_modules() if '%s%s%s' % (os.path.sep, m, os.path.sep) in test_file]:
+        for mod in [
+            m
+            for m in get_modules()
+            if "%s%s%s" % (os.path.sep, m, os.path.sep) in test_file
+        ]:
             for mod_mod in loader.get_test_modules(mod):
-                mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
+                mod_path, _ = os.path.splitext(getattr(mod_mod, "__file__", ""))
                 if test_path == config._normalize(mod_path):
                     tests = loader.get_module_test_cases(mod_mod)
                     suite = OdooSuite(tests)
-                    _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
+                    _logger.log(logging.INFO, "running tests %s.", mod_mod.__name__)
                     suite(registry._assertion_report)
                     if not registry._assertion_report.wasSuccessful():
-                        _logger.error('%s: at least one error occurred in a test', test_file)
+                        _logger.error(
+                            "%s: at least one error occurred in a test", test_file
+                        )
                     return
     finally:
         threading.current_thread().testing = False


 def preload_registries(dbnames):
-    """ Preload a registries, possibly run a test file."""
+    """Preload registries, possibly run a test file."""
     # TODO: move all config checks to args dont check tools.config here
     dbnames = dbnames or []
     rc = 0
     for dbname in dbnames:
         try:
-            update_module = 
config['init'] or config['update'] + update_module = config["init"] or config["update"] threading.current_thread().dbname = dbname registry = Registry.new(dbname, update_module=update_module) # run test_file if provided - if config['test_file']: - test_file = config['test_file'] + if config["test_file"]: + test_file = config["test_file"] if not os.path.isfile(test_file): - _logger.warning('test file %s cannot be found', test_file) - elif not test_file.endswith('py'): - _logger.warning('test file %s is not a python file', test_file) + _logger.warning("test file %s cannot be found", test_file) + elif not test_file.endswith("py"): + _logger.warning("test file %s is not a python file", test_file) else: - _logger.info('loading test file %s', test_file) + _logger.info("loading test file %s", test_file) load_test_file_py(registry, test_file) # run post-install tests - if config['test_enable']: + if config["test_enable"]: from odoo.tests import loader # noqa: PLC0415 + t0 = time.time() t0_sql = odoo.sql_db.sql_counter - module_names = (registry.updated_modules if update_module else - sorted(registry._init_modules)) + module_names = ( + registry.updated_modules + if update_module + else sorted(registry._init_modules) + ) _logger.info("Starting post tests") tests_before = registry._assertion_report.testsRun - post_install_suite = loader.make_suite(module_names, 'post_install') + post_install_suite = loader.make_suite(module_names, "post_install") if post_install_suite.has_http_case(): with registry.cursor() as cr: env = odoo.api.Environment(cr, odoo.SUPERUSER_ID, {}) - env['ir.qweb']._pregenerate_assets_bundles() + env["ir.qweb"]._pregenerate_assets_bundles() result = loader.run_suite(post_install_suite) registry._assertion_report.update(result) - _logger.info("%d post-tests in %.2fs, %s queries", - registry._assertion_report.testsRun - tests_before, - time.time() - t0, - odoo.sql_db.sql_counter - t0_sql) + _logger.info( + "%d post-tests in %.2fs, %s queries", + registry._assertion_report.testsRun - tests_before, + time.time() - t0, + odoo.sql_db.sql_counter - t0_sql, + ) registry._assertion_report.log_stats() - if registry._assertion_report and not registry._assertion_report.wasSuccessful(): + if ( + registry._assertion_report + and not registry._assertion_report.wasSuccessful() + ): rc += 1 except Exception: - _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True) + _logger.critical( + "Failed to initialize database `%s`.", dbname, exc_info=True + ) return -1 return rc + def start(preload=None, stop=False): - """ Start the odoo http server and cron processor. - """ + """Start the odoo http server and cron processor.""" global server load_server_wide_modules() if odoo.evented: server = GeventServer(odoo.http.root) - elif config['workers']: - if config['test_enable'] or config['test_file']: + elif config["workers"]: + if config["test_enable"] or config["test_file"]: _logger.warning("Unit testing in workers mode could fail; use --workers 0.") server = PreforkServer(odoo.http.root) else: - if platform.system() == "Linux" and sys.maxsize > 2**32 and "MALLOC_ARENA_MAX" not in os.environ: + if ( + platform.system() == "Linux" + and sys.maxsize > 2**32 + and "MALLOC_ARENA_MAX" not in os.environ + ): # glibc's malloc() uses arenas [1] in order to efficiently handle memory allocation of multi-threaded # applications. This allows better memory allocation handling in case of multiple threads that # would be using malloc() concurrently [2]. 
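Note: the comment block above motivates capping glibc's malloc arenas; the hunk that follows applies the cap through ctypes. A standalone sketch of the same technique, hedged for non-glibc platforms (the helper name is illustrative; `M_ARENA_MAX = -8` is glibc's mallopt parameter constant):

    import ctypes

    M_ARENA_MAX = -8  # mallopt() parameter constant from glibc's malloc.h

    def cap_malloc_arenas(n=2):
        # Best effort: musl and macOS have no libc.so.6 to load.
        try:
            libc = ctypes.CDLL("libc.so.6")
        except OSError:
            return False
        # mallopt() returns 1 on success, 0 on failure.
        return bool(libc.mallopt(ctypes.c_int(M_ARENA_MAX), ctypes.c_int(n)))

Deployments that prefer not to touch the allocator at runtime can export MALLOC_ARENA_MAX=2 before starting the interpreter instead, which is why the guard above only takes this path when the variable is absent from os.environ.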
@@ -1377,6 +1547,7 @@ def start(preload=None, stop=False): # [3] https://sourceware.org/git/?p=glibc.git;a=blob;f=malloc/malloc.c;h=00ce48c;hb=0a8262a#l862 try: import ctypes + libc = ctypes.CDLL("libc.so.6") M_ARENA_MAX = -8 assert libc.mallopt(ctypes.c_int(M_ARENA_MAX), ctypes.c_int(2)) @@ -1385,7 +1556,7 @@ def start(preload=None, stop=False): server = ThreadedServer(odoo.http.root) watcher = None - if 'reload' in config['dev_mode'] and not odoo.evented: + if "reload" in config["dev_mode"] and not odoo.evented: if inotify: watcher = FSWatcherInotify() watcher.start() @@ -1393,11 +1564,13 @@ def start(preload=None, stop=False): watcher = FSWatcherWatchdog() watcher.start() else: - if os.name == 'posix' and platform.system() != 'Darwin': - module = 'inotify' + if os.name == "posix" and platform.system() != "Darwin": + module = "inotify" else: - module = 'watchdog' - _logger.warning("'%s' module not installed. Code autoreload feature is disabled", module) + module = "watchdog" + _logger.warning( + "'%s' module not installed. Code autoreload feature is disabled", module + ) rc = server.run(preload, stop) @@ -1409,10 +1582,10 @@ def start(preload=None, stop=False): return rc if rc else 0 + def restart(): - """ Restart the server - """ - if os.name == 'nt': + """Restart the server""" + if os.name == "nt": # run in a thread to let the current thread return response to the caller. threading.Thread(target=_reexec).start() else:
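Note: the `server_bind()` hunk earlier in this patch implements systemd-style socket activation. A self-contained sketch of that handoff, assuming at most one inherited socket (which is what the `LISTEN_FDS == "1"` check enforces):

    import os
    import socket

    SD_LISTEN_FDS_START = 3  # first fd passed after stdin/stdout/stderr

    def inherited_listen_socket():
        # Only trust the fd if the init system addressed it to this process.
        if (os.environ.get("LISTEN_FDS") == "1"
                and os.environ.get("LISTEN_PID") == str(os.getpid())):
            return socket.fromfd(
                SD_LISTEN_FDS_START, socket.AF_INET, socket.SOCK_STREAM
            )
        return None

Preserving the LISTEN_* variables across _reexec(), as the comment in _reexec() notes, is what lets a reloaded server recover the same listening socket.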
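Note: _reexec() above rebuilds the command line via stripped_sys_argv() and prepends the executable when needed. A simplified sketch of that re-exec pattern, under the assumption that an exec of sys.executable is the final step (that step falls outside the hunks shown here, so treat the details as illustrative):

    import os
    import sys

    def reexec(updated_modules=None):
        args = [sys.executable] + sys.argv
        if updated_modules:
            args += ["-u", ",".join(updated_modules)]
        # execv() replaces the process image but keeps os.environ,
        # including the LISTEN_* variables needed for socket activation.
        os.execv(sys.executable, args)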