Skip to content

Commit

Permalink
Adds try-except, messages log and remove retry from flows.update_vtex…
Browse files Browse the repository at this point in the history
…_products
  • Loading branch information
elitonzky committed Feb 29, 2024
1 parent fea9f0c commit 3b9728a
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 37 deletions.
2 changes: 0 additions & 2 deletions marketplace/clients/flows/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from django.conf import settings

from marketplace.clients.base import RequestClient
from marketplace.clients.decorators import retry_on_exception


class InternalAuthentication(RequestClient):
Expand Down Expand Up @@ -91,7 +90,6 @@ def update_status_catalog(self, flow_object_uuid, fba_catalog_id, is_active: boo
)
return response

@retry_on_exception()
def update_vtex_products(self, products, flow_object_uuid, dict_catalog):
data = {
"catalog": dict_catalog,
Expand Down
19 changes: 18 additions & 1 deletion marketplace/services/vtex/generic_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def _wait_for_upload_completion(self, upload_id) -> bool:
return True

Check warning on line 253 in marketplace/services/vtex/generic_service.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/generic_service.py#L252-L253

Added lines #L252 - L253 were not covered by tests

print(

Check warning on line 255 in marketplace/services/vtex/generic_service.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/generic_service.py#L255

Added line #L255 was not covered by tests
f"Waiting {wait_time} seconds to get feed: {self.feed_id} upload status."
f"Waiting {wait_time} seconds to get feed: {self.feed_id} upload {upload_id} status."
)
time.sleep(wait_time)
total_wait_time += wait_time
Expand All @@ -271,6 +271,7 @@ def first_product_insert_with_catalog(cls, vtex_app: App, catalog_id: str):
wpp_cloud = cls._get_wpp_cloud(wpp_cloud_uuid)

catalog = cls._get_or_sync_catalog(wpp_cloud, catalog_id)
cls._delete_existing_feeds_ifexists(catalog)

Check warning on line 274 in marketplace/services/vtex/generic_service.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/generic_service.py#L274

Added line #L274 was not covered by tests
cls._link_catalog_to_vtex_app_if_needed(catalog, vtex_app)

cls._send_insert_task(credentials, catalog)
Expand Down Expand Up @@ -338,6 +339,22 @@ def _link_catalog_to_vtex_app_if_needed(catalog, vtex_app):
f"Catalog {catalog.name} successfully linked to VTEX app: {vtex_app.uuid}."
)

@staticmethod
def _delete_existing_feeds_ifexists(catalog):
"""Deletes existing feeds linked to the catalog and logs their IDs."""
feeds = catalog.feeds.all()
total = feeds.count()
if total > 0:
print(f"Deleting {total} feed(s) linked to catalog {catalog.name}.")
for feed in feeds:
print(f"Deleting feed with ID {feed.facebook_feed_id}.")
feed.delete()
print(

Check warning on line 352 in marketplace/services/vtex/generic_service.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/generic_service.py#L345-L352

Added lines #L345 - L352 were not covered by tests
f"All feeds linked to catalog {catalog.name} have been successfully deleted."
)
else:
print(f"No feeds linked to catalog {catalog.name} to delete.")

Check warning on line 356 in marketplace/services/vtex/generic_service.py

View check run for this annotation

Codecov / codecov/patch

marketplace/services/vtex/generic_service.py#L356

Added line #L356 was not covered by tests

@staticmethod
def _send_insert_task(credentials, catalog):
from marketplace.celery import app as celery_app
Expand Down
12 changes: 11 additions & 1 deletion marketplace/webhooks/vtex/product_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@
class VtexProductUpdateWebhook(APIView):
authentication_classes = []
permission_classes = [AllowAny]
queue_manager_class = WebhookQueueManager

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._queue_manager = None

def get_queue_manager(
self, app_uuid, sku_id
) -> WebhookQueueManager: # pragma: no cover
return self.queue_manager_class(app_uuid, sku_id)

def post(self, request, app_uuid):
app = self.get_app(app_uuid)
Expand All @@ -24,7 +34,7 @@ def post(self, request, app_uuid):
)

sku_id = self.get_sku_id()
queue_manager = WebhookQueueManager(app_uuid, sku_id)
queue_manager = self.get_queue_manager(app_uuid, sku_id)
if queue_manager.have_processing_product():
queue_manager.enqueue_webhook_data(request.data)
return Response(
Expand Down
41 changes: 40 additions & 1 deletion marketplace/webhooks/vtex/tests/test_product_updates.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import uuid

from unittest.mock import patch
from unittest.mock import Mock, patch

from django.urls import reverse

Expand All @@ -9,6 +9,19 @@
from marketplace.applications.models import App


class MockWebhookQueueManager:
def __init__(self, app_uuid, sku_id, processing_product=False):
self.app_uuid = app_uuid
self.sku_id = sku_id
self.processing_product = processing_product

def have_processing_product(self):
return self.processing_product

def enqueue_webhook_data(self, data):
pass


class SetUpTestBase(APIBaseTestCase):
view_class = VtexProductUpdateWebhook

Expand Down Expand Up @@ -66,6 +79,16 @@ def setUp(self):
self.mock_send_task = patcher_celery.start()
self.addCleanup(patcher_celery.stop)

# Mock Webhook manager
mock_webhook_manager = MockWebhookQueueManager("1", "2")
patcher_fb = patch.object(
self.view_class,
"get_queue_manager",
Mock(return_value=mock_webhook_manager),
)
self.addCleanup(patcher_fb.stop)
patcher_fb.start()


class WebhookTestCase(MockServiceTestCase):
def test_request_ok(self):
Expand Down Expand Up @@ -97,3 +120,19 @@ def test_webhook_with_app_not_found(self):
url, {"data": "webhook_payload"}, app_uuid=app_uuid
)
self.assertEqual(response.status_code, 404)

def test_webhook_with_processing_product(self):
mock_webhook_manager = MockWebhookQueueManager(
"1", "2", processing_product=True
)

with patch.object(
self.view_class, "get_queue_manager", return_value=mock_webhook_manager
):
response = self.request.post(self.url, self.body, app_uuid=self.app.uuid)

self.assertEqual(response.status_code, 200)
self.assertEqual(
response.json,
{"message": "Webhook product update added to the processing queue"},
)
87 changes: 55 additions & 32 deletions marketplace/wpp_products/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,43 +114,60 @@ def _sync_local_catalogs(self, all_catalogs_id, local_catalog_ids):

@celery_app.task(name="task_insert_vtex_products")
def task_insert_vtex_products(**kwargs):
print("Starting first product insert")
print("Starting task: 'task_insert_vtex_products'")
vtex_service = ProductInsertionService()
flows_service = FlowsService(FlowsClient())

credentials = kwargs.get("credentials")
catalog_uuid = kwargs.get("catalog_uuid")

catalog = Catalog.objects.get(uuid=catalog_uuid)
if not all([credentials, catalog_uuid]):
logger.error(
"Missing required parameters [credentials, catalog_uuid] for task_insert_vtex_products"
)
return

try:
catalog = Catalog.objects.get(uuid=catalog_uuid)
api_credentials = APICredentials(
app_key=credentials.get("app_key"),
app_token=credentials.get("app_token"),
domain=credentials.get("domain"),
app_key=credentials["app_key"],
app_token=credentials["app_token"],
domain=credentials["domain"],
)
print(f"Starting first product insert for catalog: {str(catalog.name)}")
products = vtex_service.first_product_insert(api_credentials, catalog)
if products is None:
print("There are no products to be shipped after processing the rules")
return

except Exception as e:
logger.exception(
f"An error occurred during the first insertion of vtex products for catalog {catalog.name}, {e}"
)
return

try:
dict_catalog = {
"name": catalog.name,
"facebook_catalog_id": catalog.facebook_catalog_id,
}
flows_service.update_vtex_products(
products, str(catalog.app.flow_object_uuid), dict_catalog
)
print("Products created and sent to flows successfully")
print("Products successfully sent to flows")
except Exception as e:
logger.error(
f"Error on insert vtex products for catalog {str(catalog.uuid)}, {e}"
f"Error on send vtex products to flows for catalog {catalog_uuid}, {e}"
)

print(
f"finishing creation products, task: 'task_insert_vtex_products' catalog {catalog.name}"
)


@celery_app.task(name="task_update_vtex_products")
def task_update_vtex_products(**kwargs):
print("Starting product update")
print("Starting task: 'task_update_vtex_products'")
vtex_base_service = VtexServiceBase()
flows_service = FlowsService(FlowsClient())

Expand All @@ -173,35 +190,40 @@ def task_update_vtex_products(**kwargs):
app_key=app_key, app_token=app_token, domain=domain
)
for catalog in vtex_app.vtex_catalogs.all():
if catalog.feeds.all().exists():
product_feed = catalog.feeds.all().first() # The first feed created
print(f"Starting product update for app: {str(vtex_app.uuid)}")

vtex_update_service = ProductUpdateService(
api_credentials=api_credentials,
catalog=catalog,
webhook_data=webhook_data,
product_feed=product_feed,
if not catalog.feeds.all().exists():
logger.error(
f"No data feed found in the database. Vtex app: {vtex_app.uuid}"
)
products = vtex_update_service.webhook_product_insert()
if products is None:
logger.info(
f"No products to process after treatment for VTEX app {app_uuid}. Task ending."
)
continue

dict_catalog = {
"name": catalog.name,
"facebook_catalog_id": catalog.facebook_catalog_id,
}
flows_service.update_vtex_products(
products, str(catalog.app.flow_object_uuid), dict_catalog
continue

product_feed = catalog.feeds.all().first() # The first feed created
print(f"Starting product update for app: {str(vtex_app.uuid)}")

vtex_update_service = ProductUpdateService(
api_credentials=api_credentials,
catalog=catalog,
webhook_data=webhook_data,
product_feed=product_feed,
)
products = vtex_update_service.webhook_product_insert()
if products is None:
print(
f"No products to process after treatment for VTEX app {app_uuid}. Task ending."
)
print("Webhook Products updated and sent to flows successfully")
continue

dict_catalog = {
"name": catalog.name,
"facebook_catalog_id": catalog.facebook_catalog_id,
}
flows_service.update_vtex_products(
products, str(catalog.app.flow_object_uuid), dict_catalog
)
print("Products successfully sent to flows")

except Exception as e:
logger.error(
f"Error on updating Webhook vtex products for app {app_uuid}, {str(e)}"
f"An error occurred during the updating Webhook vtex products for app {app_uuid}, {str(e)}"
)

# Checking and processing pending updates in the queue
Expand All @@ -219,4 +241,5 @@ def task_update_vtex_products(**kwargs):
)
print(f"Processing queued webhook data for SKU {sku_id} and app:{app_uuid}.")

print("Finishing update product, task: 'task_update_vtex_products'")
return

0 comments on commit 3b9728a

Please sign in to comment.