2025-03-10 10:52:11 +07:00

382 lines
14 KiB

# Part of Odoo. See LICENSE file for full copyright and licensing details.
"""
Italian E-invoice signed files content extraction.

There are two methods: OpenSSL and Fallback.

Sometimes OpenSSL fails to read signed invoices because of an error in the signature itself.
The Fallback method contains only the minimal code needed to extract the invoices' content,
without verifying the signature itself.
It is only meant to be used as a no-requirements fallback for OpenSSL.
"""
import logging
import struct
import warnings
from contextlib import suppress
_logger = logging.getLogger(__name__)
def remove_signature(content, target=None):
    """ Takes a bytestring supposedly PKCS7 signed and returns its PKCS7-data only.

        The removal strategies are tried in order: OpenSSL first, then the
        pure-Python fallback. A strategy that raises, or that returns None
        (as the OpenSSL one does when the library is not installed), is
        skipped and the next one is attempted.

        :param content: bytestring holding the signed envelope
        :param target: optional object; when truthy, its ``remove_signature_method``
                       attribute is set to the name of the strategy being attempted
        :return: the extracted content, or None when no strategy produced one
    """
    for removal_strategy in (remove_signature_openssl, remove_signature_fallback):
        if target:
            target.remove_signature_method = removal_strategy.__name__
        try:
            extracted = removal_strategy(content)
        except Exception:
            # Best effort on purpose: a failing strategy must not prevent the next one from running.
            _logger.debug("Signature removal with %s failed", removal_strategy.__name__, exc_info=True)
            continue
        # None means "strategy not available": keep going instead of returning it.
        if extracted is not None:
            return extracted
    return None
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
def byte_to_bit_array(val):
    """ Convert a byte to an array of zeros and ones.

        :param val: integer; only its 8 least significant bits are considered
        :return: list of 8 ints (0 or 1), most significant bit first
    """
    # Shift the wanted bit down to position 0 and mask it, from bit 7 to bit 0.
    return [(val >> pos) & 1 for pos in range(7, -1, -1)]
def bit_array_to_byte(val):
    """ Convert an array of zeros and ones to byte.

        :param val: sequence of bits, most significant bit first
        :return: the integer those bits represent (0 for an empty sequence)
    """
    value = 0
    for bit in val:
        # Every new bit is less significant than the ones already accumulated.
        value = (value << 1) + bit
    return value
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
# OpenSSL is an optional dependency: when it cannot be imported, ``ssl_crypto``
# is set to None so that the rest of the module can detect its absence.
try:
    from OpenSSL import crypto as ssl_crypto
    import OpenSSL._util as ssl_util
except ImportError:
    ssl_crypto = None
    ssl_util = None
    _logger.warning("Cannot import library 'OpenSSL' for PKCS#7 envelope extraction.")
def remove_signature_openssl(content):
    """ Remove the PKCS#7 envelope from given content, making a '.xml.p7m' file content readable as it was '.xml'.

        As OpenSSL may not be installed, in that case a warning is issued and None is returned.

        :param content: bytestring holding the envelope, loaded as ASN1 (``FILETYPE_ASN1``)
        :return: the enveloped content as a byte-string, or None when OpenSSL is unavailable
        :raise OpenSSL.crypto.Error: when ``PKCS7_verify`` does not report success
    """

    # Prevent using the library if it had import errors
    if not ssl_crypto:
        _logger.warning("Error reading the content, check if the OpenSSL library is installed for for PKCS#7 envelope extraction.")
        return None

    # Load some tools from the library
    null = ssl_util.ffi.NULL
    verify = ssl_util.lib.PKCS7_verify

    # By default ignore the validity of the certificates, just validate the structure
    flags = ssl_util.lib.PKCS7_NOVERIFY | ssl_util.lib.PKCS7_NOSIGS

    # Read the signed data from the content
    out_buffer = ssl_crypto._new_mem_buf()

    # This method is deprecated, but there are actually no alternatives
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        loaded_data = ssl_crypto.load_pkcs7_data(ssl_crypto.FILETYPE_ASN1, content)

    # Verify the signature; the enveloped data is written to out_buffer as a side effect
    if verify(loaded_data._pkcs7, null, null, null, out_buffer, flags) != 1:
        raise ssl_crypto.Error("PKCS#7 verification failed, the content cannot be extracted")

    # Get the content as a byte-string
    return ssl_crypto._bio_to_string(out_buffer)
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
def remove_signature_fallback(content):
    """ The invoice content is inside an ASN1 node identified by PKCS7_DATA_OID (pkcs7-data).
        The node is defined as an OctetString, which can be composed of an arbitrary
        sequence of octets of string data.

        We visit in-order the ASN1 tree nodes until we find the pkcs7-data, then we look for content.
        Once we found it, we read all OctetStrings that get yielded by the in-order visit.
        When there are no more OctetStrings, then another object will follow
        with its header and identifier, so we stop exploring and just return the content.

        See also: RFC 2315 (PKCS #7) and ITU-T X.690 (ASN.1 encoding rules).

        :param content: bytestring holding the signed envelope
        :return: the concatenation of the OctetStrings found after the pkcs7-data identifier
        :raise Exception: when the pkcs7-data identifier or its content cannot be found
    """
    PKCS7_DATA_OID = '1.2.840.113549.1.7.1'

    chunks = []
    header_found = False
    data_found = False
    for node in Reader().build_from_stream(content):
        if node.kind == 'ObjectIdentifier' and node.content == PKCS7_DATA_OID:
            header_found = True
        if header_found and node.kind == 'OctetString':
            data_found = True
            chunks.append(node.content)
        elif data_found:
            # First node following the run of OctetStrings: the content is complete.
            break

    if not header_found:
        raise Exception("ASN1 Header not found")
    if not data_found:
        raise Exception("ASN1 Content not found")
    # Join once at the end rather than re-copying the accumulated bytes for every chunk.
    return b''.join(chunks)
# --------------------------------------------------------------------------------
# --------------------------------------------------------------------------------
# Names of the ASN1 'Universal' class tags, keyed by tag number.
# Tag numbers that are not listed (e.g. 14 and 15) have no name here.
universal_tags = {
    0: 'Zero',
    1: 'Boolean',
    2: 'Integer',
    3: 'BitString',
    4: 'OctetString',
    5: 'Null',
    6: 'ObjectIdentifier',
    7: 'ObjectDescriptor',
    8: 'External',
    9: 'Real',
    10: 'Enumerated',
    11: 'EmbeddedPDV',
    12: 'UTF8String',
    13: 'RelativeOid',
    16: 'Sequence',
    17: 'Set',
    18: 'NumericString',
    19: 'PrintableString',
    20: 'TeletexString',
    21: 'VideotexString',
    22: 'IA5String',
    23: 'UTCTime',
    24: 'GeneralizedTime',
    25: 'GraphicString',
    26: 'VisibleString',
    27: 'GeneralString',
    28: 'UniversalString',
    29: 'CharacterString',
    30: 'BMPString',
}
# --------------------------------------------------------------------------------
# NODES (ASN1 parse)
# --------------------------------------------------------------------------------
class Asn1Node:
    """ Base class for Asn1 nodes """

    # Backing storage of the ``content`` property
    _content = None

    def __init__(self, kind, start_offset, node_len, cls, parent=None):
        """ Initialization of the Asn1 node.

            :param kind: name of the node type (e.g. 'OctetString', 'Container')
            :param start_offset: offset of the node in the stream being read
            :param node_len: declared length of the node, or '?' when indefinite
            :param cls: name of the ASN1 tag class (e.g. 'Universal')
            :param parent: the containing Asn1Node, or None
            :raise TypeError: when parent is neither an Asn1Node nor None
        """
        if not (parent is None or isinstance(parent, Asn1Node)):
            raise TypeError("parent must be an Asn1Node or None")

        # Register to parent
        self.parent = parent
        if parent is not None:
            parent.children.append(self)

        self.kind = kind
        self.start_offset = start_offset
        self.children = []
        self.cls = cls
        self.finalized = False
        self.name = self.__class__.__name__.replace('Node', '')
        self.length = node_len

    def finalize(self, end_offset, content=None):
        """ Closes the initialization of the Asn1 node, giving it content and finished length """
        self.content = content
        self.length = end_offset - self.start_offset
        self.end_offset = end_offset
        self.finalized = True

    def total_length(self):
        """ Get the total length of the node if defined. The definition and length bytes must be considered. """
        # NOTE(review): 2 accounts for one definition byte and one length byte;
        # confirm whether multi-byte lengths need to be taken into account here.
        return self.length + 2 if self.length != "?" else "?"

    @property
    def content(self):
        """ Content of the node: bytes, or None when there is none """
        return self._content

    @content.setter
    def content(self, content):
        if content is not None and not isinstance(content, bytes):
            raise TypeError("content must be bytes or None")
        self._content = content


class PrimitiveNode(Asn1Node):
    """ Primitive Asn1 nodes contain pure data """


class OctetStringNode(PrimitiveNode):
    """ Octet String Asn1 node """


class ObjectIdentifierNode(PrimitiveNode):
    """ Asn1 Object Identifier, i.e. '1.2.840.113549.1.7.1'.
        The content is exposed as its dotted string representation. """

    @PrimitiveNode.content.setter
    def content(self, content):
        """ Decode the encoded identifier bytes into a dotted string """
        if content is None:
            self._content = None
            return

        # Every sub-identifier is stored base-128, most significant group first:
        # the most significant bit of an octet is off only on its last octet.
        sub_identifiers = []
        calc = 0
        for octet in content:
            calc = (calc << 7) + (octet & 0x7F)
            if not octet & 0x80:
                sub_identifiers.append(calc)
                calc = 0

        if not sub_identifiers:
            self._content = ''
            return

        # The first sub-identifier packs the first two arcs as (40 * first) + second,
        # where the first arc cannot be greater than 2.
        first_arc = min(sub_identifiers[0] // 40, 2)
        second_arc = sub_identifiers[0] - 40 * first_arc
        arcs = [first_arc, second_arc] + sub_identifiers[1:]
        self._content = '.'.join(str(arc) for arc in arcs)
# --------------------------------------------------------------------------------
# READER (ASN1 parse)
# --------------------------------------------------------------------------------
class Reader:
    """ Incremental reader yielding the Asn1 nodes found in a byte string """

    # Tag class names, indexed by the two most significant bits of the definition byte
    _TAG_CLASSES = ('Universal', 'Application', 'Context-specific', 'Private')

    def __init__(self, *args, **kwargs):
        self.clear()

    def clear(self):
        """ Reset the reading state """
        self.offset = 0
        self.root = None
        self.current_node = None
        self.parent_node = None
        self.open_nodes_stack = []
        self.last_open_node = None

    def finalize_last_open_node(self):
        """ Whenever a node is complete, it is finalized, and the references are updated """
        self.last_open_node = self.open_nodes_stack.pop()
        self.last_open_node.finalize(self.offset, None)
        self.parent_node = self.last_open_node.parent
        self.current_node = None
        finalized_node = self.last_open_node
        self.last_open_node = self.open_nodes_stack[-1] if self.open_nodes_stack else None
        return finalized_node

    def build_from_stream(self, stream):
        """ Build an Asn1 tree starting from a byte string from a p7m file.

            This is a generator: primitive nodes are yielded as soon as they are read,
            the other nodes are yielded when they get finalized.

            :param stream: bytes to read from
            :raise ValueError: when a primitive node declares an indefinite length
        """
        while self.offset < len(stream):
            start_offset = self.offset
            self.last_open_node = self.open_nodes_stack[-1] if self.open_nodes_stack else None

            # Read the definition and length bytes
            definition_byte, self.offset = self.consume('B', stream, self.offset)
            node_len, _bytes_read, self.offset = self.read_length(stream, self.offset)

            if definition_byte == 0 and node_len == 0 and self.open_nodes_stack:
                # A double \x00 terminates the last open node
                yield self.finalize_last_open_node()
            else:
                # Create the current Node
                self.current_node = self.create_node(definition_byte, node_len, start_offset, parent=self.parent_node)
                if not self.root:
                    self.root = self.current_node

                if not isinstance(self.current_node, PrimitiveNode):
                    # If not primitive, add to the stack
                    self.open_nodes_stack.append(self.current_node)
                    self.last_open_node = self.current_node
                    self.parent_node = self.current_node
                else:
                    # The amount of data to read must be known for a primitive node
                    if self.current_node.length == '?':
                        raise ValueError("Primitive ASN1 node with indefinite length at offset %s" % start_offset)
                    data, self.offset = self.consume('%ss' % self.current_node.length, stream, self.offset)
                    self.current_node.finalize(self.offset, data)
                    yield self.current_node

            # Clear the stack of all finished nodes
            while (
                self.last_open_node
                and not self.last_open_node.finalized
                and self.last_open_node.length != '?'
                and self.last_open_node.start_offset + self.last_open_node.total_length() <= self.offset
            ):
                yield self.finalize_last_open_node()

        return self.root

    def consume(self, _format, stream, offset):
        """ Read from a bytes stream to get data out.

            :param _format: ``struct`` format of the data to read
            :return: (first unpacked value, offset right after the data read)
        """
        value = struct.unpack_from(_format, stream, offset)[0]
        return value, offset + struct.calcsize(_format)

    def read_length(self, stream, offset):
        """ Returns: (length of the node, bytes read, updated offset) """

        # Read the first byte: if it is zero, it's a special entry.
        # Probably it's the second byte of a closing tag of a node (\x00 \x00 <--)
        first_byte, offset = self.consume('B', stream, offset)
        if first_byte == 0:
            return 0, 1, offset

        # If the first bit of the first length byte is off, the byte is the length itself
        if not first_byte & 0x80:
            return first_byte, 1, offset

        # We turn off the first bit, and the rest is the number of bytes we have to read
        bytes_read = first_byte & 0x7F

        # If the first bit is the only bit being set, the length is indefinite,
        # and the node will terminate with a double \x00
        if not bytes_read:
            return '?', 1, offset

        # Each byte we read is less significant, so we increase the significance of the
        # value we already read and increment by the current byte
        node_len = 0
        for _ in range(bytes_read):
            current_byte, offset = self.consume('B', stream, offset)
            node_len = (node_len << 8) + current_byte

        return node_len, bytes_read, offset

    def create_node(self, definition_byte, node_len, start_offset, parent=None):
        """ Method to create new Asn1 nodes, given the definition bytes and the offset """
        target_class = Asn1Node
        kind = "Indefinite" if node_len == "?" else "Container"

        # Bits 8-7 of the definition byte hold the tag class, bit 6 is off for primitive nodes
        cls = self._TAG_CLASSES[(definition_byte >> 6) & 0b11]
        is_primitive = not definition_byte & 0x20

        if cls == 'Universal' and is_primitive:
            # The 5 least significant bits hold the tag number
            tag = definition_byte & 0x1F
            kind = universal_tags.get(tag)
            if kind:
                # Use the dedicated PrimitiveNode subclass when one exists for this kind
                target_classes = {x.__name__: x for x in PrimitiveNode.__subclasses__()}
                target_class = target_classes.get("%sNode" % kind, PrimitiveNode)

        return target_class(kind, start_offset, node_len, cls, parent)