refactor: validations to prevent duplicate jobs
This commit is contained in:
@@ -7,6 +7,7 @@ from frappe import _, qb
|
|||||||
from frappe.desk.notifications import clear_notifications
|
from frappe.desk.notifications import clear_notifications
|
||||||
from frappe.model.document import Document
|
from frappe.model.document import Document
|
||||||
from frappe.utils import cint, comma_and, create_batch, get_link_to_form
|
from frappe.utils import cint, comma_and, create_batch, get_link_to_form
|
||||||
|
from frappe.utils.background_jobs import get_job, is_job_enqueued
|
||||||
|
|
||||||
|
|
||||||
class TransactionDeletionRecord(Document):
|
class TransactionDeletionRecord(Document):
|
||||||
@@ -41,6 +42,15 @@ class TransactionDeletionRecord(Document):
|
|||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super(TransactionDeletionRecord, self).__init__(*args, **kwargs)
|
super(TransactionDeletionRecord, self).__init__(*args, **kwargs)
|
||||||
self.batch_size = 5000
|
self.batch_size = 5000
|
||||||
|
# Tasks are listged by their execution order
|
||||||
|
self.task_to_internal_method_map = {
|
||||||
|
"Delete Bins": "delete_bins",
|
||||||
|
"Delete Leads and Addresses": "delete_lead_addresses",
|
||||||
|
"Reset Company Values": "reset_company_values",
|
||||||
|
"Clear Notifications": "delete_notifications",
|
||||||
|
"Initialize Summary Table": "initialize_doctypes_to_be_deleted_table",
|
||||||
|
"Delete Transactions": "delete_company_transactions",
|
||||||
|
}
|
||||||
|
|
||||||
def validate(self):
|
def validate(self):
|
||||||
frappe.only_for("System Manager")
|
frappe.only_for("System Manager")
|
||||||
@@ -57,6 +67,16 @@ class TransactionDeletionRecord(Document):
|
|||||||
title=_("Not Allowed"),
|
title=_("Not Allowed"),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def generate_job_name_for_task(self, task=None):
|
||||||
|
method = self.task_to_internal_method_map[task]
|
||||||
|
return f"{self.name}_{method}"
|
||||||
|
|
||||||
|
def generate_job_name_for_all_tasks(self):
|
||||||
|
job_names = []
|
||||||
|
for method in self.task_to_internal_method_map.values():
|
||||||
|
job_names.append(self.generate_job_name_for_task)
|
||||||
|
return job_names
|
||||||
|
|
||||||
def before_submit(self):
|
def before_submit(self):
|
||||||
if queued_docs := frappe.db.get_all(
|
if queued_docs := frappe.db.get_all(
|
||||||
"Transaction Deletion Record",
|
"Transaction Deletion Record",
|
||||||
@@ -65,7 +85,7 @@ class TransactionDeletionRecord(Document):
|
|||||||
):
|
):
|
||||||
frappe.throw(
|
frappe.throw(
|
||||||
_(
|
_(
|
||||||
"Cannot queue multi docs for one company. {0} is already queued/running for company: {1}"
|
"Cannot enqueue multi docs for one company. {0} is already queued/running for company: {1}"
|
||||||
).format(
|
).format(
|
||||||
comma_and([get_link_to_form("Transaction Deletion Record", x) for x in queued_docs]),
|
comma_and([get_link_to_form("Transaction Deletion Record", x) for x in queued_docs]),
|
||||||
frappe.bold(self.company),
|
frappe.bold(self.company),
|
||||||
@@ -94,12 +114,17 @@ class TransactionDeletionRecord(Document):
|
|||||||
def on_cancel(self):
|
def on_cancel(self):
|
||||||
self.db_set("status", "Cancelled")
|
self.db_set("status", "Cancelled")
|
||||||
|
|
||||||
def chain_call(self, method):
|
def chain_call(self, task=None):
|
||||||
|
if task and task in self.task_to_internal_method_map:
|
||||||
|
method = self.task_to_internal_method_map[task]
|
||||||
|
job_id = self.generate_job_name_for_task(task)
|
||||||
|
|
||||||
frappe.enqueue(
|
frappe.enqueue(
|
||||||
"frappe.utils.background_jobs.run_doc_method",
|
"frappe.utils.background_jobs.run_doc_method",
|
||||||
doctype=self.doctype,
|
doctype=self.doctype,
|
||||||
name=self.name,
|
name=self.name,
|
||||||
doc_method=method,
|
doc_method=method,
|
||||||
|
job_id=job_id,
|
||||||
queue="long",
|
queue="long",
|
||||||
enqueue_after_commit=True,
|
enqueue_after_commit=True,
|
||||||
)
|
)
|
||||||
@@ -109,13 +134,27 @@ class TransactionDeletionRecord(Document):
|
|||||||
if not self.clear_notifications:
|
if not self.clear_notifications:
|
||||||
clear_notifications()
|
clear_notifications()
|
||||||
self.db_set("clear_notifications", 1)
|
self.db_set("clear_notifications", 1)
|
||||||
self.chain_call("initialize_doctypes_to_be_deleted_table")
|
self.chain_call(task="Initialize Summary Table")
|
||||||
|
|
||||||
def populate_doctypes_to_be_ignored_table(self):
|
def populate_doctypes_to_be_ignored_table(self):
|
||||||
doctypes_to_be_ignored_list = get_doctypes_to_be_ignored()
|
doctypes_to_be_ignored_list = get_doctypes_to_be_ignored()
|
||||||
for doctype in doctypes_to_be_ignored_list:
|
for doctype in doctypes_to_be_ignored_list:
|
||||||
self.append("doctypes_to_be_ignored", {"doctype_name": doctype})
|
self.append("doctypes_to_be_ignored", {"doctype_name": doctype})
|
||||||
|
|
||||||
|
def validate_running_task_for_doc(self, job_names: list = None):
|
||||||
|
# at most only one task should be runnning
|
||||||
|
running_tasks = []
|
||||||
|
for x in job_names:
|
||||||
|
if is_job_enqueued(x):
|
||||||
|
running_tasks.append(get_job(x).get_id())
|
||||||
|
|
||||||
|
if running_tasks:
|
||||||
|
frappe.throw(
|
||||||
|
_("{0} is already running for {1}").format(
|
||||||
|
comma_and([get_link_to_form("RQ Job", x) for x in running_tasks]), self.name
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def validate_doc_status(self):
|
def validate_doc_status(self):
|
||||||
if self.status != "Running":
|
if self.status != "Running":
|
||||||
frappe.throw(
|
frappe.throw(
|
||||||
@@ -123,6 +162,9 @@ class TransactionDeletionRecord(Document):
|
|||||||
get_link_to_form("Transaction Deletion Record", self.name)
|
get_link_to_form("Transaction Deletion Record", self.name)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
# make sure that job none of tasks are already running
|
||||||
|
job_names = self.generate_job_name_for_all_tasks()
|
||||||
|
self.validate_running_task_for_doc(job_names=job_names)
|
||||||
|
|
||||||
@frappe.whitelist()
|
@frappe.whitelist()
|
||||||
def delete_bins(self):
|
def delete_bins(self):
|
||||||
@@ -136,7 +178,7 @@ class TransactionDeletionRecord(Document):
|
|||||||
self.company,
|
self.company,
|
||||||
)
|
)
|
||||||
self.db_set("delete_bin_data", 1)
|
self.db_set("delete_bin_data", 1)
|
||||||
self.chain_call(method="delete_lead_addresses")
|
self.chain_call(task="Delete Leads and Addresses")
|
||||||
|
|
||||||
def delete_lead_addresses(self):
|
def delete_lead_addresses(self):
|
||||||
"""Delete addresses to which leads are linked"""
|
"""Delete addresses to which leads are linked"""
|
||||||
@@ -178,7 +220,7 @@ class TransactionDeletionRecord(Document):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
self.db_set("delete_leads_and_addresses", 1)
|
self.db_set("delete_leads_and_addresses", 1)
|
||||||
self.chain_call(method="reset_company_values")
|
self.chain_call(task="Reset Company Values")
|
||||||
|
|
||||||
def reset_company_values(self):
|
def reset_company_values(self):
|
||||||
self.validate_doc_status()
|
self.validate_doc_status()
|
||||||
@@ -188,7 +230,7 @@ class TransactionDeletionRecord(Document):
|
|||||||
company_obj.sales_monthly_history = None
|
company_obj.sales_monthly_history = None
|
||||||
company_obj.save()
|
company_obj.save()
|
||||||
self.db_set("reset_company_default_values", 1)
|
self.db_set("reset_company_default_values", 1)
|
||||||
self.chain_call(method="delete_notifications")
|
self.chain_call(task="Clear Notifications")
|
||||||
|
|
||||||
def initialize_doctypes_to_be_deleted_table(self):
|
def initialize_doctypes_to_be_deleted_table(self):
|
||||||
self.validate_doc_status()
|
self.validate_doc_status()
|
||||||
@@ -205,7 +247,7 @@ class TransactionDeletionRecord(Document):
|
|||||||
# Initialize
|
# Initialize
|
||||||
self.populate_doctypes_table(tables, docfield["parent"], docfield["fieldname"], 0)
|
self.populate_doctypes_table(tables, docfield["parent"], docfield["fieldname"], 0)
|
||||||
self.db_set("initialize_doctypes_table", 1)
|
self.db_set("initialize_doctypes_table", 1)
|
||||||
self.chain_call(method="delete_company_transactions")
|
self.chain_call(task="Delete Transactions")
|
||||||
|
|
||||||
def delete_company_transactions(self):
|
def delete_company_transactions(self):
|
||||||
self.validate_doc_status()
|
self.validate_doc_status()
|
||||||
@@ -245,7 +287,8 @@ class TransactionDeletionRecord(Document):
|
|||||||
docfield.doctype, filters={"parent": self.name, "done": 0}, pluck="doctype_name"
|
docfield.doctype, filters={"parent": self.name, "done": 0}, pluck="doctype_name"
|
||||||
)
|
)
|
||||||
if pending_doctypes:
|
if pending_doctypes:
|
||||||
self.chain_call(method="delete_company_transactions")
|
# as method is enqueued after commit, calling itself will not make validate_doc_status to throw
|
||||||
|
self.chain_call(task="Delete Transactions")
|
||||||
else:
|
else:
|
||||||
self.db_set("status", "Completed")
|
self.db_set("status", "Completed")
|
||||||
self.db_set("delete_transactions", 1)
|
self.db_set("delete_transactions", 1)
|
||||||
|
|||||||
Reference in New Issue
Block a user