547 lines
20 KiB
Python
547 lines
20 KiB
Python
import json
|
|
import os
|
|
import tempfile
|
|
import unittest
|
|
|
|
from contextlib import contextmanager
|
|
from subprocess import run, PIPE
|
|
from textwrap import dedent
|
|
|
|
from odoo.tools.which import which
|
|
from odoo.tests.common import TransactionCase
|
|
|
|
from . import _odoo_checker_sql_injection
|
|
|
|
# pylint is optional: when it cannot be imported, fall back to placeholders so
# this module still imports.
try:
    import pylint
    from pylint.lint import PyLinter
except ImportError:
    pylint = None
    # `object` keeps `class UnittestLinter(PyLinter)` definable without pylint.
    PyLinter = object
# Look up the `pylint` executable; None when `which` raises IOError.
try:
    pylint_bin = which('pylint')
except IOError:
    pylint_bin = None
|
|
|
|
class UnittestLinter(PyLinter):
    """Linter stand-in that records emitted message ids in ``_messages``."""

    # Default file name; test_sql_injection_detection overrides it per instance.
    current_file = 'not_test_checkers.py'

    def __init__(self):
        # Both attributes are assigned before the parent constructor runs.
        # NOTE(review): presumably so they already exist if the parent
        # constructor touches them -- confirm against the pylint version in use.
        self._messages = []
        self.stats = {}
        super().__init__()

    def add_message(self, msg_id, *args, **kwargs):
        """Record ``msg_id``; all other arguments are ignored."""
        self._messages.append(msg_id)

    @staticmethod
    def is_message_enabled(*_args, **kwargs):
        """Return True whatever the arguments are."""
        return True
|
|
|
|
|
|
# Directory containing this file (symlinks resolved by `realpath`); it is put
# on the PYTHONPATH of the pylint subprocess in TestPylintChecks.check.
HERE = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
|
|
class TestPylintChecks(TransactionCase):
    """Base class running the ``pylint`` executable on small code snippets."""

    def check(self, test_content, plugins, rules):
        """Lint ``test_content`` with pylint and return what it reported.

        The snippet is dedented, stripped and written to a temporary file
        (removed at test cleanup), then linted in a subprocess with every
        rule disabled except ``rules``.

        :param str test_content: source code to lint
        :param str plugins: value of pylint's ``--load-plugins`` option
        :param str rules: value of pylint's ``--enable`` option
        :return: ``(returncode, messages)`` where ``messages`` is pylint's
            JSON output decoded with ``json.loads``
        :rtype: tuple
        """
        # delete=False: the file must outlive the `with` block so that the
        # subprocess can read it once it has been closed (and thus flushed).
        with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=False) as f:
            self.addCleanup(os.remove, f.name)
            f.write(dedent(test_content).strip())

        # Join only the non-empty entries: a dangling separator (PYTHONPATH
        # unset or empty) would add an empty search-path entry, which Python
        # resolves to the current working directory.
        pythonpath = os.pathsep.join(
            entry for entry in (HERE, os.environ.get("PYTHONPATH")) if entry
        )

        res = run(
            [
                pylint_bin,
                f"--rcfile={os.devnull}",
                f"--load-plugins={plugins}",
                "--disable=all",
                f"--enable={rules}",
                "--output-format=json",
                f.name,
            ],
            stdout=PIPE,
            encoding="utf-8",
            env={
                **os.environ,
                "PYTHONPATH": pythonpath,
            },
            check=False,  # a non-zero exit code is an expected outcome here
            shell=False,  # keep False to avoid shell injection
        )
        return res.returncode, json.loads(res.stdout)
|
|
|
|
|
|
@unittest.skipUnless(pylint and pylint_bin, "testing lints requires pylint")
class TestGetTextLint(TestPylintChecks):
    """Runs the ``gettext-placeholders`` rule of ``_odoo_checker_gettext``."""

    def check(self, testtext):
        """Lint ``testtext`` with only ``gettext-placeholders`` enabled."""
        plugin = "_odoo_checker_gettext"
        rule = "gettext-placeholders"
        return super().check(testtext, plugin, rule)

    def test_gettext_env(self):
        """``_`` and ``self.env._`` are checked in the same way."""
        exit_code, findings = self.check("""
            def method(self, vars):
                _("something %s %s", *vars)
        """)
        self.assertTrue(exit_code, "_() should have raised for multiple placeholders")
        self.assertEqual(findings[0]['line'], 2, findings)

        exit_code, findings = self.check("""
            def method(self, vars):
                self.env._("something %s %s", *vars)
        """)
        self.assertTrue(exit_code, "self.env._() should have raised for multiple placeholders")
        self.assertEqual(findings[0]['line'], 2, findings)
|
|
|
|
|
|
@unittest.skipUnless(pylint and pylint_bin, "testing lints requires pylint")
class TestSqlLint(TestPylintChecks):
    """Tests for the ``sql-injection`` rule of ``_odoo_checker_sql_injection``."""

    def check(self, testtext):
        """Lint ``testtext`` with only the ``sql-injection`` rule enabled."""
        return super().check(testtext, "_odoo_checker_sql_injection", "sql-injection")

    def test_printf(self):
        """%-formatted queries, linted through the pylint executable."""
        # A plain function argument interpolated into the query is reported.
        r, [err] = self.check("""
        def do_the_thing(cr, name):
            cr.execute('select %s from thing' % name)
        """)
        self.assertTrue(r, "should have noticed the injection")
        self.assertEqual(err['line'], 2, err)

        # Interpolating `self._table` is accepted, directly...
        r, errs = self.check("""
        def do_the_thing(self):
            self.env.cr.execute("select thing from %s" % self._table)
        """)
        self.assertFalse(r, f"underscore-attributes are allowed\n{errs}")

        # ... and through an intermediate `query` variable.
        r, errs = self.check("""
        def do_the_thing(self):
            query = "select thing from %s"
            self.env.cr.execute(query % self._table)
        """)
        self.assertFalse(r, f"underscore-attributes are allowed\n{errs}")

    def test_fstring(self):
        """f-string queries, linted through the pylint executable."""
        # A plain function argument interpolated into the query is reported.
        r, [err] = self.check("""
        def do_the_thing(cr, name):
            cr.execute(f'select {name} from thing')
        """)
        self.assertTrue(r, "should have noticed the injection")
        self.assertEqual(err['line'], 2, err)

        # An f-string without any placeholder is not reported.
        r, errs = self.check("""
        def do_the_thing(cr, name):
            cr.execute(f'select name from thing')
        """)
        self.assertFalse(r, f"unnecessary fstring should be innocuous\n{errs}")

        # Disabled case, kept for reference.
        #r, errs = self.check("""
        #def do_the_thing(cr, name, value):
        #    cr.execute(f'select {name} from thing where field = %s', [value])
        #""")
        #self.assertFalse(r, f"probably has a good reason for the extra arg\n{errs}")

        r, errs = self.check("""
        def do_the_thing(self):
            self.env.cr.execute(f'select name from {self._table}')
        """)
        self.assertFalse(r, f'underscore-attributes are allowable\n{errs}')

    @contextmanager
    def assertMessages(self, *messages):
        """Assert that the body makes ``self.linter`` record exactly ``messages``.

        The recorded list is reset on entry and compared, order included,
        on exit. Called without argument, it expects no message at all.
        """
        self.linter._messages = []
        yield
        self.assertEqual(self.linter._messages, list(messages))

    @contextmanager
    def assertNoMessages(self):
        """Assert that the body makes ``self.linter`` record no message."""
        self.linter._messages = []
        yield
        self.assertEqual(self.linter._messages, [])

    def test_sql_injection_detection(self):
        """Call the checker directly on nodes extracted from snippets.

        Each snippet marks one statement with ``#@``; the extracted node is
        passed to the checker and the message ids recorded by
        ``UnittestLinter`` are compared with the expectation.
        """
        self.linter = UnittestLinter()
        self.linter.current_file = 'dummy.py' # should not be prefixed by test
        checker = _odoo_checker_sql_injection.OdooBaseChecker(self.linter)

        # --- variables built with `+` and `%` ---
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test():
                arg = "test"
                arg = arg + arg
                self.env.cr.execute(arg) #@
        """)

        with self.assertNoMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function9(self,arg):
                my_injection_variable= "aaa" % arg #Uninferable
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)

        with self.assertMessages("sql-injection"):
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function10(self):
                my_injection_variable= "aaa" + "aaa" #Const
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)
        with self.assertNoMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function11(self, arg):
                my_injection_variable= "aaaaaaaa" + arg #Uninferable
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)

        with self.assertMessages("sql-injection"):
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function12(self):
                arg1 = "a"
                arg2 = "b" + arg1
                arg3 = arg2 + arg1 + arg2
                arg4 = arg1 + "d"
                my_injection_variable= arg1 + arg2 + arg3 + arg4
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)

        with self.assertNoMessages():
            checker.visit_call(node)

        # --- variables built with f-strings and str.format ---
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function1(self, arg):
                my_injection_variable= f"aaaaa{arg}aaa" #Uninferable
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)
        with self.assertMessages("sql-injection"):
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function2(self):
                arg = 'bbb'
                my_injection_variable= f"aaaaa{arg}aaa" #Uninferable
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)
        with self.assertNoMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function3(self, arg):
                my_injection_variable= "aaaaaaaa".format() # Const
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)
        with self.assertNoMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function4(self, arg):
                my_injection_variable= "aaaaaaaa {test}".format(test="aaa")
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)
        with self.assertNoMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function5(self):
                arg = 'aaa'
                my_injection_variable= "aaaaaaaa {test}".format(test=arg) #Uninferable
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)
        with self.assertNoMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function6(self,arg):
                my_injection_variable= "aaaaaaaa {test}".format(test="aaa" + arg) #Uninferable
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)
        with self.assertMessages("sql-injection"):
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function7(self):
                arg = "aaa"
                my_injection_variable= "aaaaaaaa {test}".format(test="aaa" + arg) #Const
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable)#@
        """)
        with self.assertNoMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function8(self):
                global arg
                my_injection_variable= "aaaaaaaa {test}".format(test="aaa" + arg) #Uninferable
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)
        with self.assertMessages("sql-injection"):
            checker.visit_call(node)

        #TODO
        #node = _odoo_checker_sql_injection.astroid.extract_node("""
        #def test_function(self):
        #    def test():
        #        return "hello world"
        #    my_injection_variable= "aaaaaaaa {test}".format(test=test()) #Const
        #    self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        #""")
        #with self.assertNoMessages():
        #    checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function9(self,arg):
                my_injection_variable= "aaa" % arg
                self.env.cr.execute('select * from hello where id = %s' % my_injection_variable) #@
        """)

        with self.assertMessages("sql-injection"):
            checker.visit_call(node)

        # From here on, `assertMessages()` without argument means that no
        # message is expected.
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test_function10(self,arg):
                if_else_variable = "aaa" if arg else "bbb" # the two choice of a condition are constant, this is not injectable
                self.env.cr.execute('select * from hello where id = %s' % if_else_variable) #@
        """)

        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def _search_phone_mobile_search(self, operator, value):

                condition = 'IS NULL' if operator == '=' else 'IS NOT NULL'
                query = '''
                    SELECT model.id
                    FROM %s model
                    WHERE model.phone %s
                    AND model.mobile %s
                ''' % (self._table, condition, condition)
                self.env.cr.execute(query) #@
        """) #Real false positive example from the code
        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test1(self):
                operator = 'aaa'
                value = 'bbb'
                op1 , val1 = (operator,value)
                self.env.cr.execute('query' + op1) #@
        """) #Test tuple assignement
        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test2(self):
                operator = 'aaa'
                operator += 'bbb'
                self.env.cr.execute('query' + operator) #@
        """)
        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def test3(self):
                self.env.cr.execute(f'{self._table}') #@
        """)
        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def _init_column(self, column_name):
                query = f'UPDATE "{self._table}" SET "{column_name}" = %s WHERE "{column_name}" IS NULL'
                self._cr.execute(query, (value,)) #@
        """) #Test private function arg should not flag
        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def _init_column1(self, column_name):
                query = 'SELECT %(var1)s FROM %(var2)s WHERE %(var3)s' % {'var1': 'field_name','var2': 'table_name','var3': 'where_clause'}
                self._cr.execute(query) #@
        """)
        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def _graph_data(self, start_date, end_date):

                query = '''SELECT %(x_query)s as x_value, %(y_query)s as y_value
                                FROM %(table)s
                                WHERE team_id = %(team_id)s
                                AND DATE(%(date_column)s) >= %(start_date)s
                                AND DATE(%(date_column)s) <= %(end_date)s
                                %(extra_conditions)s
                                GROUP BY x_value;'''

                # apply rules
                dashboard_graph_model = self._graph_get_model()
                GraphModel = self.env[dashboard_graph_model]
                graph_table = self._graph_get_table(GraphModel)
                extra_conditions = self._extra_sql_conditions()
                where_query = GraphModel._where_calc([])
                GraphModel._apply_ir_rules(where_query, 'read')
                from_clause, where_clause, where_clause_params = where_query.get_sql()
                if where_clause:
                    extra_conditions += " AND " + where_clause

                query = query % {
                    'x_query': self._graph_x_query(),
                    'y_query': self._graph_y_query(),
                    'table': graph_table,
                    'team_id': "%s",
                    'date_column': self._graph_date_column(),
                    'start_date': "%s",
                    'end_date': "%s",
                    'extra_conditions': extra_conditions
                }

                self._cr.execute(query, [self.id, start_date, end_date] + where_clause_params) #@
                return self.env.cr.dictfetchall()
        """)
        with self.assertMessages():
            checker.visit_call(node)

        # --- function calls, join, tuple unpacking and star-arguments ---
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def first_fun():
                anycall() #@
                return 'a'
        """)
        with self.assertMessages():
            checker.visit_call(node)
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def second_fun(value):
                anycall() #@
                return value
        """)
        with self.assertMessages():
            checker.visit_call(node)
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def injectable():
                cr.execute(first_fun())#@
        """)
        with self.assertMessages():
            checker.visit_call(node)
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def injectable1():
                cr.execute(second_fun('aaaaa'))#@
        """)
        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def injectable2(var):
                a = ['a','b']
                cr.execute('a'.join(a))#@
        """)
        with self.assertMessages():
            checker.visit_call(node)

        # No `#@` marker here: the extracted node goes to visit_functiondef.
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def return_tuple(var):
                return 'a',var
        """)
        with self.assertMessages():
            checker.visit_functiondef(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def injectable4(var):
                a, _ = return_tuple(var)
                cr.execute(a) #@
        """)
        with self.assertMessages():
            checker.visit_call(node)
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def not_injectable5(var):
                star = ('defined','constant','string')
                cr.execute(*star)#@
        """)
        with self.assertMessages():
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def injectable6(var):
                star = ('defined','variable','string',var)
                cr.execute(*star)#@
        """)
        with self.assertMessages("sql-injection"):
            checker.visit_call(node)

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def formatNumber(var):
                cr.execute('LIMIT %d' % var)#@
        """)
        with self.assertMessages():
            checker.visit_call(node)

        # --- SQL() wrappers ---
        # The `#@` marker is on an assignment; the child at index 1 is what
        # gets visited (presumably the `SQL(...)` call -- TODO confirm).
        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def wrapper1(var):
                query = SQL(var) #@
                return query
        """)
        with self.assertMessages("sql-injection"):
            checker.visit_call(list(node.get_children())[1])

        node = _odoo_checker_sql_injection.astroid.extract_node("""
            def wrapper2(var):
                query = tools.SQL(var) #@
                return query
        """)
        with self.assertMessages("sql-injection"):
            checker.visit_call(list(node.get_children())[1])
|
|
|
|
|
|
@unittest.skipUnless(pylint and pylint_bin, "testing lints requires pylint")
class TestI18nChecks(TestPylintChecks):
    """Runs the three gettext rules of ``_odoo_checker_gettext`` together."""

    def check(self, test_content):
        """Lint ``test_content`` with the three gettext rules enabled."""
        enabled_rules = "gettext-variable,gettext-placeholders,gettext-repr"
        return super().check(test_content, "_odoo_checker_gettext", enabled_rules)

    def test_gettext_variable(self):
        """Each of the four translation calls is reported as ``gettext-variable``."""
        status, reports = self.check(
            """
            some_variable = "Roblox Mini Golf! [ACTUALLY FIXED]"
            _(some_variable)
            _lt(513)
            _lt("string but" + "not static")
            _(f"formatted string")
            """
        )
        self.assertNotEqual(status, os.EX_OK)
        self.assertEqual(len(reports), 4)
        for report in reports:
            self.assertEqual(report["symbol"], "gettext-variable")

    def test_gettext_placeholders(self):
        """Escaped placeholders pass; several unnamed placeholders are reported."""
        status, reports = self.check(
            """
            _("shouldn't match escaped %%s %%s")
            """
        )
        self.assertEqual(status, os.EX_OK)
        self.assertFalse(reports)

        status, reports = self.check(
            """
            _("more than one unnamed placeholder: %s %s")
            _lt("with fancy placeholders: %03.14d %-xL")
            """
        )
        self.assertNotEqual(status, os.EX_OK)
        self.assertEqual(len(reports), 2)
        for report in reports:
            self.assertEqual(report["symbol"], "gettext-placeholders")

    def test_gettext_repr(self):
        """Both ``%r`` and ``%(name)r`` placeholders are reported as ``gettext-repr``."""
        status, reports = self.check(
            """
            _("%r shouldn't be part of translated strings")
            _lt("%(with_placeholders_in_between)r")
            """
        )
        self.assertNotEqual(status, os.EX_OK)
        self.assertEqual(len(reports), 2)
        for report in reports:
            self.assertEqual(report["symbol"], "gettext-repr")
|