# Part of Odoo. See LICENSE file for full copyright and licensing details.
import logging
import re
import time

import lxml.html
from werkzeug import urls

import odoo
from odoo.addons.base.tests.common import HttpCaseWithUserDemo

_logger = logging.getLogger(__name__)


@odoo.tests.common.tagged('post_install', '-at_install', 'crawl')
class Crawler(HttpCaseWithUserDemo):
    """ Test suite crawling an Odoo CMS instance and checking that all
    internal links lead to a 200 response.

    If a username and a password are provided, authenticates the user before
    starting the crawl.
    """

    def setUp(self):
        super(Crawler, self).setUp()
        self.env.ref('website.default_website').write({
            'social_facebook': "https://www.facebook.com/Odoo",
            'social_twitter': 'https://twitter.com/Odoo',
            'social_linkedin': 'https://www.linkedin.com/company/odoo',
            'social_youtube': 'https://www.youtube.com/user/OpenERPonline',
            'social_github': 'https://github.com/odoo',
            'social_instagram': 'https://www.instagram.com/explore/tags/odoo/',
            'social_tiktok': 'https://www.tiktok.com/@odoo',
        })

        if hasattr(self.env['res.partner'], 'grade_id'):
            # Create at least one published partner, so that /partners doesn't
            # return a 404
            grade = self.env['res.partner.grade'].create({
                'name': 'A test grade',
                'website_published': True,
            })
            self.env['res.partner'].create({
                'name': 'A Company for /partners',
                'is_company': True,
                'grade_id': grade.id,
                'website_published': True,
            })

    def clean_url(self, url):
        # strip numeric ids and slug-id path segments, e.g. /my/slug-19/ -> /my//
        clean_url = re.sub(r"(?<=/)(([^/=?&]+)?-?[0-9]+)(?=(/|$|\?|#))", r"", url)
        # remove the #fragment, sort query params and clean trailing /?
        base, *qs = clean_url.split('#', 1)[0].split('?', 1)
        qs_sorted = '?' + '&'.join(sorted(''.join(qs).split('&')))
        # blank out query values, keeping only the keys: ?key=value -> ?key=
        qs_sorted = re.sub(r"([^=?&]+)=[^=?&]+", r'\g<1>=', qs_sorted)
        clean_url = base.rstrip('/#') + qs_sorted.rstrip('?#')
        return clean_url

    def crawl(self, url, seen=None, msg=''):
        if seen is None:
            seen = set()

        url_slug = self.clean_url(url)
        if url_slug in seen:
            return seen
        seen.add(url_slug)

        _logger.info("%s %s", msg, url)
        r = self.url_open(url, allow_redirects=False)
        if r.status_code in (301, 302, 303):
            # only follow local redirects, to avoid fetching external pages
            new_url = r.headers.get('Location')
            current_url = r.url
            if urls.url_parse(new_url).netloc != urls.url_parse(current_url).netloc:
                return seen
            r = self.url_open(new_url)

        code = r.status_code
        self.assertIn(code, range(200, 300), "%s Fetching %s returned error response (%d)" % (msg, url, code))

        if r.headers['Content-Type'].startswith('text/html'):
            doc = lxml.html.fromstring(r.content)
            for link in doc.xpath('//a[@href]'):
                href = link.get('href')

                parts = urls.url_parse(href)
                # href with any fragment removed
                href = parts.replace(fragment='').to_url()

                # FIXME: handle relative link (not parts.path.startswith /)
                if parts.netloc or \
                        not parts.path.startswith('/') or \
                        parts.path == '/odoo' or \
                        parts.path.startswith('/web/') or \
                        parts.path.startswith('/en/') or \
                        (parts.scheme and parts.scheme not in ('http', 'https')):
                    continue

                self.crawl(href, seen, msg)
        return seen

    def test_05_test_clean_url(self):
        urls_to_check = [
            ("/my/1/20/300", "/my///"),
            ("/my/19/", "/my/"),
            ("/my/19#", "/my/"),
            ("/my/19#a=b", "/my/"),
            ("/my/19/?access_token=www-xxx-yyy-zzz", "/my/?access_token="),
            ("/my/19?access_token=www-xxx-yyy-zzz", "/my/?access_token="),
            ("/my/19?access_token=www-xxx-yyy-zzz&report_type=pdf", "/my/?access_token=&report_type="),
            ("/my/slug-19/", "/my/"),
            ("/my/slug-19#a=b", "/my/"),
            ("/my/slug-19/?access_token=www-xxx-yyy-zzz", "/my/?access_token="),
            ("/my/slug-19?access_token=www-xxx-yyy-zzz", "/my/?access_token="),
            ("/my/slug-19?access_token=www-xxx-yyy-zzz&report_type=pdf", "/my/?access_token=&report_type="),
            ("/my/page/2?order=website_sequence+asc", "/my/page/?order="),
            ("/my/page/2", "/my/page/"),
            ("/my/page/2/", "/my/page/"),
            ("/terms", "/terms"),
            ("/controller/slug-1", "/controller/"),
            ("/controller/tag/slug-2", "/controller/tag/"),
            ("/controller/slug-1/slug-2", "/controller//"),
            ("/controller/slug-1/tag/slug-2", "/controller//tag/"),
            ("/controller/slug-1/tag/slug-2/end", "/controller//tag//end"),
            ("/controller?tags=%5B5%5D", "/controller?tags="),
            ("/controller?date=upcoming&tags=%5B5%5D", "/controller?date=&tags="),
            ("/controller?tags=%5B%5D&date=upcoming", "/controller?date=&tags="),
            ("/controller?tags=%5B%5D&from=/a/b/c", "/controller?from=&tags="),
            ("/controller?tags=%5B%5D&from=d/e/f&to=/a/b", "/controller?from=&tags=&to="),
            ("/controller?tags=%5B%5D&from=d/e/f&to=/c/d", "/controller?from=&tags=&to="),
        ]
        uniq = set()
        for url, clean_expected in urls_to_check:
            cleaned = self.clean_url(url)
            self.assertEqual(cleaned, clean_expected)
            uniq.add(cleaned)
        self.assertEqual(len(uniq), 16)

    def test_10_crawl_public(self):
        t0 = time.time()
        t0_sql = self.registry.test_cr.sql_log_count
        seen = self.crawl('/', msg='Anonymous Coward')
        count = len(seen)
        duration = time.time() - t0
        sql = self.registry.test_cr.sql_log_count - t0_sql
        _logger.runbot("public crawled %s urls in %.2fs %s queries, %.3fs %.2fq per request", count, duration, sql, duration / count, float(sql) / count)

    def test_20_crawl_demo(self):
        # The demo user, without sales/crm/helpdesk/... rights, won't be able
        # to access portals like /my/leads. Grant those rights when the
        # corresponding groups exist.
        groups = self.env['res.groups']
        group_xmlids = [
            'sales_team.group_sale_salesman',
            'purchase.group_purchase_user',
            'helpdesk.group_helpdesk_user',
        ]
        for group_xmlid in group_xmlids:
            group = self.env.ref(group_xmlid, raise_if_not_found=False)
            if group:
                groups += group
        self.env.ref('base.group_user').write({'implied_ids': [(4, group.id) for group in groups]})

        t0 = time.time()
        t0_sql = self.registry.test_cr.sql_log_count
        self.authenticate('demo', 'demo')
        seen = self.crawl('/', msg='demo')
        count = len(seen)
        duration = time.time() - t0
        sql = self.registry.test_cr.sql_log_count - t0_sql
        _logger.runbot("demo crawled %s urls in %.2fs %s queries, %.3fs %.2fq per request", count, duration, sql, duration / count, float(sql) / count)