138 lines
6.2 KiB
Python
138 lines
6.2 KiB
Python
import base64
|
|
import binascii
|
|
import requests
|
|
|
|
from datetime import datetime
|
|
from dateutil.relativedelta import relativedelta
|
|
from werkzeug.urls import url_join
|
|
|
|
from odoo import fields, models, api, _
|
|
from odoo.exceptions import UserError, ValidationError
|
|
from odoo.http import request
|
|
from odoo.tools.safe_eval import json
|
|
|
|
|
|
class ResCompany(models.Model):
|
|
_inherit = 'res.company'
|
|
|
|
l10n_ro_edi_client_id = fields.Char(string='Client ID')
|
|
l10n_ro_edi_client_secret = fields.Char(string='Client Secret')
|
|
l10n_ro_edi_access_token = fields.Char(string='Access Token')
|
|
l10n_ro_edi_refresh_token = fields.Char(string='Refresh Token')
|
|
l10n_ro_edi_access_expiry_date = fields.Date(string='Access Token Expiry Date')
|
|
l10n_ro_edi_refresh_expiry_date = fields.Date(string='Refresh Token Expiry Date')
|
|
l10n_ro_edi_callback_url = fields.Char(compute='_compute_l10n_ro_edi_callback_url')
|
|
l10n_ro_edi_test_env = fields.Boolean(string='Use Test Environment', default=True)
|
|
l10n_ro_edi_oauth_error = fields.Char() # Error field to be shown in case of error from the authentication process
|
|
|
|
@api.depends('country_code')
|
|
def _compute_l10n_ro_edi_callback_url(self):
|
|
""" Callback URLs are used for generating client_id and client_secret from l10n_ro_edi's setting. """
|
|
for company in self:
|
|
if company.country_code == 'RO':
|
|
company.l10n_ro_edi_callback_url = url_join(request.httprequest.url_root, 'l10n_ro_edi/callback/%s' % company.id)
|
|
else:
|
|
company.l10n_ro_edi_callback_url = False
|
|
|
|
def _l10n_ro_edi_log_message(self, message: str, func: str):
|
|
with self.pool.cursor() as cr:
|
|
self = self.with_env(self.env(cr=cr))
|
|
self.env['ir.logging'].sudo().create({
|
|
'name': 'l10n_ro_edi_log',
|
|
'type': 'server',
|
|
'level': 'INFO',
|
|
'dbname': self.env.cr.dbname,
|
|
'message': message,
|
|
'func': func,
|
|
'path': '',
|
|
'line': '1',
|
|
})
|
|
self.env.cr.commit()
|
|
|
|
def _l10n_ro_edi_process_token_response(self, response_json):
|
|
"""
|
|
To be called just after processing the json response from https://logincert.anaf.ro/anaf-oauth2/v1/token
|
|
This method reads and process the json, and writes the token fields on the company.
|
|
"""
|
|
self.ensure_one()
|
|
if 'access_token' not in response_json or 'refresh_token' not in response_json:
|
|
raise ValidationError(_("Token not found.\nResponse: %s", response_json))
|
|
|
|
# The access_token is in JWT format, which consists of 3 parts separated by '.':
|
|
# Header, Payload, and Signature. We only need the Payload part to decode the token
|
|
# and get the access expiry date
|
|
payload = response_json['access_token'].split('.')[1]
|
|
payload += '=' * (-len(payload) % 4)
|
|
decoded_payload = base64.b64decode(payload, altchars=b'-_', validate=True)
|
|
access_token_obj = json.loads(decoded_payload)
|
|
access_expiry_date = datetime.fromtimestamp(access_token_obj['exp'])
|
|
refresh_expiry_date = datetime.now() + relativedelta(years=3)
|
|
self.write({
|
|
'l10n_ro_edi_access_token': response_json['access_token'],
|
|
'l10n_ro_edi_refresh_token': response_json['refresh_token'],
|
|
'l10n_ro_edi_access_expiry_date': access_expiry_date,
|
|
'l10n_ro_edi_refresh_expiry_date': refresh_expiry_date,
|
|
'l10n_ro_edi_oauth_error': False,
|
|
})
|
|
|
|
def _l10n_ro_edi_refresh_access_token(self, session):
|
|
"""
|
|
Uses the saved client_id, client_secret, and refresh_token on the company (self)
|
|
to make request to the SPV and renew the company's token fields.
|
|
"""
|
|
self.ensure_one()
|
|
if not self.l10n_ro_edi_client_id or not self.l10n_ro_edi_client_secret:
|
|
raise UserError(_("Client ID and Client Secret field must be filled."))
|
|
if not self.l10n_ro_edi_refresh_token:
|
|
raise UserError(_("Refresh token not found"))
|
|
|
|
response = session.post(
|
|
url='https://logincert.anaf.ro/anaf-oauth2/v1/token',
|
|
headers={'Content-Type': 'application/x-www-form-urlencoded'},
|
|
timeout=10,
|
|
data={
|
|
'grant_type': 'refresh_token',
|
|
'refresh_token': self.l10n_ro_edi_refresh_token,
|
|
'client_id': self.l10n_ro_edi_client_id,
|
|
'client_secret': self.l10n_ro_edi_client_secret,
|
|
},
|
|
)
|
|
response_json = response.json()
|
|
self._l10n_ro_edi_process_token_response(response_json)
|
|
|
|
def _cron_l10n_ro_edi_refresh_access_token(self):
|
|
"""
|
|
This CRON method will be run every 30 days to refresh the following fields on the company:
|
|
|
|
- ``l10n_ro_edi_access_token``
|
|
- ``l10n_ro_edi_refresh_token``
|
|
- ``l10n_ro_edi_access_expiry_date``
|
|
- ``l10n_ro_edi_refresh_expiry_date``
|
|
"""
|
|
ro_companies = self.env['res.company'].sudo().search([
|
|
('l10n_ro_edi_refresh_token', '!=', False),
|
|
('l10n_ro_edi_client_id', '!=', False),
|
|
('l10n_ro_edi_client_secret', '!=', False),
|
|
])
|
|
session = requests.Session()
|
|
for company in ro_companies:
|
|
error_cause = ''
|
|
try:
|
|
company._l10n_ro_edi_refresh_access_token(session)
|
|
except ValidationError as e:
|
|
# From access/refresh token not found after sending request
|
|
error_cause = e
|
|
except requests.exceptions.RequestException as e:
|
|
error_cause = _("Error when converting response to json: %s", e)
|
|
except binascii.Error as e:
|
|
error_cause = _("Error when decoding the access token payload: %s", e)
|
|
except Exception as e:
|
|
error_cause = _("Error when refreshing the access token: %s", e)
|
|
|
|
if error_cause:
|
|
error_header = _("Refresh token failed [company=%(company_id)s]", company_id=company.id)
|
|
self._l10n_ro_edi_log_message(
|
|
message=f'{error_header}\n{error_cause}',
|
|
func='_cron_l10n_ro_edi_refresh_access_token',
|
|
)
|