382 lines
14 KiB
Python
382 lines
14 KiB
Python
|
# Part of Odoo. See LICENSE file for full copyright and licensing details.
|
||
|
|
||
|
"""
|
||
|
Italian E-invoice signed files content extraction.
|
||
|
|
||
|
There are two methods: OpenSSL and Fallback.
|
||
|
Sometimes OpenSSL fails to read signed invoices because of errors in the signature itself.
|
||
|
The Fallback method only has minimal code to extract the invoices' content without verifying the signature itself.
|
||
|
It's only to be used as a no-requirements fallback for OpenSSL.
|
||
|
"""
|
||
|
|
||
|
import logging
|
||
|
import struct
|
||
|
import warnings
|
||
|
from contextlib import suppress
|
||
|
|
||
|
_logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
def remove_signature(content, target=None):
    """ Takes a bytestring supposedly PKCS7 signed and returns its PKCS7-data only.

    The OpenSSL strategy is tried first, then the pure-Python fallback.
    A strategy is considered failed when it raises, or when it returns None
    (remove_signature_openssl returns None when the OpenSSL library could not
    be imported), in which case the next strategy is attempted.

    :param content: bytes of the signed file
    :param target: optional object; when truthy, its `remove_signature_method`
                   attribute is set to the name of the last strategy attempted
    :return: the extracted bytes, or None when every strategy failed
    """
    for removal_strategy in (remove_signature_openssl, remove_signature_fallback):
        if target:
            target.remove_signature_method = removal_strategy.__name__
        # Best effort by design: an error in one strategy just moves on to the next one
        with suppress(Exception):
            extracted = removal_strategy(content)
            # None means "could not extract" rather than "extracted nothing",
            # so it must not prevent the fallback from running
            if extracted is not None:
                return extracted
    return None
|
||
|
|
||
|
# --------------------------------------------------------------------------------
|
||
|
# UTILS
|
||
|
# --------------------------------------------------------------------------------
|
||
|
|
||
|
|
||
|
def byte_to_bit_array(val):
    """ Expand a byte into its eight bits, most significant first, as a list of zeros and ones """
    return [(val >> shift) & 1 for shift in range(7, -1, -1)]
|
||
|
|
||
|
|
||
|
def bit_array_to_byte(val):
    """ Fold an array of zeros and ones (most significant bit first) back into the integer it encodes """
    total = 0
    for bit in val:
        # Everything read so far becomes one position more significant
        total = (total << 1) + bit
    return total
|
||
|
|
||
|
# --------------------------------------------------------------------------------
|
||
|
# OPENSSL
|
||
|
# --------------------------------------------------------------------------------
|
||
|
|
||
|
|
||
|
try:
|
||
|
from OpenSSL import crypto as ssl_crypto
|
||
|
import OpenSSL._util as ssl_util
|
||
|
except ImportError:
|
||
|
ssl_crypto = None
|
||
|
_logger.warning("Cannot import library 'OpenSSL' for PKCS#7 envelope extraction.")
|
||
|
|
||
|
|
||
|
def remove_signature_openssl(content):
    """ Strip the PKCS#7 envelope from the given content using OpenSSL, so that the
        content of a '.xml.p7m' file can be read as if it was the '.xml' one.

        When the OpenSSL library could not be imported, a warning is logged and None is returned.
        When OpenSSL cannot process the envelope, the pending OpenSSL error is raised. """

    # Without the library (import failed at module load) there is nothing we can do here
    if not ssl_crypto:
        _logger.warning("Error reading the content, check if the OpenSSL library is installed for for PKCS#7 envelope extraction.")
        return None

    # Low-level handles from the library
    no_arg = ssl_util.ffi.NULL
    pkcs7_verify = ssl_util.lib.PKCS7_verify

    # The validity of the certificates is ignored, only the structure gets validated
    verify_flags = ssl_util.lib.PKCS7_NOVERIFY | ssl_util.lib.PKCS7_NOSIGS

    # Memory buffer that will receive the signed data
    extracted = ssl_crypto._new_mem_buf()

    # load_pkcs7_data is deprecated but there is no alternative, so silence its warning
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        envelope = ssl_crypto.load_pkcs7_data(ssl_crypto.FILETYPE_ASN1, content)

    # Any result other than 1 is reported through the current OpenSSL error
    status = pkcs7_verify(envelope._pkcs7, no_arg, no_arg, no_arg, extracted, verify_flags)
    if status != 1:
        ssl_crypto._raise_current_error()

    # Hand back the buffer content as a byte-string
    return ssl_crypto._bio_to_string(extracted)
|
||
|
|
||
|
# --------------------------------------------------------------------------------
|
||
|
# FALLBACK REMOVE SIGNATURE (ASN1 parse)
|
||
|
# --------------------------------------------------------------------------------
|
||
|
|
||
|
|
||
|
def remove_signature_fallback(content):
    """ Extract the invoice content from a PKCS7 envelope without verifying its signature.

        The invoice content is inside an ASN1 node identified by PKCS7_DATA_OID (pkcs7-data).
        The node is defined as an OctetString, which can be composed of an arbitrary
        sequence of octets of string data.
        We visit in-order the ASN1 tree nodes until we find the pkcs7-data, then we look for content.
        Once we found it, we read all the OctetStrings that get yielded by the in-order visit.
        When there are no more OctetStrings, then another object will follow
        with its header and identifier, so we stop exploring and just return the content.

        See also:
        https://datatracker.ietf.org/doc/html/rfc2315
        https://www.oss.com/asn1/resources/asn1-made-simple/asn1-quick-reference/octetstring.html

        :param content: bytes of the signed file
        :return: the concatenated bytes of the OctetStrings following the pkcs7-data identifier
        :raises ValueError: if the pkcs7-data identifier, or the OctetString after it, is not found
    """
    PKCS7_DATA_OID = '1.2.840.113549.1.7.1'
    header_found = False
    # The chunks are collected and joined once at the end: the content may be split
    # in many OctetStrings, and concatenating bytes one by one would copy the
    # accumulated data at every step
    chunks = []
    for node in Reader().build_from_stream(content):
        if node.kind == 'ObjectIdentifier' and node.content == PKCS7_DATA_OID:
            header_found = True
        if header_found and node.kind == 'OctetString':
            chunks.append(node.content)
        elif chunks:
            # First non-OctetString node after the data: the content is over
            break

    if not header_found:
        raise ValueError("ASN1 Header not found")
    if not chunks:
        raise ValueError("ASN1 Content not found")
    return b''.join(chunks)
|
||
|
|
||
|
# --------------------------------------------------------------------------------
|
||
|
# ASN1 DATA
|
||
|
# --------------------------------------------------------------------------------
|
||
|
|
||
|
|
||
|
# Names of the ASN1 universal-class tags, keyed by tag number.
# The names are used as the `kind` of the parsed nodes.
# Tag numbers 14 and 15 have no entry in this table.
universal_tags = {
    # Basic types
    0: 'Zero', 1: 'Boolean', 2: 'Integer', 3: 'BitString', 4: 'OctetString',
    5: 'Null', 6: 'ObjectIdentifier', 7: 'ObjectDescriptor', 8: 'External',
    9: 'Real', 10: 'Enumerated', 11: 'EmbeddedPDV', 12: 'UTF8String',
    13: 'RelativeOid',
    # Structured types
    16: 'Sequence', 17: 'Set',
    # Restricted strings and time types
    18: 'NumericString', 19: 'PrintableString', 20: 'TeletexString',
    21: 'VideotexString', 22: 'IA5String', 23: 'UTCTime', 24: 'GeneralizedTime',
    25: 'GraphicString', 26: 'VisibleString', 27: 'GeneralString',
    28: 'UniversalString', 29: 'CharacterString', 30: 'BMPString',
}
|
||
|
|
||
|
# --------------------------------------------------------------------------------
|
||
|
# NODES (ASN1 parse)
|
||
|
# --------------------------------------------------------------------------------
|
||
|
|
||
|
|
||
|
class Asn1Node:
    """ Base class for Asn1 nodes """
    _content = None

    def __init__(self, kind, start_offset, node_len, cls, parent=None):
        """ Initialization of the Asn1 node

        :param kind: label describing the node
        :param start_offset: offset of the first byte of the node in the stream
        :param node_len: declared length of the node content, or '?' when indefinite
        :param cls: name of the ASN1 class of the node
        :param parent: the Asn1Node containing this one, or None
        :raises TypeError: if parent is neither None nor an Asn1Node (subclasses included)
        """
        # isinstance accepts instances of any Asn1Node subclass and rejects unrelated objects
        if parent is not None and not isinstance(parent, Asn1Node):
            raise TypeError("parent must be an Asn1Node or None")

        # Register to parent
        self.parent = parent
        if parent is not None:
            parent.children.append(self)

        self.kind = kind
        self.start_offset = start_offset
        self.children = []
        self.cls = cls
        self.finalized = False
        self.name = self.__class__.__name__.replace('Node', '')
        self.length = node_len

    def finalize(self, end_offset, content=None):
        """ Closes the initialization of the Asn1 node, giving it content and finished length.

        From this point on, `length` is the distance between the start and the end offsets.
        """
        self.content = content
        self.length = end_offset - self.start_offset
        self.end_offset = end_offset
        self.finalized = True

    def total_length(self):
        """ Get the total length of the node if defined, '?' otherwise.

        NOTE(review): the definition and length bytes are accounted for as
        exactly 2 bytes, whatever the actual size of the length field is.
        """
        return self.length + 2 if self.length != "?" else "?"

    @property
    def content(self):
        return self._content

    @content.setter
    def content(self, content):
        # Only raw bytes (or nothing at all) can be stored by the base class
        if content is not None and not isinstance(content, bytes):
            raise TypeError("content must be bytes or None")
        self._content = content
|
||
|
|
||
|
|
||
|
class PrimitiveNode(Asn1Node):
    """ Asn1 node holding pure data; specialized by the more specific primitive nodes """
|
||
|
|
||
|
|
||
|
class OctetStringNode(PrimitiveNode):
    """ Asn1 node for an Octet String; its content is kept as raw bytes """
|
||
|
|
||
|
|
||
|
class ObjectIdentifierNode(PrimitiveNode):
    """ Asn1 Object Identifier, i.e. 1.3.6.1.5.5.7.48.1

    The content is set from the encoded bytes and read back as the dotted string.
    """
    @Asn1Node.content.setter
    def content(self, content):
        arcs = []
        pending = 0
        for octet in content:
            # Each octet carries 7 bits of value; the most significant bit
            # is set on every octet of a subidentifier except the last one
            pending = (pending << 7) | (octet & 0x7F)
            if octet & 0x80:
                continue

            if arcs:
                arcs.append(pending)
            else:
                # The first subidentifier packs the first two arcs as 40 * X + Y.
                # X can only be 0, 1 or 2, and Y is not bounded when X is 2,
                # so anything from 80 onwards belongs to the arc 2.
                first_arc = min(pending // 40, 2)
                arcs.extend((first_arc, pending - 40 * first_arc))
            pending = 0

        # Bits of a trailing, unterminated subidentifier are dropped
        self._content = '.'.join(str(arc) for arc in arcs)
|
||
|
|
||
|
# --------------------------------------------------------------------------------
|
||
|
# READER (ASN1 parse)
|
||
|
# --------------------------------------------------------------------------------
|
||
|
|
||
|
|
||
|
class Reader:
    """ Sequential reader of an ASN1 byte stream.

    The nodes are discovered while walking the stream once: nodes that are not
    PrimitiveNode instances are kept on a stack of open nodes until they are
    complete, the others are read in one go.
    """

    def __init__(self, *args, **kwargs):
        self.clear()

    def clear(self):
        """ Reset the reading state, so that a new stream can be read """
        self.offset = 0
        self.root = None
        self.current_node = None
        self.parent_node = None
        self.open_nodes_stack = []
        self.last_open_node = None

    def finalize_last_open_node(self):
        """ Whenever a node is complete, it is finalized, and the references are updated.

        The node on top of the stack is closed at the current offset with no content,
        its parent becomes the parent of the nodes to come.

        :return: the node that has just been finalized
        """
        finalized_node = self.open_nodes_stack.pop()
        finalized_node.finalize(self.offset, None)
        self.parent_node = finalized_node.parent
        self.current_node = None
        self.last_open_node = self.open_nodes_stack[-1] if self.open_nodes_stack else None
        return finalized_node

    def _is_complete(self, node):
        """ Tell whether the stream has been read up to the end of a definite-length node """
        if node.length == '?':
            return False
        # The header is not always 2 bytes long: the length field can span several
        # bytes, so the size measured when the node was read is used instead
        header_length = getattr(node, 'header_length', 2)
        return node.start_offset + header_length + node.length <= self.offset

    def build_from_stream(self, stream):
        """ Build an Asn1 tree starting from a byte string from a p7m file.

        This is a generator: data nodes are yielded as soon as they are read,
        containers are yielded once they are complete, so children come before
        their parent. The root node is the return value of the generator.

        :param stream: the bytes to parse
        """
        self.clear()
        while self.offset < len(stream):

            start_offset = self.offset
            self.last_open_node = self.open_nodes_stack[-1] if self.open_nodes_stack else None

            # Read the definition and length bytes
            definition_byte, self.offset = self.consume('B', stream, self.offset)
            node_len, _bytes_read, self.offset = self.read_length(stream, self.offset)

            if definition_byte == 0 and node_len == 0 and self.open_nodes_stack:
                # Double \x00: closing tag of the innermost open node
                yield self.finalize_last_open_node()
            else:
                # Create the current Node
                self.current_node = self.create_node(definition_byte, node_len, start_offset, parent=self.parent_node)
                self.current_node.header_length = self.offset - start_offset
                if self.root is None:
                    self.root = self.current_node

                if isinstance(self.current_node, PrimitiveNode):
                    # Data nodes are read entirely and are complete right away
                    data, self.offset = self.consume('%ss' % self.current_node.length, stream, self.offset)
                    self.current_node.finalize(self.offset, data)
                    yield self.current_node
                else:
                    # If not primitive, add to the stack
                    self.open_nodes_stack.append(self.current_node)
                    self.last_open_node = self.current_node
                    self.parent_node = self.current_node

            # Clear the stack of all finished nodes. This is also done after a closing
            # tag, as it may be the last thing inside of a definite-length container.
            while (
                self.last_open_node
                and not self.last_open_node.finalized
                and self._is_complete(self.last_open_node)
            ):
                yield self.finalize_last_open_node()

        return self.root

    def consume(self, _format, stream, offset):
        """ Read from a bytes stream to get data out.

        :param _format: `struct` format of the value to read
        :return: (first unpacked value, offset right after the bytes read)
        """
        size = struct.calcsize(_format)
        value = struct.unpack_from(_format, stream, offset)[0]
        return value, offset + size

    def read_length(self, stream, offset):
        """ Returns: (length of the node, bytes read, updated offset)

        The length is '?' when indefinite. The "bytes read" value is 1 when the length
        fits in the first byte, otherwise it is the number of bytes following the first one.
        """
        first_byte, offset = self.consume('B', stream, offset)

        # Most significant bit off: the byte itself is the length.
        # Zero is probably the second byte of a closing tag of a node (\x00 \x00 <--)
        if first_byte < 0x80:
            return first_byte, 1, offset

        # Most significant bit on: the rest is the number of bytes we have to read
        following_bytes = first_byte & 0x7F

        # If it's the only bit being set, the length is indefinite,
        # and the node will terminate with a double \x00
        if not following_bytes:
            return '?', 1, offset

        # Each byte we read is less significant, so we increase the significance of the
        # value we already read and increment by the current byte
        node_len = 0
        for _dummy in range(following_bytes):
            current_byte, offset = self.consume('B', stream, offset)
            node_len = (node_len << 8) + current_byte

        return node_len, following_bytes, offset

    def create_node(self, definition_byte, node_len, start_offset, parent=None):
        """ Method to create new Asn1 nodes, given the definition bytes and the offset.

        The two most significant bits of the definition byte select the class of the
        node, the third one tells constructed nodes from primitive ones, and the
        remaining five are the tag number.

        NOTE(review): only Universal nodes with a tag listed in `universal_tags` are
        created as primitive ones, any other node is handled as a container.
        """
        node_classes = ('Universal', 'Application', 'Context-specific', 'Private')
        cls = node_classes[(definition_byte >> 6) & 0b11]

        target_class = Asn1Node
        kind = "Indefinite" if node_len == "?" else "Container"

        is_primitive = not definition_byte & 0x20
        if cls == 'Universal' and is_primitive:
            kind = universal_tags.get(definition_byte & 0x1F)
            if kind:
                # Use the dedicated class when there is one named after the kind
                specialized = {x.__name__: x for x in PrimitiveNode.__subclasses__()}
                target_class = specialized.get("%sNode" % kind, PrimitiveNode)

        return target_class(kind, start_offset, node_len, cls, parent)
|