[IMP] runbot: improve build error cleaning action

With this commit, when build errors are re-cleaned, they are also merged
if the fingerprints when fingerprints are matching.

Also, this fixes the ir_logging compute that associate a build error
so that the active build error is preferred over an inactive one.
Christophe Monniez 2023-12-06 16:34:33 +01:00 committed by xdo
parent c513edc6ad
commit 7ce79b434e
3 changed files with 139 additions and 4 deletions

@ -6,7 +6,7 @@ import re
from collections import defaultdict
from dateutil.relativedelta import relativedelta
from fnmatch import fnmatch
from werkzeug.urls import url_join
from odoo import models, fields, api
from odoo.exceptions import ValidationError, UserError
@ -81,6 +81,8 @@ class BuildError(models.Model):
raise UserError("This error as a test-tag and can only be (de)activated by admin")
if not vals['active'] and build_error.last_seen_date + relativedelta(days=1) > fields.Datetime.now():
raise UserError("This error broke less than one day ago can only be deactivated by admin")
if 'cleaned_content' in vals:
vals.update({'fingerprint': self._digest(vals['cleaned_content'])})
result = super(BuildError, self).write(vals)
if vals.get('parent_id'):
for build_error in self:
@ -224,7 +226,51 @@ class BuildError(models.Model):
def _search_trigger_ids(self, operator, value):
return [('build_ids.trigger_id', operator, value)]
def _get_form_url(self):
return url_join(self.get_base_url(), f'/web#id={self.id}&model=runbot.build.error&view_type=form')
def _get_form_link(self):
return f'<a href="{self._get_form_url()}">{self.id}</a>'
def _merge(self):
if len(self) < 2:
_logger.debug('Merging errors %s', self)
base_error = self[0]
base_linked = self[0].parent_id or self[0]
old_error_ids = []
for error in self[1:]:
assert base_error.fingerprint == error.fingerprint, f'Errors {base_error.id} and {error.id} have a different fingerprint'
if error.test_tags and not base_linked.test_tags:
base_linked.test_tags = error.test_tags
if not base_linked.active and error.active:
base_linked.active = True
base_error.message_post(body=f'⚠ test-tags inherited from error {error._get_form_link()}')
elif base_linked.test_tags and error.test_tags and base_linked.test_tags != error.test_tags:
base_error.message_post(body=f'⚠ trying to merge errors with different test-tags from {error._get_form_link()} tag: "{error.test_tags}"')
error.message_post(body=f'⚠ trying to merge errors with different test-tags from {base_error._get_form_link()} tag: "{base_error.test_tags}"')
base_error.build_ids += error.build_ids
error.build_ids = False
if error.responsible and not base_linked.responsible:
base_error.responsible = error.responsible
elif base_linked.responsible and error.responsible and base_linked.responsible != error.responsible:
base_linked.message_post(body=f'⚠ responsible in merged error {error._get_form_link()} was "{error.responsible.name}" and different from this one')
if error.team_id and not base_error.team_id:
base_error.team_id = error.team_id
base_error.message_post(body=f'Error {error._get_form_link()} was merged into this one')
error.message_post(body=f'Error was merged into {base_linked._get_form_link()}')
error.child_ids.parent_id = base_error
error.active = False
# Actions
@ -240,9 +286,21 @@ class BuildError(models.Model):
build_errors[1:].write({'parent_id': build_errors[0].id})
def action_clean_content(self):
_logger.info('Cleaning %s build errors', len(self))
cleaning_regs = self.env['runbot.error.regex'].search([('re_type', '=', 'cleaning')])
changed_fingerprints = set()
for build_error in self:
fingerprint_before = build_error.fingerprint
build_error.cleaned_content = cleaning_regs._r_sub('%', build_error.content)
if fingerprint_before != build_error.fingerprint:
# merge identical errors
errors_by_fingerprint = self.env['runbot.build.error'].search([('fingerprint', 'in', list(changed_fingerprints))])
for fingerprint in changed_fingerprints:
errors_to_merge = errors_by_fingerprint.filtered(lambda r: r.fingerprint == fingerprint)
rec_id = errors_to_merge._merge()
def action_assign(self):
if not any((not record.responsible and not record.team_id and record.file_path and not record.parent_id) for record in self):

@ -56,7 +56,7 @@ class IrLogging(models.Model):
ir_logging.error_id = False
if ir_logging.level in ('ERROR', 'CRITICAL', 'WARNING') and ir_logging.type == 'server':
fingerprints[self.env['runbot.build.error']._digest(cleaning_regexes._r_sub('%', ir_logging.message))].append(ir_logging)
for build_error in self.env['runbot.build.error'].search([('fingerprint', 'in', list(fingerprints.keys()))], order='active asc'):
for build_error in self.env['runbot.build.error'].search([('fingerprint', 'in', list(fingerprints.keys()))], order='active asc'):
for ir_logging in fingerprints[build_error.fingerprint]:
ir_logging.error_id = build_error.id

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import hashlib
from odoo.exceptions import ValidationError
from .common import RunbotCase
@ -30,6 +31,82 @@ class TestBuildError(RunbotCase):
super(TestBuildError, self).setUp()
self.BuildError = self.env['runbot.build.error']
self.BuildErrorTeam = self.env['runbot.team']
self.ErrorRegex = self.env['runbot.error.regex']
def test_create_write_clean(self):
'regex': '\d+',
're_type': 'cleaning',
error_x = self.BuildError.create({
'content': 'foo bar 242',
expected = 'foo bar %'
expected_hash = hashlib.sha256(expected.encode()).hexdigest()
self.assertEqual(error_x.cleaned_content, expected)
self.assertEqual(error_x.fingerprint, expected_hash)
# Let's ensure that the fingerprint changes if we clean with an additional regex
'regex': 'bar',
're_type': 'cleaning',
expected = 'foo % %'
expected_hash = hashlib.sha256(expected.encode()).hexdigest()
self.assertEqual(error_x.cleaned_content, expected)
self.assertEqual(error_x.fingerprint, expected_hash)
def test_merge(self):
build_a = self.create_test_build({'local_result': 'ko', 'local_state': 'done'})
error_a = self.BuildError.create({'content': 'foo bar', 'build_ids': [(6, 0, [build_a.id])]})
build_b = self.create_test_build({'local_result': 'ko', 'local_state': 'done'})
error_b = self.BuildError.create({'content': 'foo bar', 'build_ids': [(6, 0, [build_b.id])]})
(error_a | error_b)._merge()
self.assertEqual(len(self.BuildError.search([('fingerprint', '=', error_a.fingerprint)])), 1)
self.assertTrue(error_a.active, 'The first merged error should stay active')
self.assertFalse(error_b.active, 'The second merged error should have stay deactivated')
self.assertIn(build_a, error_a.build_ids)
self.assertIn(build_b, error_a.build_ids)
error_c = self.BuildError.create({'content': 'foo foo'})
# let's ensure we cannot merge errors with different fingerprints
with self.assertRaises(AssertionError):
(error_a | error_c)._merge()
def test_merge_linked(self):
top_error = self.BuildError.create({'content': 'foo foo', 'active': False})
build_a = self.create_test_build({'local_result': 'ko', 'local_state': 'done'})
error_a = self.BuildError.create({'content': 'foo bar', 'parent_id': top_error.id , 'build_ids': [(6, 0, [build_a.id])]})
build_b = self.create_test_build({'local_result': 'ko', 'local_state': 'done'})
error_b = self.BuildError.create({'content': 'foo bar', 'test_tags': 'footag', 'build_ids': [(6, 0, [build_b.id])]})
linked_error = self.BuildError.create({'content': 'foo foo bar', 'parent_id': error_b.id})
(error_a | error_b)._merge()
self.assertEqual(len(self.BuildError.search([('fingerprint', '=', error_a.fingerprint)])), 1)
self.assertTrue(error_a.active, 'The first merged error should stay active')
self.assertFalse(error_b.active, 'The second merged error should have stay deactivated')
self.assertIn(build_a, error_a.build_ids)
self.assertIn(build_b, error_a.build_ids)
self.assertEqual(top_error.test_tags, 'footag')
self.assertEqual(top_error.active, True)
self.assertEqual(linked_error.parent_id, error_a, 'Linked errors to a merged one should be now linked to the new one')
tagged_error = self.BuildError.create({'content': 'foo foo', 'test_tags': 'bartag'})
(top_error | tagged_error)._merge()
self.assertTrue(tagged_error.active, 'A differently tagged error cannot be deactivated by the merge')
def test_build_scan(self):
IrLog = self.env['ir.logging']