# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

import base64
import difflib
import io
import pprint
import unittest

from odoo.tests.common import TransactionCase, can_import
from odoo.modules.module import get_module_resource
from odoo.tools import mute_logger, pycompat
from odoo.addons.base_import.models.base_import import ImportValidationError

ID_FIELD = {
    'id': 'id',
    'name': 'id',
    'string': "External ID",
    'required': False,
    'fields': [],
    'type': 'id',
}
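
# ID_FIELD mirrors the pseudo-field the import wizard exposes so rows can
# reference records by external identifier; get_fields_tree() prepends it to
# every model's importable fields, which is why it opens every expected field
# list in the fixtures below.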


def make_field(name='value', string='Value', required=False, fields=None, field_type='id', model_name=None, comodel_name=None):
    if fields is None:
        fields = []
    field = {'id': name, 'name': name, 'string': string, 'required': required, 'fields': fields, 'type': field_type}
    if model_name:
        field['model_name'] = model_name
    if comodel_name:
        field['comodel_name'] = comodel_name
    return [
        ID_FIELD,
        field,
    ]


def sorted_fields(fields):
    """ recursively sort field lists to ease comparison """
    recursed = [dict(field, fields=sorted_fields(field['fields'])) for field in fields]
    return sorted(recursed, key=lambda field: field['id'])
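
# Illustrative example (not part of the suite): sorting recurses on the 'id'
# key at every depth, so
#   sorted_fields([{'id': 'b', 'fields': []}, {'id': 'a', 'fields': []}])
# returns the 'a' entry first. This is what makes assertEqualFields below
# insensitive to field ordering.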


class BaseImportCase(TransactionCase):

    def assertEqualFields(self, fields1, fields2):
        f1 = sorted_fields(fields1)
        f2 = sorted_fields(fields2)
        assert f1 == f2, '\n'.join(difflib.unified_diff(
            pprint.pformat(f1).splitlines(),
            pprint.pformat(f2).splitlines()
        ))
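
    # On mismatch, the assert above renders a unified diff of the
    # pretty-printed, recursively sorted field trees, which is far easier to
    # scan than assertEqual's one-line dict dump.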


class TestBasicFields(BaseImportCase):

    def get_fields(self, field):
        return self.env['base_import.import'].get_fields_tree('base_import.tests.models.' + field)

    def test_base(self):
        """ A basic field is not required """
        self.assertEqualFields(self.get_fields('char'), make_field(field_type='char', model_name='base_import.tests.models.char'))

    def test_required(self):
        """ Required fields should be flagged (so the UI can mark them
        as required) """
        self.assertEqualFields(self.get_fields('char.required'), make_field(required=True, field_type='char', model_name='base_import.tests.models.char.required'))

    def test_readonly(self):
        """ Readonly fields should be filtered out """
        self.assertEqualFields(self.get_fields('char.readonly'), [ID_FIELD])

    def test_readonly_states(self):
        """ Readonly fields with states should not be filtered out """
        self.assertEqualFields(self.get_fields('char.states'), make_field(field_type='char', model_name='base_import.tests.models.char.states'))

    def test_readonly_states_noreadonly(self):
        """ Readonly fields with states having nothing to do with
        readonly should still be filtered out """
        self.assertEqualFields(self.get_fields('char.noreadonly'), [ID_FIELD])

    def test_readonly_states_stillreadonly(self):
        """ Readonly fields whose states leave them readonly in every
        case should still be filtered out """
        self.assertEqualFields(self.get_fields('char.stillreadonly'), [ID_FIELD])

    def test_m2o(self):
        """ M2O fields should allow import of themselves (name_get),
        their id and their xid """
        self.assertEqualFields(self.get_fields('m2o'), make_field(
            field_type='many2one', comodel_name='base_import.tests.models.m2o.related', model_name='base_import.tests.models.m2o',
            fields=[
                {'id': 'value', 'name': 'id', 'string': 'External ID', 'required': False, 'fields': [], 'type': 'id', 'model_name': 'base_import.tests.models.m2o'},
                {'id': 'value', 'name': '.id', 'string': 'Database ID', 'required': False, 'fields': [], 'type': 'id', 'model_name': 'base_import.tests.models.m2o'},
            ]))

    def test_m2o_required(self):
        """ If an m2o field is required, its three sub-fields are
        required as well (the client has to handle that: requiredness
        is id-based)
        """
        self.assertEqualFields(self.get_fields('m2o.required'), make_field(
            field_type='many2one', required=True, comodel_name='base_import.tests.models.m2o.required.related', model_name='base_import.tests.models.m2o.required',
            fields=[
                {'id': 'value', 'name': 'id', 'string': 'External ID', 'required': True, 'fields': [], 'type': 'id', 'model_name': 'base_import.tests.models.m2o.required'},
                {'id': 'value', 'name': '.id', 'string': 'Database ID', 'required': True, 'fields': [], 'type': 'id', 'model_name': 'base_import.tests.models.m2o.required'},
            ]))
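
    # The "three sub-fields" are the column headers a user can map for a
    # many2one: `value` itself (matched against display names), `value/id`
    # (external ID) and `value/.id` (database ID).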


class TestO2M(BaseImportCase):

    def get_fields(self, field):
        return self.env['base_import.import'].get_fields_tree('base_import.tests.models.' + field)

    def test_shallow(self):
        self.assertEqualFields(
            self.get_fields('o2m'), [
                ID_FIELD,
                {'id': 'name', 'name': 'name', 'string': "Name", 'required': False, 'fields': [], 'type': 'char', 'model_name': 'base_import.tests.models.o2m'},
                {
                    'id': 'value', 'name': 'value', 'string': 'Value', 'model_name': 'base_import.tests.models.o2m',
                    'required': False, 'type': 'one2many', 'comodel_name': 'base_import.tests.models.o2m.child',
                    'fields': [
                        ID_FIELD,
                        {
                            'id': 'parent_id', 'name': 'parent_id', 'model_name': 'base_import.tests.models.o2m.child',
                            'string': 'Parent', 'type': 'many2one', 'comodel_name': 'base_import.tests.models.o2m',
                            'required': False, 'fields': [
                                {'id': 'parent_id', 'name': 'id', 'model_name': 'base_import.tests.models.o2m.child',
                                 'string': 'External ID', 'required': False,
                                 'fields': [], 'type': 'id'},
                                {'id': 'parent_id', 'name': '.id', 'model_name': 'base_import.tests.models.o2m.child',
                                 'string': 'Database ID', 'required': False,
                                 'fields': [], 'type': 'id'},
                            ]
                        },
                        {'id': 'value', 'name': 'value', 'string': 'Value',
                         'required': False, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.o2m.child',
                         },
                    ]
                }
            ]
        )


class TestMatchHeadersSingle(TransactionCase):

    def test_match_by_name(self):
        match = self.env['base_import.import']._get_mapping_suggestion('f0', [{'name': 'f0'}], [], {})
        self.assertEqual(match, {'field_path': ['f0'], 'distance': 0})

    def test_match_by_string(self):
        match = self.env['base_import.import']._get_mapping_suggestion('some field', [{'name': 'bob', 'string': "Some Field"}], [], {})
        self.assertEqual(match, {'field_path': ['bob'], 'distance': 0})

    def test_nomatch(self):
        match = self.env['base_import.import']._get_mapping_suggestion('should not be', [{'name': 'bob', 'string': "wheee"}], [], {})
        self.assertEqual(match, {})

    def test_close_match(self):
        match = self.env['base_import.import']._get_mapping_suggestion('bobe', [{'name': 'bob', 'type': 'char'}], ['char'], {})
        self.assertEqual(match, {'field_path': ['bob'], 'distance': 0.1428571428571429})

    def test_distant_match(self):
        Import = self.env['base_import.import']
        header, field_string = 'same Folding', 'Some Field'
        match = Import._get_mapping_suggestion(header, [{'name': 'bob', 'string': field_string, 'type': 'char'}], ['char'], {})
        string_field_dist = Import._get_distance(header.lower(), field_string.lower())
        self.assertEqual(string_field_dist, 0.36363636363636365)
        self.assertEqual(match, {})  # if distance >= 0.2, no match returned
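
    # Note on the pinned floats above: 0.1428... is 1 - 6/7 for 'bobe' vs
    # 'bob', and 0.3636... is 1 - 7/11 for 'same folding' vs 'some field',
    # i.e. one minus a similarity ratio. That is an observation about
    # _get_distance's output, not a spec; the exact values are asserted so
    # any change to the metric surfaces here.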

    def test_recursive_match(self):
        f = {
            'name': 'f0',
            'string': "My Field",
            'fields': [
                {'name': 'f0', 'string': "Sub field 0", 'fields': []},
                {'name': 'f1', 'string': "Sub field 2", 'fields': []},
            ]
        }
        match = self.env['base_import.import']._get_mapping_suggestion('f0/f1', [f], [], {})
        self.assertEqual(match, {'field_path': [f['name'], f['fields'][1]['name']]})

    def test_recursive_nomatch(self):
        """ Match first level, fail to match second level """
        f = {
            'name': 'f0',
            'string': "My Field",
            'fields': [
                {'name': 'f0', 'string': "Sub field 0", 'fields': []},
                {'name': 'f1', 'string': "Sub field 2", 'fields': []},
            ]
        }
        match = self.env['base_import.import']._get_mapping_suggestion('f0/f2', [f], [], {})
        self.assertEqual(match, {})


class TestMatchHeadersMultiple(TransactionCase):

    def test_noheaders(self):
        self.assertEqual(
            self.env['base_import.import']._get_mapping_suggestions([], {}, []), {}
        )

    def test_nomatch(self):
        self.assertEqual(
            self.env['base_import.import']._get_mapping_suggestions(
                ['foo', 'bar', 'baz', 'qux'],
                {
                    (0, 'foo'): ['int'],
                    (1, 'bar'): ['char'],
                    (2, 'baz'): ['text'],
                    (3, 'qux'): ['many2one']
                },
                {}),
            {
                (0, 'foo'): None,
                (1, 'bar'): None,
                (2, 'baz'): None,
                (3, 'qux'): None
            }
        )

    def test_mixed(self):
        self.assertEqual(
            self.env['base_import.import']._get_mapping_suggestions(
                'foo bar baz qux/corge'.split(),
                {
                    (0, 'foo'): ['int'],
                    (1, 'bar'): ['char'],
                    (2, 'baz'): ['text'],
                    (3, 'qux/corge'): ['text']
                },
                [
                    {'name': 'bar', 'string': 'Bar', 'type': 'char'},
                    {'name': 'bob', 'string': 'Baz', 'type': 'text'},
                    {'name': 'qux', 'string': 'Qux', 'type': 'many2one', 'fields': [
                        {'name': 'corge', 'type': 'text', 'fields': []},
                    ]}
                ]),
            {
                (0, 'foo'): None,
                (1, 'bar'): {'field_path': ['bar'], 'distance': 0},
                (2, 'baz'): {'field_path': ['bob'], 'distance': 0},
                (3, 'qux/corge'): {'field_path': ['qux', 'corge']}
            }
        )
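
    # Shape of the suggestions: keys are (column_index, header) tuples, None
    # means no candidate field; exact name/string matches report distance 0,
    # while sub-field paths like 'qux/corge' come back without a distance.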


class TestColumnMapping(TransactionCase):

    def test_column_mapping(self):
        import_record = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': u"Name,Some Value,value\n"
                    u"chhagan,10,1\n"
                    u"magan,20,2\n".encode('utf-8'),
            'file_type': 'text/csv',
            'file_name': 'data.csv',
        })
        import_record.execute_import(
            ['name', 'somevalue', 'othervalue'],
            ['Name', 'Some Value', 'value'],
            {'quoting': '"', 'separator': ',', 'has_headers': True},
            True
        )
        fields = self.env['base_import.mapping'].search_read(
            [('res_model', '=', 'base_import.tests.models.preview')],
            ['column_name', 'field_name']
        )
        self.assertItemsEqual([f['column_name'] for f in fields], ['Name', 'Some Value', 'value'])
        self.assertItemsEqual([f['field_name'] for f in fields], ['somevalue', 'name', 'othervalue'])
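
    # The trailing positional True above is execute_import's dryrun flag (an
    # assumption based on the wizard's signature): even without actually
    # loading records, the column -> field mappings are persisted as
    # base_import.mapping rows so later imports on the same model can suggest
    # them again.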

    def test_fuzzy_match_distance(self):
        values_to_test = [
            ('opportunities', 'opportinuties'),
            ('opportunities', 'opportunate'),
            ('opportunities', 'operable'),
            ('opportunities', 'purchasing'),
            ('lead_id', 'laed_id'),
            ('lead_id', 'leen_id'),
            ('lead_id', 'let_id_be'),
            ('lead_id', 'not related'),
        ]

        Import = self.env['base_import.import']
        max_distance = 0.2  # see FUZZY_MATCH_DISTANCE. Hardcoded on purpose, so this test breaks if that constant is modified.
        for value in values_to_test:
            distance = Import._get_distance(value[0].lower(), value[1].lower())
            model_fields_info = [{'name': value[0], 'string': value[0], 'type': 'char'}]
            match = self.env['base_import.import']._get_mapping_suggestion(value[1], model_fields_info, ['char'], {})

            self.assertEqual(
                bool(match), distance < max_distance
            )


class TestPreview(TransactionCase):

    def make_import(self):
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'res.users',
            'file': u"로그인,언어\nbob,1\n".encode('euc_kr'),
            'file_type': 'text/csv',
            'file_name': 'kr_data.csv',
        })
        return import_wizard

    @mute_logger('odoo.addons.base_import.models.base_import')
    def test_encoding(self):
        import_wizard = self.make_import()
        result = import_wizard.parse_preview({
            'quoting': '"',
            'separator': ',',
        })
        self.assertFalse('error' in result)

    @mute_logger('odoo.addons.base_import.models.base_import')
    def test_csv_errors(self):
        import_wizard = self.make_import()

        result = import_wizard.parse_preview({
            'quoting': 'foo',
            'separator': ',',
        })
        self.assertTrue('error' in result)

        result = import_wizard.parse_preview({
            'quoting': '"',
            'separator': 'bob',
        })
        self.assertTrue('error' in result)
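
    # The EUC-KR fixture above exercises charset detection: parse_preview must
    # sniff a non-UTF-8 encoding before CSV parsing even starts. A successful
    # preview simply carries no 'error' key.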

    def test_csv_success(self):
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': b'name,Some Value,Counter\n'
                    b'foo,,\n'
                    b'bar,,4\n'
                    b'qux,5,6\n',
            'file_type': 'text/csv'
        })

        result = import_wizard.parse_preview({
            'quoting': '"',
            'separator': ',',
            'has_headers': True,
        })
        self.assertIsNone(result.get('error'))
        self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue']})
        self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter'])
        # Order depends on iteration order of fields_get
        self.assertItemsEqual(result['fields'], [
            ID_FIELD,
            {'id': 'name', 'name': 'name', 'string': 'Name', 'required': False, 'fields': [], 'type': 'char', 'model_name': 'base_import.tests.models.preview'},
            {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required': True, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.preview'},
            {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required': False, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.preview'},
        ])
        self.assertEqual(result['preview'], [['foo', 'bar', 'qux'], ['5'], ['4', '6']])
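
    # 'preview' is column-major: one list per column with empty cells
    # dropped, hence ['5'] for the "Some Value" column (values '', '', '5')
    # and ['4', '6'] for "Counter" above.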

    @unittest.skipUnless(can_import('xlrd'), "XLRD module not available")
    def test_xls_success(self):
        xls_file_path = get_module_resource('base_import', 'tests', 'test.xls')
        with open(xls_file_path, 'rb') as xls_file:
            file_content = xls_file.read()
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': file_content,
            'file_type': 'application/vnd.ms-excel'
        })

        result = import_wizard.parse_preview({
            'has_headers': True,
        })
        self.assertIsNone(result.get('error'))
        self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue']})
        self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter'])
        self.assertItemsEqual(result['fields'], [
            ID_FIELD,
            {'id': 'name', 'name': 'name', 'string': 'Name', 'required': False, 'fields': [], 'type': 'char', 'model_name': 'base_import.tests.models.preview'},
            {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required': True, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.preview'},
            {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required': False, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.preview'},
        ])
        self.assertEqual(result['preview'], [['foo', 'bar', 'qux'], ['1', '3', '5'], ['2', '4', '6']])

    @unittest.skipUnless(can_import('xlrd.xlsx') or can_import('openpyxl'), "XLRD/XLSX not available")
    def test_xlsx_success(self):
        xlsx_file_path = get_module_resource('base_import', 'tests', 'test.xlsx')
        with open(xlsx_file_path, 'rb') as xlsx_file:
            file_content = xlsx_file.read()
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': file_content,
            'file_type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        })

        result = import_wizard.parse_preview({
            'has_headers': True,
        })
        self.assertIsNone(result.get('error'))
        self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue']})
        self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter'])
        self.assertItemsEqual(result['fields'], [
            ID_FIELD,
            {'id': 'name', 'name': 'name', 'string': 'Name', 'required': False, 'fields': [], 'type': 'char', 'model_name': 'base_import.tests.models.preview'},
            {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required': True, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.preview'},
            {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required': False, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.preview'},
        ])
        self.assertEqual(result['preview'], [['foo', 'bar', 'qux'], ['1', '3', '5'], ['2', '4', '6']])

    @unittest.skipUnless(can_import('odf'), "ODFPY not available")
    def test_ods_success(self):
        ods_file_path = get_module_resource('base_import', 'tests', 'test.ods')
        with open(ods_file_path, 'rb') as ods_file:
            file_content = ods_file.read()
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': file_content,
            'file_type': 'application/vnd.oasis.opendocument.spreadsheet'
        })

        result = import_wizard.parse_preview({
            'has_headers': True,
        })
        self.assertIsNone(result.get('error'))
        self.assertEqual(result['matches'], {0: ['name'], 1: ['somevalue']})
        self.assertEqual(result['headers'], ['name', 'Some Value', 'Counter'])
        self.assertItemsEqual(result['fields'], [
            ID_FIELD,
            {'id': 'name', 'name': 'name', 'string': 'Name', 'required': False, 'fields': [], 'type': 'char', 'model_name': 'base_import.tests.models.preview'},
            {'id': 'somevalue', 'name': 'somevalue', 'string': 'Some Value', 'required': True, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.preview'},
            {'id': 'othervalue', 'name': 'othervalue', 'string': 'Other Variable', 'required': False, 'fields': [], 'type': 'integer', 'model_name': 'base_import.tests.models.preview'},
        ])
        self.assertEqual(result['preview'], [['foo', 'bar', 'aux'], ['1', '3', '5'], ['2', '4', '6']])
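
    # Spreadsheet cells arrive stringified in the preview ('1', '3', ...),
    # just like CSV. Note the .ods assertion expects 'aux' where the other
    # formats expect 'qux', matching that fixture file's content.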


class test_convert_import_data(TransactionCase):
    """ Tests conversion of base_import.import input into data which
    can be fed to Model.load
    """
    def test_all(self):
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': b'name,Some Value,Counter\n'
                    b'foo,1,2\n'
                    b'bar,3,4\n'
                    b'qux,5,6\n',
            'file_type': 'text/csv'
        })
        data, fields = import_wizard._convert_import_data(
            ['name', 'somevalue', 'othervalue'],
            {'quoting': '"', 'separator': ',', 'has_headers': True}
        )

        self.assertItemsEqual(fields, ['name', 'somevalue', 'othervalue'])
        self.assertItemsEqual(data, [
            ['foo', '1', '2'],
            ['bar', '3', '4'],
            ['qux', '5', '6'],
        ])

    def test_date_fields(self):
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'res.partner',
            'file': u'name,date,create_date\n'
                    u'"foo","2013年07月18日","2016-10-12 06:06"\n'.encode('utf-8'),
            'file_type': 'text/csv'
        })

        results = import_wizard.execute_import(
            ['name', 'date', 'create_date'],
            [],
            {
                'date_format': '%Y年%m月%d日',
                'datetime_format': '%Y-%m-%d %H:%M',
                'quoting': '"',
                'separator': ',',
                'has_headers': True
            }
        )

        # an empty 'messages' list means no import errors
        self.assertItemsEqual(results['messages'], [])

    def test_date_fields_no_options(self):
        import_wizard = self.env['base_import.import'].with_context(lang='de_DE').create({
            'res_model': 'res.partner',
            'file': 'name,date,create_date\n'
                    '"foo","15.10.2023","15.10.2023 15:15:15"\n'.encode('utf-8'),
            'file_type': 'text/csv',
        })

        opts = {
            'date_format': '',
            'datetime_format': '',
            'quoting': '"',
            'separator': ',',
            'float_decimal_separator': '.',
            'float_thousand_separator': ',',
            'has_headers': True
        }
        result_parse = import_wizard.parse_preview({**opts})

        opts = result_parse['options']
        results = import_wizard.execute_import(
            ['name', 'date', 'create_date'],
            [],
            {**opts}
        )

        # an empty 'messages' list means no import errors
        self.assertItemsEqual(results['messages'], [])
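
    # The test above passes empty 'date_format'/'datetime_format' options:
    # parse_preview is expected to infer both formats from the sample rows and
    # hand them back in result['options'], which execute_import then reuses.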

    def test_parse_relational_fields(self):
        """ Ensure that relational fields float and date are correctly
        parsed during the import call.
        """
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'res.partner',
            'file': u'name,parent_id/id,parent_id/date,parent_id/partner_latitude\n'
                    u'"foo","__export__.res_partner_1","2017年10月12日","5,69"\n'.encode('utf-8'),
            'file_type': 'text/csv'
        })
        options = {
            'date_format': '%Y年%m月%d日',
            'quoting': '"',
            'separator': ',',
            'float_decimal_separator': ',',
            'float_thousand_separator': '.',
            'has_headers': True
        }
        data, import_fields = import_wizard._convert_import_data(
            ['name', 'parent_id/.id', 'parent_id/date', 'parent_id/partner_latitude'],
            options
        )
        result = import_wizard._parse_import_data(data, import_fields, options)
        # Check that the value "5,69" has been correctly parsed as a float.
        self.assertEqual(float(result[0][-1]), 5.69)
        self.assertEqual(str(result[0][-2]), '2017-10-12')

    def test_parse_scientific_notation(self):
        """ Ensure that scientific notation is correctly converted to decimal """
        import_wizard = self.env['base_import.import']

        test_options = {}
        test_data = [
            ["1E+05"],
            ["1.20E-05"],
            ["1,9e5"],
            ["9,5e-5"],
        ]
        expected_result = [
            ["100000.000000"],
            ["0.000012"],
            ["190000.000000"],
            ["0.000095"],
        ]

        import_wizard._parse_float_from_data(test_data, 0, 'test-name', test_options)
        self.assertEqual(test_data, expected_result)
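
    # _parse_float_from_data rewrites the column in place, formatting to six
    # decimal places; note '1,9e5' and '9,5e-5' show that comma decimals
    # inside scientific notation are normalized even with an empty options
    # dict.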

    def test_filtered(self):
        """ If ``False`` is provided as field mapping for a column,
        that column should be removed from importable data
        """
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': b'name,Some Value,Counter\n'
                    b'foo,1,2\n'
                    b'bar,3,4\n'
                    b'qux,5,6\n',
            'file_type': 'text/csv'
        })
        data, fields = import_wizard._convert_import_data(
            ['name', False, 'othervalue'],
            {'quoting': '"', 'separator': ',', 'has_headers': True}
        )

        self.assertItemsEqual(fields, ['name', 'othervalue'])
        self.assertItemsEqual(data, [
            ['foo', '2'],
            ['bar', '4'],
            ['qux', '6'],
        ])

    def test_norow(self):
        """ If a row is composed only of empty values (due to having
        filtered out non-empty values from it), it should be removed
        """
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': b'name,Some Value,Counter\n'
                    b'foo,1,2\n'
                    b',3,\n'
                    b',5,6\n',
            'file_type': 'text/csv'
        })
        data, fields = import_wizard._convert_import_data(
            ['name', False, 'othervalue'],
            {'quoting': '"', 'separator': ',', 'has_headers': True}
        )

        self.assertItemsEqual(fields, ['name', 'othervalue'])
        self.assertItemsEqual(data, [
            ['foo', '2'],
            ['', '6'],
        ])

    def test_empty_rows(self):
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': b'name,Some Value\n'
                    b'foo,1\n'
                    b'\n'
                    b'bar,2\n'
                    b' \n'
                    b'\t \n',
            'file_type': 'text/csv'
        })
        data, fields = import_wizard._convert_import_data(
            ['name', 'somevalue'],
            {'quoting': '"', 'separator': ',', 'has_headers': True}
        )

        self.assertItemsEqual(fields, ['name', 'somevalue'])
        self.assertItemsEqual(data, [
            ['foo', '1'],
            ['bar', '2'],
        ])

    def test_nofield(self):
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': b'name,Some Value,Counter\n'
                    b'foo,1,2\n',
            'file_type': 'text/csv'
        })
        self.assertRaises(ImportValidationError, import_wizard._convert_import_data, [], {'quoting': '"', 'separator': ',', 'has_headers': True})

    def test_falsefields(self):
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': b'name,Some Value,Counter\n'
                    b'foo,1,2\n',
            'file_type': 'text/csv'
        })

        self.assertRaises(
            ImportValidationError,
            import_wizard._convert_import_data,
            [False, False, False],
            {'quoting': '"', 'separator': ',', 'has_headers': True})

    def test_newline_import(self):
        """
        Ensure importing keeps newlines
        """
        output = io.BytesIO()
        writer = pycompat.csv_writer(output, quoting=1)

        data_row = [u"\tfoo\n\tbar", u" \"hello\" \n\n 'world' "]

        writer.writerow([u"name", u"Some Value"])
        writer.writerow(data_row)

        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file': output.getvalue(),
            'file_type': 'text/csv',
        })
        data, _ = import_wizard._convert_import_data(
            ['name', 'somevalue'],
            {'quoting': '"', 'separator': ',', 'has_headers': True}
        )

        self.assertItemsEqual(data, [data_row])
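
    # quoting=1 is csv.QUOTE_ALL, so the embedded tabs and newlines in
    # data_row survive the writer; the assertion then checks they also
    # survive the import-side parse untouched.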

    def test_set_empty_value_import(self):
        partners_before = self.env['res.partner'].search([])
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'res.partner',
            'file': """foo,US,person\n
foo1,Invalid Country,person\n
foo2,US,persons\n""",
            'file_type': 'text/csv'
        })

        results = import_wizard.execute_import(
            ['name', 'country_id', 'company_type'],
            [],
            {
                'quoting': '"',
                'separator': ',',
                'import_set_empty_fields': ['country_id', 'company_type'],
            }
        )
        partners_now = self.env['res.partner'].search([]) - partners_before
        self.assertEqual(len(results['ids']), 3, "should have imported the first 3 records in full, got %s" % results['ids'])

        self.assertEqual(partners_now[0].name, 'foo', "New partner's name should be foo")
        self.assertEqual(partners_now[0].country_id.id, self.env.ref('base.us').id, "Foo partner's country should be US")
        self.assertEqual(partners_now[0].company_type, 'person', "Foo partner's company_type should be person")

        self.assertEqual(partners_now[1].country_id.id, False, "foo1 partner's country should be False")

        self.assertEqual(partners_now[2].company_type, False, "foo2 partner's company_type should be False")
        # an empty 'messages' list means no import errors
        self.assertItemsEqual(results['messages'], [])
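
    # With 'import_set_empty_fields', a cell that cannot be resolved (unknown
    # country name, invalid selection value) is imported as an empty value
    # rather than failing the row. The next test uses 'import_skip_records'
    # to drop such rows entirely instead.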

    def test_skip_record_import(self):
        partners_before = self.env['res.partner'].search([])
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'res.partner',
            'file': """foo,US,0,person\n
foo1,Invalid Country,0,person\n
foo2,US,False Value,person\n
foo3,US,0,persons\n""",
            'file_type': 'text/csv'
        })

        results = import_wizard.execute_import(
            ['name', 'country_id', 'is_company', 'company_type'],
            [],
            {
                'quoting': '"',
                'separator': ',',
                'import_skip_records': ['country_id', 'is_company', 'company_type']
            }
        )
        partners_now = self.env['res.partner'].search([]) - partners_before

        self.assertEqual(len(results['ids']), 1, "should have imported the first record in full, got %s" % results['ids'])
        self.assertEqual(partners_now.name, 'foo', "New partner's name should be foo")
        # an empty 'messages' list means no import errors
        self.assertItemsEqual(results['messages'], [])

    def test_multi_mapping(self):
        """ Test meant specifically for the '_handle_multi_mapping' feature
        that allows mapping multiple columns to the same field and merging
        the values together.

        It makes sure that values of type Char and Many2many are correctly merged. """

        tag1, tag2, tag3 = self.env['res.partner.category'].create([{
            'name': 'tag1',
        }, {
            'name': 'tag2',
        }, {
            'name': 'tag3',
        }])

        file_partner_values = [
            ['Mitchel', 'US', 'Admin', 'The Admin User', 'tag1,tag2', 'tag3'],
            ['Marc', 'US', 'Demo', 'The Demo User', '', 'tag3'],
            ['Joel', 'US', 'Portal', '', 'tag1', 'tag3'],
        ]

        existing_partners = self.env['res.partner'].search_read([], ['id'])
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'res.partner',
            'file': '\n'.join([';'.join(partner_values) for partner_values in file_partner_values]),
            'file_type': 'text/csv',
        })

        results = import_wizard.execute_import(
            ['name', 'country_id', 'name', 'name', 'category_id', 'category_id'],
            [],
            {
                'quoting': '"',
                'separator': ';',
            },
        )

        # an empty 'messages' list means no import errors
        self.assertItemsEqual(results['messages'], [])

        partners = self.env['res.partner'].search([
            ('id', 'not in', [existing_partner['id'] for existing_partner in existing_partners])
        ], order='id asc')

        self.assertEqual(3, len(partners))
        self.assertEqual('Mitchel Admin The Admin User', partners[0].name)
        self.assertEqual('Marc Demo The Demo User', partners[1].name)
        self.assertEqual('Joel Portal', partners[2].name)

        self.assertEqual(tag1 | tag2 | tag3, partners[0].category_id)
        self.assertEqual(tag3, partners[1].category_id)
        self.assertEqual(tag1 | tag3, partners[2].category_id)
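
    # Merge semantics verified above: Char columns mapped to the same field
    # are joined with single spaces (empty values dropped, hence
    # 'Joel Portal'), while Many2many columns are unioned.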


class TestBatching(TransactionCase):
    def _makefile(self, rows):
        f = io.BytesIO()
        writer = pycompat.csv_writer(f, quoting=1)
        writer.writerow(['name', 'counter'])
        for i in range(rows):
            writer.writerow(['n_%d' % i, str(i)])
        return f.getvalue()
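
    # Illustration: _makefile(2) returns CSV bytes roughly like
    #   "name","counter"\r\n"n_0","0"\r\n"n_1","1"\r\n
    # (quoting=1 is csv.QUOTE_ALL; csv writers default to \r\n line endings).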

    def test_recognize_batched(self):
        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.preview',
            'file_type': 'text/csv',
        })

        import_wizard.file = self._makefile(10)
        result = import_wizard.parse_preview({
            'quoting': '"',
            'separator': ',',
            'has_headers': True,
            'limit': 100,
        })
        self.assertIsNone(result.get('error'))
        self.assertIs(result['batch'], False)

        result = import_wizard.parse_preview({
            'quoting': '"',
            'separator': ',',
            'has_headers': True,
            'limit': 5,
        })
        self.assertIsNone(result.get('error'))
        self.assertIs(result['batch'], True)
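
    # parse_preview flags batch=True whenever the file holds more rows than
    # 'limit'; the client is then expected to drive execute_import repeatedly
    # with 'skip'/'limit', as the following tests do.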

    def test_limit_on_lines(self):
        """ The limit option should be a limit on the number of *lines*
        imported at a time, not the number of *records*. This is relevant
        when it comes to embedded o2m.

        A big question is whether we want to round up or down (if the limit
        brings us inside a record). Rounding up (aka finishing up the record
        we're currently parsing) seems like a better idea:

        * if the first record has so many sub-lines it hits the limit we still
          want to import it (it's probably extremely rare but it can happen)
        * if we have one line per record, we probably want to import <limit>
          records not <limit-1>, but if we stop in the middle of the "current
          record" we'd always ignore the last record (I think)
        """
        f = io.BytesIO()
        writer = pycompat.csv_writer(f, quoting=1)
        writer.writerow(['name', 'value/value'])
        for record in range(10):
            writer.writerow(['record_%d' % record, '0'])
            for row in range(1, 10):
                writer.writerow(['', str(row)])

        import_wizard = self.env['base_import.import'].create({
            'res_model': 'base_import.tests.models.o2m',
            'file_type': 'text/csv',
            'file_name': 'things.csv',
            'file': f.getvalue(),
        })
        opts = {'quoting': '"', 'separator': ',', 'has_headers': True}
        preview = import_wizard.parse_preview({**opts, 'limit': 15})
        self.assertIs(preview['batch'], True)

        results = import_wizard.execute_import(
            ['name', 'value/value'], [],
            {**opts, 'limit': 5}
        )
        self.assertFalse(results['messages'])
        self.assertEqual(len(results['ids']), 1, "should have imported the first record in full, got %s" % results['ids'])
        self.assertEqual(results['nextrow'], 10)

        results = import_wizard.execute_import(
            ['name', 'value/value'], [],
            {**opts, 'limit': 15}
        )
        self.assertFalse(results['messages'])
        self.assertEqual(len(results['ids']), 2, "should have imported the first two records, got %s" % results['ids'])
        self.assertEqual(results['nextrow'], 20)

    def test_batches(self):
        partners_before = self.env['res.partner'].search([])
        opts = {'has_headers': True, 'separator': ',', 'quoting': '"'}

        import_wizard = self.env['base_import.import'].create({
            'res_model': 'res.partner',
            'file_type': 'text/csv',
            'file_name': 'clients.csv',
            'file': b"""name,email
a,a@example.com
b,b@example.com
,
c,c@example.com
d,d@example.com
e,e@example.com
f,f@example.com
g,g@example.com
"""
        })

        results = import_wizard.execute_import(['name', 'email'], [], {**opts, 'limit': 1})
        self.assertFalse(results['messages'])
        self.assertEqual(len(results['ids']), 1)
        # titlerow is ignored by lastrow's counter
        self.assertEqual(results['nextrow'], 1)
        partners_1 = self.env['res.partner'].search([]) - partners_before
        self.assertEqual(partners_1.name, 'a')

        results = import_wizard.execute_import(['name', 'email'], [], {**opts, 'limit': 2, 'skip': 1})
        self.assertFalse(results['messages'])
        self.assertEqual(len(results['ids']), 2)
        # empty row should also be ignored
        self.assertEqual(results['nextrow'], 3)
        partners_2 = self.env['res.partner'].search([]) - (partners_before | partners_1)
        self.assertEqual(partners_2.mapped('name'), ['b', 'c'])

        results = import_wizard.execute_import(['name', 'email'], [], {**opts, 'limit': 10, 'skip': 3})
        self.assertFalse(results['messages'])
        self.assertEqual(len(results['ids']), 4)
        self.assertEqual(results['nextrow'], 0)
        partners_3 = self.env['res.partner'].search([]) - (partners_before | partners_1 | partners_2)
        self.assertEqual(partners_3.mapped('name'), ['d', 'e', 'f', 'g'])
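
    # Contract exercised above: 'nextrow' is the offset the client should
    # pass as 'skip' for the following batch, and 0 signals the file is
    # exhausted.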


class test_failures(TransactionCase):
    def test_big_attachments(self):
        """
        Ensure big fields (e.g. b64-encoded image data) can be imported and
        we're not hitting limits of the default CSV parser config
        """
        from PIL import Image

        im = Image.new('RGB', (1920, 1080))
        fout = io.BytesIO()

        writer = pycompat.csv_writer(fout, dialect=None)
        writer.writerows([
            [u'name', u'db_datas'],
            [u'foo', base64.b64encode(im.tobytes()).decode('ascii')]
        ])

        import_wizard = self.env['base_import.import'].create({
            'res_model': 'ir.attachment',
            'file': fout.getvalue(),
            'file_type': 'text/csv'
        })
        results = import_wizard.execute_import(
            ['name', 'db_datas'],
            [],
            {'has_headers': True, 'separator': ',', 'quoting': '"'})
        self.assertFalse(results['messages'], "results should be empty on successful import")
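
        # Scale check: a 1920x1080 RGB image is ~6.2MB raw and ~8.3MB once
        # base64-encoded, far above the csv module's default field size limit
        # of 131072 bytes, so the importer has to raise that limit to parse
        # this file at all.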