From 9ba22c9eb8c7a121cdf9249df4b5dc9ba54ce6e6 Mon Sep 17 00:00:00 2001
From: "Carsten Wolff (cawo)"
Date: Fri, 13 Sep 2024 11:17:23 +0000
Subject: [PATCH] [IMP] snippets: move all work from parent to mp workers

In `convert_html_columns()`, we select 100MiB worth of DB tuples and pass
them to a ProcessPoolExecutor together with a converter callable. So far,
the converter has returned every tuple, changed or unchanged, together
with a flag telling whether anything was changed. All of this is sent back
through IPC to the parent process, where the caller only acts on the
changed tuples; the rest is ignored.

In every scenario I've seen, only a small proportion of the input tuples
is actually changed, which means a large proportion is returned through
IPC unnecessarily. What makes it worse is that processing the converted
results in the parent process is often slower than the conversion itself,
leading to two effects:

1) The results of all workers pile up in the parent process's memory,
   possibly leading to a MemoryError (upg-2021031).
2) The parallel processing is effectively serialized on this feedback
   loop, defeating a large part of the intended performance gains.

To improve this, this commit

- moves all work into the workers: not just the conversion filter, but
  also the DB query and the DB updates;
- thereby reduces the data passed over IPC to just the query texts;
- thereby distributes the data held in memory across all worker
  processes;
- reduces the chunk size by one order of magnitude, which means
  - a lot less memory used at a time,
  - a much better distribution of "to-be-changed" rows when those rows
    are clustered in the table.

All in all, in my test case, this

- reduces the maximum process size in memory to 300MiB for all processes,
  compared to formerly >2GiB (and a MemoryError) in the parent process;
- reduces the runtime from 17 minutes to less than 2 minutes.
---
 src/base/tests/test_util.py | 57 +++++++++++++++++++------------------
 src/util/snippets.py        | 52 ++++++++++++++++++++++++---------
 2 files changed, 68 insertions(+), 41 deletions(-)

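Note: below is a minimal, self-contained sketch of the pattern this commit
introduces, not the actual upgrade-util code. It assumes a plain psycopg2
connection, a hypothetical table "my_table" with "id" and "body" columns,
and a toy converter; the real implementation goes through
odoo.sql_db.db_connect and the Convertor class changed below. The point is
that only SQL text crosses the IPC boundary: each worker opens its own
connection, converts its rows, and applies the UPDATEs locally, so
unchanged rows never travel back to the parent.

    from concurrent.futures import ProcessPoolExecutor, as_completed

    import psycopg2  # assumed driver; stand-in for odoo.sql_db.db_connect

    DSN = "dbname=somedb"  # hypothetical connection string


    def convert(content):
        # toy stand-in for the converter callback: returns (has_changed, new_content)
        new_content = content.replace("old_class", "new_class")
        return new_content != content, new_content


    def work(select_query):
        # runs inside a worker process: its own connection, its own transaction
        with psycopg2.connect(DSN) as conn, conn.cursor() as cr:
            cr.execute(select_query)
            for res_id, body in cr.fetchall():
                changed, new_body = convert(body)
                if changed:  # only touched rows are written back
                    cr.execute("UPDATE my_table SET body = %s WHERE id = %s", (new_body, res_id))
        # the psycopg2 connection context manager commits on successful exit


    def run(split_queries):
        # the parent only builds per-chunk SELECT statements and ships their text
        with ProcessPoolExecutor() as executor:
            futures = [executor.submit(work, query) for query in split_queries]
            for future in as_completed(futures):
                future.result()  # surface any worker exception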

- - - """ - view_id = self.env["ir.ui.view"].create( - { - "name": "not_for_anything", - "type": "qweb", - "mode": "primary", - "key": "test.htmlconvert", - "arch_db": view_arch, - } - ) - cr = self.env.cr - snippets.convert_html_content( - cr, - snippets.html_converter( - not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]" - ), - ) - util.invalidate(view_id) - res = self.env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"]) + with self.registry.cursor() as cr: + env = api.Environment(cr, SUPERUSER_ID, {}) + view_arch = """ + +

+ + + """ + view_id = env["ir.ui.view"].create( + { + "name": "not_for_anything", + "type": "qweb", + "mode": "primary", + "key": "test.htmlconvert", + "arch_db": view_arch, + } + ) + snippets.convert_html_content( + cr, + snippets.html_converter( + not_doing_anything_converter, selector="//*[hasclass('fake_class_not_doing_anything')]" + ), + ) + util.invalidate(view_id) + res = env["ir.ui.view"].search_read([("id", "=", view_id.id)], ["arch_db"]) + view_id.unlink() self.assertEqual(len(res), 1) oneline = lambda s: re.sub(r"\s+", " ", s.strip()) self.assertEqual(oneline(res[0]["arch_db"]), oneline(view_arch)) diff --git a/src/util/snippets.py b/src/util/snippets.py index 76b77015..bf8b257a 100644 --- a/src/util/snippets.py +++ b/src/util/snippets.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import concurrent import inspect import logging import re @@ -11,6 +12,11 @@ from psycopg2.extensions import quote_ident from psycopg2.extras import Json +try: + from odoo.sql_db import db_connect +except ImportError: + from openerp.sql_db import db_connect + from .const import NEARLYWARN from .exceptions import MigrationError from .helpers import table_of_model @@ -243,16 +249,26 @@ def _dumps(self, node): class Convertor: - def __init__(self, converters, callback): + def __init__(self, converters, callback, dbname, update_query): self.converters = converters self.callback = callback + self.dbname = dbname + self.update_query = update_query + + def __call__(self, query): + with db_connect(self.dbname).cursor() as cr: + cr.execute(query) + for self.row in cr.fetchall(): + self._convert_row() + if "id" in self.changes: + cr.execute(self.update_query, self.changes) - def __call__(self, row): + def _convert_row(self): converters = self.converters columns = self.converters.keys() converter_callback = self.callback - res_id, *contents = row - changes = {} + res_id, *contents = self.row + self.changes = {} for column, content in zip(columns, contents): if content and converters[column]: # jsonb column; convert all keys @@ -264,10 +280,10 @@ def __call__(self, row): new_content = Json(new_content) else: has_changed, new_content = converter_callback(content) - changes[column] = new_content + self.changes[column] = new_content if has_changed: - changes["id"] = res_id - return changes + self.changes["id"] = res_id + return self.changes def convert_html_columns(cr, table, columns, converter_callback, where_column="IS NOT NULL", extra_where="true"): @@ -305,17 +321,25 @@ def convert_html_columns(cr, table, columns, converter_callback, where_column="I update_sql = ", ".join(f'"{column}" = %({column})s' for column in columns) update_query = f"UPDATE {table} SET {update_sql} WHERE id = %(id)s" + cr.commit() with ProcessPoolExecutor(max_workers=get_max_workers()) as executor: - convert = Convertor(converters, converter_callback) - for query in log_progress(split_queries, logger=_logger, qualifier=f"{table} updates"): - cr.execute(query) - for data in executor.map(convert, cr.fetchall(), chunksize=1000): - if "id" in data: - cr.execute(update_query, data) + convert = Convertor(converters, converter_callback, cr.dbname, update_query) + futures = [executor.submit(convert, query) for query in split_queries] + for future in log_progress( + concurrent.futures.as_completed(futures), + logger=_logger, + qualifier=f"{table} updates", + size=len(split_queries), + estimate=False, + log_hundred_percent=True, + ): + # just for raising any worker exception + future.result() + cr.commit() def determine_chunk_limit_ids(cr, table, 
column_arr, where): - bytes_per_chunk = 100 * 1024 * 1024 + bytes_per_chunk = 10 * 1024 * 1024 columns = ", ".join(quote_ident(column, cr._cnx) for column in column_arr if column != "id") cr.execute( f"""
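
Note: a small sketch (illustrative names only, not upgrade-util APIs) of two
constraints the snippets.py changes above rely on. First, whatever is handed
to ProcessPoolExecutor.submit() is pickled to reach the worker processes,
which is presumably why Convertor now carries plain strings (dbname and the
update query) instead of a live cursor or connection. Second, each worker
opens its own DB connection and therefore only sees data the parent has
already committed, which is consistent with the cr.commit() added before the
pool is started.

    import pickle
    from concurrent.futures import ProcessPoolExecutor


    class Job:
        """Picklable stand-in for a worker callable such as Convertor."""

        def __init__(self, dbname, update_query):
            self.dbname = dbname              # plain strings pickle fine
            self.update_query = update_query  # a live cursor would not

        def __call__(self, query):
            # a real worker would open its own DB connection here
            return (self.dbname, query)


    if __name__ == "__main__":
        job = Job("somedb", "UPDATE t SET x = %(x)s WHERE id = %(id)s")
        # the callable must survive a pickle round-trip to reach the workers
        assert pickle.loads(pickle.dumps(job)).dbname == "somedb"
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(job, query) for query in ("SELECT 1", "SELECT 2")]
            print([f.result() for f in futures])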