perf: reduce memory usage by paging through records

While migrating GL entries to Payment Ledger, page through records using
primary key to reduce memory usage.

(cherry picked from commit fee0ca8cd9)
This commit is contained in:
ruthra kumar
2023-01-20 15:32:16 +05:30
committed by Mergify
parent 6e7eee7fa3
commit 3ce8dc70cb

View File

@@ -2,7 +2,8 @@ import frappe
from frappe import qb from frappe import qb
from frappe.query_builder import Case, CustomFunction from frappe.query_builder import Case, CustomFunction
from frappe.query_builder.custom import ConstantColumn from frappe.query_builder.custom import ConstantColumn
from frappe.query_builder.functions import IfNull from frappe.query_builder.functions import Count, IfNull
from frappe.utils import flt
from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import ( from erpnext.accounts.doctype.accounting_dimension.accounting_dimension import (
get_dimensions, get_dimensions,
@@ -17,9 +18,9 @@ def create_accounting_dimension_fields():
make_dimension_in_accounting_doctypes(dimension, ["Payment Ledger Entry"]) make_dimension_in_accounting_doctypes(dimension, ["Payment Ledger Entry"])
def generate_name_for_payment_ledger_entries(gl_entries): def generate_name_for_payment_ledger_entries(gl_entries, start):
for index, entry in enumerate(gl_entries, 1): for index, entry in enumerate(gl_entries, 0):
entry.name = index entry.name = start + index
def get_columns(): def get_columns():
@@ -81,6 +82,14 @@ def insert_chunk_into_payment_ledger(insert_query, gl_entries):
def execute(): def execute():
"""
Description:
Migrate records from `tabGL Entry` to `tabPayment Ledger Entry`.
Patch is non-resumable. if patch failed or is terminatted abnormally, clear 'tabPayment Ledger Entry' table manually before re-running. Re-running is safe only during V13->V14 update.
Note: Post successful migration to V14, re-running is NOT-SAFE and SHOULD NOT be attempted.
"""
if frappe.reload_doc("accounts", "doctype", "payment_ledger_entry"): if frappe.reload_doc("accounts", "doctype", "payment_ledger_entry"):
# create accounting dimension fields in Payment Ledger # create accounting dimension fields in Payment Ledger
create_accounting_dimension_fields() create_accounting_dimension_fields()
@@ -89,52 +98,90 @@ def execute():
account = qb.DocType("Account") account = qb.DocType("Account")
ifelse = CustomFunction("IF", ["condition", "then", "else"]) ifelse = CustomFunction("IF", ["condition", "then", "else"])
gl_entries = ( # Get Records Count
qb.from_(gl) accounts = (
.inner_join(account) qb.from_(account)
.on((gl.account == account.name) & (account.account_type.isin(["Receivable", "Payable"]))) .select(account.name)
.select( .where((account.account_type == "Receivable") | (account.account_type == "Payable"))
gl.star, .orderby(account.name)
ConstantColumn(1).as_("docstatus"),
account.account_type.as_("account_type"),
IfNull(
ifelse(gl.against_voucher_type == "", None, gl.against_voucher_type), gl.voucher_type
).as_("against_voucher_type"),
IfNull(ifelse(gl.against_voucher == "", None, gl.against_voucher), gl.voucher_no).as_(
"against_voucher_no"
),
# convert debit/credit to amount
Case()
.when(account.account_type == "Receivable", gl.debit - gl.credit)
.else_(gl.credit - gl.debit)
.as_("amount"),
# convert debit/credit in account currency to amount in account currency
Case()
.when(
account.account_type == "Receivable",
gl.debit_in_account_currency - gl.credit_in_account_currency,
)
.else_(gl.credit_in_account_currency - gl.debit_in_account_currency)
.as_("amount_in_account_currency"),
)
.where(gl.is_cancelled == 0)
.orderby(gl.creation)
.run(as_dict=True)
) )
un_processed = (
qb.from_(gl)
.select(Count(gl.name))
.where((gl.is_cancelled == 0) & (gl.account.isin(accounts)))
.run()
)[0][0]
# primary key(name) for payment ledger records if un_processed:
generate_name_for_payment_ledger_entries(gl_entries) print(f"Migrating {un_processed} GL Entries to Payment Ledger")
# split data into chunks processed = 0
chunk_size = 1000 last_update_percent = 0
try: batch_size = 5000
for i in range(0, len(gl_entries), chunk_size): last_name = None
insert_query = build_insert_query()
insert_chunk_into_payment_ledger(insert_query, gl_entries[i : i + chunk_size]) while True:
frappe.db.commit() if last_name:
except Exception as err: where_clause = gl.name.gt(last_name) & (gl.is_cancelled == 0)
frappe.db.rollback() else:
ple = qb.DocType("Payment Ledger Entry") where_clause = gl.is_cancelled == 0
qb.from_(ple).delete().where(ple.docstatus >= 0).run()
frappe.db.commit() gl_entries = (
raise err qb.from_(gl)
.inner_join(account)
.on((gl.account == account.name) & (account.account_type.isin(["Receivable", "Payable"])))
.select(
gl.star,
ConstantColumn(1).as_("docstatus"),
account.account_type.as_("account_type"),
IfNull(
ifelse(gl.against_voucher_type == "", None, gl.against_voucher_type), gl.voucher_type
).as_("against_voucher_type"),
IfNull(ifelse(gl.against_voucher == "", None, gl.against_voucher), gl.voucher_no).as_(
"against_voucher_no"
),
# convert debit/credit to amount
Case()
.when(account.account_type == "Receivable", gl.debit - gl.credit)
.else_(gl.credit - gl.debit)
.as_("amount"),
# convert debit/credit in account currency to amount in account currency
Case()
.when(
account.account_type == "Receivable",
gl.debit_in_account_currency - gl.credit_in_account_currency,
)
.else_(gl.credit_in_account_currency - gl.debit_in_account_currency)
.as_("amount_in_account_currency"),
)
.where(where_clause)
.orderby(gl.name)
.limit(batch_size)
.run(as_dict=True)
)
if gl_entries:
last_name = gl_entries[-1].name
# primary key(name) for payment ledger records
generate_name_for_payment_ledger_entries(gl_entries, processed)
try:
insert_query = build_insert_query()
insert_chunk_into_payment_ledger(insert_query, gl_entries)
frappe.db.commit()
processed += len(gl_entries)
# Progress message
percent = flt((processed / un_processed) * 100, 2)
if percent - last_update_percent > 1:
print(f"{percent}% ({processed}) records processed")
last_update_percent = percent
except Exception as err:
print("Migration Failed. Clear `tabPayment Ledger Entry` table before re-running")
raise err
else:
break
print(f"{processed} records have been sucessfully migrated")