[IMP] runbot: secure webhook with secrets

Use github secret and header verification to validate that the sender is
indeed github.
This commit is contained in:
William Braeckman 2025-02-26 11:28:37 +01:00
parent 34a92ac0cb
commit 78d00aee38
4 changed files with 67 additions and 2 deletions

View File

@ -3,6 +3,10 @@
import time
import json
import logging
import hashlib
import hmac
from werkzeug.exceptions import BadRequest
from odoo import http
from odoo.http import request
@ -10,12 +14,34 @@ from odoo.http import request
_logger = logging.getLogger(__name__)
def verify_signature(payload_body, remote, signature_header):
"""Verify that the payload was sent from GitHub by validating SHA256.
Raise and return 403 if not authorized.
Args:
payload_body: original request body to verify (request.body())
remote: runbot.remote
signature_header: header received from GitHub (x-hub-signature-256)
"""
if not remote.webhook_secret:
return
if not signature_header:
_logger.info('Received payload without signature header')
raise BadRequest(description="x-hub-signature-256 header is missing!")
hash_object = hmac.new(remote.webhook_secret.encode('utf-8'), msg=payload_body, digestmod=hashlib.sha256)
expected_signature = "sha256=" + hash_object.hexdigest()
if not hmac.compare_digest(expected_signature, signature_header):
_logger.info('Received payload with invalid signature for remote %s', remote.name)
raise BadRequest(description="Request signatures didn't match!")
class Hook(http.Controller):
@http.route(['/runbot/hook', '/runbot/hook/<int:remote_id>'], type='http', auth="public", website=True, csrf=False, sitemap=False)
def hook(self, remote_id=None, **_post):
event = request.httprequest.headers.get("X-Github-Event")
payload = json.loads(request.params.get('payload', '{}'))
payload_str = request.params.get('payload', '{}')
payload = json.loads(payload_str)
if remote_id is None:
repo_data = payload.get('repository')
if repo_data:
@ -31,7 +57,13 @@ class Hook(http.Controller):
remote_id = remote.id
if not remote_id:
_logger.error("Remote %s not found", repo_data['ssh_url'])
remote = request.env['runbot.remote'].sudo().browse(remote_id)
remote = request.env['runbot.remote'].sudo().browse(remote_id).exists()
if not remote:
raise BadRequest(description='Invalid remote')
verify_signature(
payload_str.encode('utf-8'), remote.webhook_secret,
request.httprequest.headers.get('X-Hub-Signature-256')
)
# force update of dependencies too in case a hook is lost
if not payload or event == 'push':

View File

@ -220,6 +220,7 @@ class Remote(models.Model):
send_status = fields.Boolean('Send status', default=False, tracking=True)
token = fields.Char("Github token", groups="runbot.group_runbot_admin")
webhook_secret = fields.Char("Webhook secret", groups="runbot.group_runbot_admin")
@api.depends('name')
def _compute_base_infos(self):

View File

@ -3,15 +3,20 @@ import datetime
import logging
import time
import re
import hmac
import hashlib
from unittest import skip
from unittest.mock import patch, Mock
from subprocess import CalledProcessError
from werkzeug.exceptions import HTTPException
from odoo import fields
from odoo.tests import common, TransactionCase
from odoo.tools import mute_logger
from odoo.addons.runbot.controllers.hook import verify_signature
from .common import RunbotCase, RunbotCaseMinimalSetup
_logger = logging.getLogger(__name__)
@ -364,6 +369,31 @@ class TestGithub(TransactionCase):
self.assertEqual(2, mock_session.return_value.post.call_count, "_github method should try two times by default")
def test_verify_signature(self):
project = self.env['runbot.project'].create({'name': 'Tests'})
repo_server = self.env['runbot.repo'].create({
'name': 'server',
'project_id': project.id,
})
remote = self.env['runbot.remote'].create({
'name': 'bla@example.com:base/server',
'repo_id': repo_server.id,
})
# Should not raise -> no secret on remote
payload_body = '{"payload": "data"}'.encode('utf-8')
verify_signature(payload_body, remote, '')
verify_signature(payload_body, remote, None)
# Should not raise, valid signature
remote.webhook_secret = 'IAMRUNBOT'
signature_header = f'sha256={hmac.new(remote.webhook_secret.encode("utf-8"), msg=payload_body, digestmod=hashlib.sha256).hexdigest()}'
verify_signature(payload_body, remote, signature_header)
# Should raise invalid signature
with self.assertRaises(HTTPException):
verify_signature(payload_body, remote, 'invalid header')
# Should raise if no signature and webhook_secret is set
with self.assertRaises(HTTPException):
verify_signature(payload_body, remote, None)
class TestFetch(RunbotCase):

View File

@ -179,6 +179,7 @@
<field name="fetch_pull" string="PR"/>
<field name="send_status"/>
<field name="token" password="True"/>
<field name="webhook_secret" password="True"/>
</list>
</field>
</group>
@ -217,6 +218,7 @@
<field name="sequence"/>
<field name="repo_id"/>
<field name="token"/>
<field name="webhook_secret"/>
<field name="fetch_pull"/>
<field name="fetch_heads"/>
<field name="send_status"/>