Skip to content

Commit

Permalink
Fix Add redis.lock on the task task_update_vtex_products
Browse files Browse the repository at this point in the history
  • Loading branch information
elitonzky committed Mar 4, 2024
1 parent d3dc231 commit 2f7446c
Showing 1 changed file with 51 additions and 43 deletions.
94 changes: 51 additions & 43 deletions marketplace/wpp_products/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,53 +178,61 @@ def task_update_vtex_products(**kwargs):
queue_manager = WebhookQueueManager(app_uuid, sku_id)
key = queue_manager.get_processing_key()
redis = get_redis_connection()
with redis.lock(key):
try:
vtex_app = App.objects.get(uuid=app_uuid, configured=True, code="vtex")
(
domain,
app_key,
app_token,
) = vtex_base_service.get_vtex_credentials_or_raise(vtex_app)
api_credentials = APICredentials(
app_key=app_key, app_token=app_token, domain=domain
)
for catalog in vtex_app.vtex_catalogs.all():
if not catalog.feeds.all().exists():
logger.error(
f"No data feed found in the database. Vtex app: {vtex_app.uuid}"
)
continue

product_feed = catalog.feeds.all().first() # The first feed created
print(f"Starting product update for app: {str(vtex_app.uuid)}")

vtex_update_service = ProductUpdateService(
api_credentials=api_credentials,
catalog=catalog,
webhook_data=webhook_data,
product_feed=product_feed,
if queue_manager.have_processing_product():
queue_manager.enqueue_webhook_data(webhook_data)
print(
f"Product '{sku_id}' is already being updated, the request has been added to the queue."
)
return
else:
with redis.lock(key):
try:
vtex_app = App.objects.get(uuid=app_uuid, configured=True, code="vtex")
(
domain,
app_key,
app_token,
) = vtex_base_service.get_vtex_credentials_or_raise(vtex_app)
api_credentials = APICredentials(
app_key=app_key, app_token=app_token, domain=domain
)
products = vtex_update_service.webhook_product_insert()
if products is None:
print(
f"No products to process after treatment for VTEX app {app_uuid}. Task ending."
for catalog in vtex_app.vtex_catalogs.all():
if not catalog.feeds.all().exists():
logger.error(
f"No data feed found in the database. Vtex app: {vtex_app.uuid}"
)
continue

product_feed = catalog.feeds.all().first() # The first feed created
print(f"Starting product update for app: {str(vtex_app.uuid)}")

vtex_update_service = ProductUpdateService(
api_credentials=api_credentials,
catalog=catalog,
webhook_data=webhook_data,
product_feed=product_feed,
)
continue

dict_catalog = {
"name": catalog.name,
"facebook_catalog_id": catalog.facebook_catalog_id,
}
flows_service.update_vtex_products(
products, str(catalog.app.flow_object_uuid), dict_catalog
)
print("Products successfully sent to flows")
products = vtex_update_service.webhook_product_insert()
if products is None:
print(
f"No products to process after treatment for VTEX app {app_uuid}. Task ending."
)
continue

dict_catalog = {
"name": catalog.name,
"facebook_catalog_id": catalog.facebook_catalog_id,
}
flows_service.update_vtex_products(
products, str(catalog.app.flow_object_uuid), dict_catalog
)
print("Products successfully sent to flows")

except Exception as e:
logger.error(
f"An error occurred during the updating Webhook vtex products for app {app_uuid}, {str(e)}"
)
except Exception as e:
logger.error(
f"An error occurred during the updating Webhook vtex products for app {app_uuid}, {str(e)}"
)

# Checking and processing pending updates in the queue
queued_webhook_data = queue_manager.dequeue_webhook_data()
Expand Down

0 comments on commit 2f7446c

Please sign in to comment.