# -*- coding: utf-8 -*- import datetime import json import logging import re import subprocess import time import requests import markupsafe from pathlib import Path from odoo import models, fields, api from odoo.tools import file_open from ..common import os, RunbotException, make_github_session, sanitize from odoo.exceptions import UserError from odoo.tools.safe_eval import safe_eval _logger = logging.getLogger(__name__) class Trigger(models.Model): """ List of repo parts that must be part of the same bundle """ _name = 'runbot.trigger' _inherit = 'mail.thread' _description = 'Triggers' _order = 'sequence, id' sequence = fields.Integer('Sequence') name = fields.Char("Name") description = fields.Char("Description", help="Informative description") project_id = fields.Many2one('runbot.project', string="Project id", required=True) repo_ids = fields.Many2many('runbot.repo', relation='runbot_trigger_triggers', string="Triggers", domain="[('project_id', '=', project_id)]") dependency_ids = fields.Many2many('runbot.repo', relation='runbot_trigger_dependencies', string="Dependencies") config_id = fields.Many2one('runbot.build.config', string="Config", required=True) batch_dependent = fields.Boolean('Batch Dependent', help="Force adding batch in build parameters to make it unique and give access to bundle") ci_context = fields.Char("CI context", tracking=True) category_id = fields.Many2one('runbot.category', default=lambda self: self.env.ref('runbot.default_category', raise_if_not_found=False)) version_domain = fields.Char(string="Version domain") hide = fields.Boolean('Hide trigger on main page') manual = fields.Boolean('Only start trigger manually', default=False) restore_trigger_id = fields.Many2one('runbot.trigger', string='Restore Trigger ID for custom triggers', help="Mainly usefull to automatically define where to find a reference database when creating a custom trigger", tracking=True) upgrade_dumps_trigger_id = fields.Many2one('runbot.trigger', string='Template/complement trigger', tracking=True) upgrade_step_id = fields.Many2one('runbot.build.config.step', compute="_compute_upgrade_step_id", store=True) ci_url = fields.Char("CI url") ci_description = fields.Char("CI description") has_stats = fields.Boolean('Has a make_stats config step', compute="_compute_has_stats", store=True) team_ids = fields.Many2many('runbot.team', string="Runbot Teams", help="Teams responsible of this trigger, mainly usefull for nightly") active = fields.Boolean("Active", default=True) @api.depends('config_id.step_order_ids.step_id.make_stats') def _compute_has_stats(self): for trigger in self: trigger.has_stats = any(trigger.config_id.step_order_ids.step_id.mapped('make_stats')) @api.depends('upgrade_dumps_trigger_id', 'config_id', 'config_id.step_order_ids.step_id.job_type') def _compute_upgrade_step_id(self): for trigger in self: trigger.upgrade_step_id = False if trigger.upgrade_dumps_trigger_id: trigger.upgrade_step_id = self._upgrade_step_from_config(trigger.config_id) def _upgrade_step_from_config(self, config): upgrade_step = next((step_order.step_id for step_order in config.step_order_ids if step_order.step_id._is_upgrade_step()), False) if not upgrade_step: upgrade_step = next((step_order.step_id for step_order in config.step_order_ids if step_order.step_id.job_type == 'python'), False) if not upgrade_step: raise UserError('Upgrade trigger should have a config with step of type Configure Upgrade') return upgrade_step def _reference_builds(self, bundle): self.ensure_one() if self.upgrade_step_id: # this is an upgrade trigger, add corresponding builds custom_config = next((trigger_custom.config_id for trigger_custom in bundle.trigger_custom_ids if trigger_custom.trigger_id == self), False) step = self._upgrade_step_from_config(custom_config) if custom_config else self.upgrade_step_id refs_builds = step._reference_builds(bundle, self) return [(4, b.id) for b in refs_builds] return [] def _get_version_domain(self): if self.version_domain: return safe_eval(self.version_domain) return [] class Remote(models.Model): """ Regroups repo and it duplicates (forks): odoo+odoo-dev for each repo """ _name = 'runbot.remote' _description = 'Remote' _order = 'sequence, id' _inherit = 'mail.thread' name = fields.Char('Url', required=True, tracking=True) repo_id = fields.Many2one('runbot.repo', required=True, tracking=True) owner = fields.Char(compute='_compute_base_infos', string='Repo Owner', store=True, readonly=True, tracking=True) repo_name = fields.Char(compute='_compute_base_infos', string='Repo Name', store=True, readonly=True, tracking=True) repo_domain = fields.Char(compute='_compute_base_infos', string='Repo domain', store=True, readonly=True, tracking=True) base_url = fields.Char(compute='_compute_base_url', string='Base URL', readonly=True, tracking=True) short_name = fields.Char('Short name', compute='_compute_short_name', tracking=True) remote_name = fields.Char('Remote name', compute='_compute_remote_name', tracking=True) sequence = fields.Integer('Sequence', tracking=True) fetch_heads = fields.Boolean('Fetch branches', default=True, tracking=True) fetch_pull = fields.Boolean('Fetch PR', default=False, tracking=True) send_status = fields.Boolean('Send status', default=False, tracking=True) token = fields.Char("Github token", groups="runbot.group_runbot_admin") @api.depends('name') def _compute_base_infos(self): for remote in self: name = re.sub('.+@', '', remote.name) name = re.sub('^https://', '', name) # support https repo style name = re.sub('.git$', '', name) name = name.replace(':', '/') s = name.split('/') remote.repo_domain = s[-3] remote.owner = s[-2] remote.repo_name = s[-1] @api.depends('repo_domain', 'owner', 'repo_name') def _compute_base_url(self): for remote in self: remote.base_url = '%s/%s/%s' % (remote.repo_domain, remote.owner, remote.repo_name) @api.depends('name', 'base_url') def _compute_short_name(self): for remote in self: remote.short_name = '/'.join(remote.base_url.split('/')[-2:]) def _compute_remote_name(self): for remote in self: remote.remote_name = sanitize(remote.short_name) def create(self, values_list): remote = super().create(values_list) if not remote.repo_id.main_remote_id: remote.repo_id.main_remote_id = remote remote._cr.postcommit.add(remote.repo_id._update_git_config) return remote def write(self, values): res = super().write(values) self._cr.postcommit.add(self.repo_id._update_git_config) return res def _github(self, url, payload=None, ignore_errors=False, nb_tries=2, recursive=False, session=None): generator = self.sudo()._github_generator(url, payload=payload, ignore_errors=ignore_errors, nb_tries=nb_tries, recursive=recursive, session=session) if recursive: return generator result = list(generator) return result[0] if result else False def _github_generator(self, url, payload=None, ignore_errors=False, nb_tries=2, recursive=False, session=None): """Return a http request to be sent to github""" for remote in self: if remote.owner and remote.repo_name and remote.repo_domain: url = url.replace(':owner', remote.owner) url = url.replace(':repo', remote.repo_name) url = 'https://api.%s%s' % (remote.repo_domain, url) session = session or make_github_session(remote.token) while url: if recursive: _logger.info('Getting page %s', url) try_count = 0 while try_count < nb_tries: try: if payload: response = session.post(url, data=json.dumps(payload)) else: response = session.get(url) response.raise_for_status() if try_count > 0: _logger.info('Success after %s tries', (try_count + 1)) if recursive: link = response.headers.get('link') url = False if link: url = {link.split(';')[1]: link.split(';')[0] for link in link.split(',')}.get(' rel="next"') if url: url = url.strip('<> ') yield response.json() break else: yield response.json() return except requests.HTTPError: try_count += 1 if try_count < nb_tries: time.sleep(2) else: if ignore_errors: _logger.exception('Ignored github error %s %r (try %s/%s)', url, payload, try_count, nb_tries) url = False else: raise def action_check_token(self): if not self.user_has_groups('runbot.group_runbot_admin'): raise UserError('This action is restricted to admin users') token_results = {} for repo in self: session = make_github_session(repo.token) if repo.token not in token_results: token_results[repo.token] = session.get("https://api.github.com/user") response = token_results[repo.token] try: limit_total = response.headers['X-RateLimit-Limit'] limit_used = response.headers['X-RateLimit-Used'] limit_remaining = response.headers['X-RateLimit-Remaining'] limit_reset = datetime.datetime.fromtimestamp(int(response.headers['X-RateLimit-Reset'])) json = response.json() login = json['login'] user_id = json['id'] html_url = json['html_url'] avatar_url = json['avatar_url'] repo_access_response = session.get(f'https://api.github.com/repos/{repo.owner}/{self.repo_name}/collaborators/{login}/permission') if repo_access_response.status_code == 200: repo_access = repo_access_response.json() permission = repo_access['permission'] permissions = repo_access['user']['permissions'] response access_info = markupsafe.Markup(''' Permissions: %s