perf: GLE reposting with progress and chunking (backport #31343) (#31373)

* fix: dont use cached doc for GLE reposts

ported from https://github.com/frappe/erpnext/pull/31240

* perf: GLE reposting with progress and chunking

If stock voucher count goes >1000 then fetching all gles and reposting
them all at once requires much more memory and can cause crash.

- This PR ensures that GLE reposting is done in chunks of 100 vouchers.
- This PR also starts keeping track of how many such chunks were
  processed so in future progress is resumed in event of timeout.

* test: add "actual" test for chunked GLE reposting
This commit is contained in:
Ankush Menat
2022-06-15 20:54:20 +05:30
committed by GitHub
parent 001130c0da
commit f19ed0b74c
5 changed files with 154 additions and 28 deletions

View File

@@ -62,8 +62,8 @@ class TestUtils(unittest.TestCase):
stock_entry = {"item": item, "to_warehouse": "_Test Warehouse - _TC", "qty": 1, "rate": 10} stock_entry = {"item": item, "to_warehouse": "_Test Warehouse - _TC", "qty": 1, "rate": 10}
se1 = make_stock_entry(posting_date="2022-01-01", **stock_entry) se1 = make_stock_entry(posting_date="2022-01-01", **stock_entry)
se2 = make_stock_entry(posting_date="2022-02-01", **stock_entry)
se3 = make_stock_entry(posting_date="2022-03-01", **stock_entry) se3 = make_stock_entry(posting_date="2022-03-01", **stock_entry)
se2 = make_stock_entry(posting_date="2022-02-01", **stock_entry)
for doc in (se1, se2, se3): for doc in (se1, se2, se3):
vouchers.append((doc.doctype, doc.name)) vouchers.append((doc.doctype, doc.name))

View File

@@ -3,13 +3,23 @@
from json import loads from json import loads
from typing import List, Tuple from typing import TYPE_CHECKING, List, Optional, Tuple
import frappe import frappe
import frappe.defaults import frappe.defaults
from frappe import _, throw from frappe import _, throw
from frappe.model.meta import get_field_precision from frappe.model.meta import get_field_precision
from frappe.utils import cint, cstr, flt, formatdate, get_number_format_info, getdate, now, nowdate from frappe.utils import (
cint,
create_batch,
cstr,
flt,
formatdate,
get_number_format_info,
getdate,
now,
nowdate,
)
from six import string_types from six import string_types
import erpnext import erpnext
@@ -19,6 +29,9 @@ from erpnext.accounts.doctype.account.account import get_account_currency # noq
from erpnext.stock import get_warehouse_account_map from erpnext.stock import get_warehouse_account_map
from erpnext.stock.utils import get_stock_value_on from erpnext.stock.utils import get_stock_value_on
if TYPE_CHECKING:
from erpnext.stock.doctype.repost_item_valuation.repost_item_valuation import RepostItemValuation
class FiscalYearError(frappe.ValidationError): class FiscalYearError(frappe.ValidationError):
pass pass
@@ -28,6 +41,9 @@ class PaymentEntryUnlinkError(frappe.ValidationError):
pass pass
GL_REPOSTING_CHUNK = 100
@frappe.whitelist() @frappe.whitelist()
def get_fiscal_year( def get_fiscal_year(
date=None, fiscal_year=None, label="Date", verbose=1, company=None, as_dict=False date=None, fiscal_year=None, label="Date", verbose=1, company=None, as_dict=False
@@ -1122,38 +1138,55 @@ def update_gl_entries_after(
def repost_gle_for_stock_vouchers( def repost_gle_for_stock_vouchers(
stock_vouchers, posting_date, company=None, warehouse_account=None stock_vouchers: List[Tuple[str, str]],
posting_date: str,
company: Optional[str] = None,
warehouse_account=None,
repost_doc: Optional["RepostItemValuation"] = None,
): ):
if not stock_vouchers: if not stock_vouchers:
return return
def _delete_gl_entries(voucher_type, voucher_no):
frappe.db.sql(
"""delete from `tabGL Entry`
where voucher_type=%s and voucher_no=%s""",
(voucher_type, voucher_no),
)
stock_vouchers = sort_stock_vouchers_by_posting_date(stock_vouchers)
if not warehouse_account: if not warehouse_account:
warehouse_account = get_warehouse_account_map(company) warehouse_account = get_warehouse_account_map(company)
precision = get_field_precision(frappe.get_meta("GL Entry").get_field("debit")) or 2 precision = get_field_precision(frappe.get_meta("GL Entry").get_field("debit")) or 2
gle = get_voucherwise_gl_entries(stock_vouchers, posting_date) stock_vouchers = sort_stock_vouchers_by_posting_date(stock_vouchers)
for voucher_type, voucher_no in stock_vouchers: if repost_doc and repost_doc.gl_reposting_index:
existing_gle = gle.get((voucher_type, voucher_no), []) # Restore progress
voucher_obj = frappe.get_cached_doc(voucher_type, voucher_no) stock_vouchers = stock_vouchers[cint(repost_doc.gl_reposting_index) :]
expected_gle = voucher_obj.get_gl_entries(warehouse_account)
if expected_gle: for stock_vouchers_chunk in create_batch(stock_vouchers, GL_REPOSTING_CHUNK):
if not existing_gle or not compare_existing_and_expected_gle( gle = get_voucherwise_gl_entries(stock_vouchers_chunk, posting_date)
existing_gle, expected_gle, precision for voucher_type, voucher_no in stock_vouchers_chunk:
): existing_gle = gle.get((voucher_type, voucher_no), [])
voucher_obj = frappe.get_doc(voucher_type, voucher_no)
expected_gle = voucher_obj.get_gl_entries(warehouse_account)
if expected_gle:
if not existing_gle or not compare_existing_and_expected_gle(
existing_gle, expected_gle, precision
):
_delete_gl_entries(voucher_type, voucher_no)
voucher_obj.make_gl_entries(gl_entries=expected_gle, from_repost=True)
else:
_delete_gl_entries(voucher_type, voucher_no) _delete_gl_entries(voucher_type, voucher_no)
voucher_obj.make_gl_entries(gl_entries=expected_gle, from_repost=True)
else: if not frappe.flags.in_test:
_delete_gl_entries(voucher_type, voucher_no) frappe.db.commit()
if repost_doc:
repost_doc.db_set(
"gl_reposting_index", cint(repost_doc.gl_reposting_index) + len(stock_vouchers_chunk)
)
def _delete_gl_entries(voucher_type, voucher_no):
frappe.db.sql(
"""delete from `tabGL Entry`
where voucher_type=%s and voucher_no=%s""",
(voucher_type, voucher_no),
)
def sort_stock_vouchers_by_posting_date( def sort_stock_vouchers_by_posting_date(
@@ -1167,6 +1200,9 @@ def sort_stock_vouchers_by_posting_date(
.select(sle.voucher_type, sle.voucher_no, sle.posting_date, sle.posting_time, sle.creation) .select(sle.voucher_type, sle.voucher_no, sle.posting_date, sle.posting_time, sle.creation)
.where((sle.is_cancelled == 0) & (sle.voucher_no.isin(voucher_nos))) .where((sle.is_cancelled == 0) & (sle.voucher_no.isin(voucher_nos)))
.groupby(sle.voucher_type, sle.voucher_no) .groupby(sle.voucher_type, sle.voucher_no)
.orderby(sle.posting_date)
.orderby(sle.posting_time)
.orderby(sle.creation)
).run(as_dict=True) ).run(as_dict=True)
sorted_vouchers = [(sle.voucher_type, sle.voucher_no) for sle in sles] sorted_vouchers = [(sle.voucher_type, sle.voucher_no) for sle in sles]

View File

@@ -25,7 +25,8 @@
"items_to_be_repost", "items_to_be_repost",
"affected_transactions", "affected_transactions",
"distinct_item_and_warehouse", "distinct_item_and_warehouse",
"current_index" "current_index",
"gl_reposting_index"
], ],
"fields": [ "fields": [
{ {
@@ -181,12 +182,20 @@
"label": "Affected Transactions", "label": "Affected Transactions",
"no_copy": 1, "no_copy": 1,
"read_only": 1 "read_only": 1
},
{
"default": "0",
"fieldname": "gl_reposting_index",
"fieldtype": "Int",
"hidden": 1,
"label": "GL reposting index",
"read_only": 1
} }
], ],
"index_web_pages_for_search": 1, "index_web_pages_for_search": 1,
"is_submittable": 1, "is_submittable": 1,
"links": [], "links": [],
"modified": "2022-04-18 14:08:08.821602", "modified": "2022-06-13 12:20:22.182322",
"modified_by": "Administrator", "modified_by": "Administrator",
"module": "Stock", "module": "Stock",
"name": "Repost Item Valuation", "name": "Repost Item Valuation",

View File

@@ -87,6 +87,7 @@ class RepostItemValuation(Document):
self.current_index = 0 self.current_index = 0
self.distinct_item_and_warehouse = None self.distinct_item_and_warehouse = None
self.items_to_be_repost = None self.items_to_be_repost = None
self.gl_reposting_index = 0
self.db_update() self.db_update()
def deduplicate_similar_repost(self): def deduplicate_similar_repost(self):
@@ -192,6 +193,7 @@ def repost_gl_entries(doc):
directly_dependent_transactions + list(repost_affected_transaction), directly_dependent_transactions + list(repost_affected_transaction),
doc.posting_date, doc.posting_date,
doc.company, doc.company,
repost_doc=doc,
) )

View File

@@ -2,10 +2,14 @@
# See license.txt # See license.txt
from unittest.mock import MagicMock, call
import frappe import frappe
from frappe.tests.utils import FrappeTestCase from frappe.tests.utils import FrappeTestCase
from frappe.utils import nowdate from frappe.utils import nowdate
from frappe.utils.data import add_to_date, today
from erpnext.accounts.utils import repost_gle_for_stock_vouchers
from erpnext.controllers.stock_controller import create_item_wise_repost_entries from erpnext.controllers.stock_controller import create_item_wise_repost_entries
from erpnext.stock.doctype.item.test_item import make_item from erpnext.stock.doctype.item.test_item import make_item
from erpnext.stock.doctype.purchase_receipt.test_purchase_receipt import make_purchase_receipt from erpnext.stock.doctype.purchase_receipt.test_purchase_receipt import make_purchase_receipt
@@ -13,10 +17,11 @@ from erpnext.stock.doctype.repost_item_valuation.repost_item_valuation import (
in_configured_timeslot, in_configured_timeslot,
) )
from erpnext.stock.doctype.stock_entry.stock_entry_utils import make_stock_entry from erpnext.stock.doctype.stock_entry.stock_entry_utils import make_stock_entry
from erpnext.stock.tests.test_utils import StockTestMixin
from erpnext.stock.utils import PendingRepostingError from erpnext.stock.utils import PendingRepostingError
class TestRepostItemValuation(FrappeTestCase): class TestRepostItemValuation(FrappeTestCase, StockTestMixin):
def tearDown(self): def tearDown(self):
frappe.flags.dont_execute_stock_reposts = False frappe.flags.dont_execute_stock_reposts = False
@@ -193,3 +198,77 @@ class TestRepostItemValuation(FrappeTestCase):
[["a", "b"], ["c", "d"]], [["a", "b"], ["c", "d"]],
sorted(frappe.parse_json(frappe.as_json(set([("a", "b"), ("c", "d")])))), sorted(frappe.parse_json(frappe.as_json(set([("a", "b"), ("c", "d")])))),
) )
def test_gl_repost_progress(self):
from erpnext.accounts import utils
# lower numbers to simplify test
orig_chunk_size = utils.GL_REPOSTING_CHUNK
utils.GL_REPOSTING_CHUNK = 1
self.addCleanup(setattr, utils, "GL_REPOSTING_CHUNK", orig_chunk_size)
doc = frappe.new_doc("Repost Item Valuation")
doc.db_set = MagicMock()
vouchers = []
company = "_Test Company with perpetual inventory"
posting_date = today()
for _ in range(3):
se = make_stock_entry(company=company, qty=1, rate=2, target="Stores - TCP1")
vouchers.append((se.doctype, se.name))
repost_gle_for_stock_vouchers(stock_vouchers=vouchers, posting_date=posting_date, repost_doc=doc)
self.assertIn(call("gl_reposting_index", 1), doc.db_set.mock_calls)
doc.db_set.reset_mock()
doc.gl_reposting_index = 1
repost_gle_for_stock_vouchers(stock_vouchers=vouchers, posting_date=posting_date, repost_doc=doc)
self.assertNotIn(call("gl_reposting_index", 1), doc.db_set.mock_calls)
def test_gl_complete_gl_reposting(self):
from erpnext.accounts import utils
# lower numbers to simplify test
orig_chunk_size = utils.GL_REPOSTING_CHUNK
utils.GL_REPOSTING_CHUNK = 2
self.addCleanup(setattr, utils, "GL_REPOSTING_CHUNK", orig_chunk_size)
item = self.make_item().name
company = "_Test Company with perpetual inventory"
for _ in range(10):
make_stock_entry(item=item, company=company, qty=1, rate=10, target="Stores - TCP1")
# consume
consumption = make_stock_entry(item=item, company=company, qty=1, source="Stores - TCP1")
self.assertGLEs(
consumption,
[{"credit": 10, "debit": 0}],
gle_filters={"account": "Stock In Hand - TCP1"},
)
# backdated receipt
backdated_receipt = make_stock_entry(
item=item,
company=company,
qty=1,
rate=50,
target="Stores - TCP1",
posting_date=add_to_date(today(), days=-1),
)
self.assertGLEs(
backdated_receipt,
[{"credit": 0, "debit": 50}],
gle_filters={"account": "Stock In Hand - TCP1"},
)
# check that original consumption GLe is updated
self.assertGLEs(
consumption,
[{"credit": 50, "debit": 0}],
gle_filters={"account": "Stock In Hand - TCP1"},
)