From 111db12b921ab9664f8009aabff16ec7138921db Mon Sep 17 00:00:00 2001 From: elitonzky Date: Mon, 8 Jan 2024 15:28:35 -0300 Subject: [PATCH 1/3] Multi task processing to vtex data processor and api retry --- marketplace/clients/decorators.py | 30 ++++++++ marketplace/clients/vtex/client.py | 2 + .../services/vtex/utils/data_processor.py | 74 ++++++++++++++----- 3 files changed, 88 insertions(+), 18 deletions(-) create mode 100644 marketplace/clients/decorators.py diff --git a/marketplace/clients/decorators.py b/marketplace/clients/decorators.py new file mode 100644 index 00000000..c0a596cb --- /dev/null +++ b/marketplace/clients/decorators.py @@ -0,0 +1,30 @@ +import time +import functools + +from marketplace.clients.exceptions import CustomAPIException + + +def retry_on_rate_limit(max_attempts=11, start_sleep_time=1, factor=2): + def decorator_retry(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + attempts, sleep_time = 0, start_sleep_time + while attempts < max_attempts: + try: + return func(*args, **kwargs) + except ( + CustomAPIException + ) as e: # TODO: Map only timeout errors or errors from many requests + print( + f"Retrying... Attempt {attempts + 1} after {sleep_time} seconds, {str(e)}" + ) + time.sleep(sleep_time) + attempts += 1 + sleep_time *= factor + + print("Max retry attempts reached. Raising exception.") + raise Exception("Rate limit exceeded, max retry attempts reached.") + + return wrapper + + return decorator_retry diff --git a/marketplace/clients/vtex/client.py b/marketplace/clients/vtex/client.py index c1bb926f..83b46f42 100644 --- a/marketplace/clients/vtex/client.py +++ b/marketplace/clients/vtex/client.py @@ -1,4 +1,5 @@ from marketplace.clients.base import RequestClient +from marketplace.clients.decorators import retry_on_rate_limit class VtexAuthorization(RequestClient): @@ -70,6 +71,7 @@ def list_active_sellers(self, domain): sellers_data = response.json() return [seller["id"] for seller in sellers_data["items"] if seller["isActive"]] + @retry_on_rate_limit() def get_product_details(self, sku_id, domain): url = ( f"https://{domain}/api/catalog_system/pvt/sku/stockkeepingunitbyid/{sku_id}" diff --git a/marketplace/services/vtex/utils/data_processor.py b/marketplace/services/vtex/utils/data_processor.py index e5e7dad4..c47c8824 100644 --- a/marketplace/services/vtex/utils/data_processor.py +++ b/marketplace/services/vtex/utils/data_processor.py @@ -1,6 +1,9 @@ +import concurrent.futures import pandas as pd import io import dataclasses +import os +import time from dataclasses import dataclass @@ -69,27 +72,62 @@ def extract_fields(product_details, availability_details) -> FacebookProductDTO: @staticmethod def process_product_data( - skus_ids, active_sellers, service, domain, rules, update_product=False + skus_ids, + active_sellers, + service, + domain, + rules, + update_product=False, + max_workers=10, ): - facebook_products = [] - for sku_id in skus_ids: - product_details = service.get_product_details(sku_id, domain) - for seller_id in active_sellers: - availability_details = service.simulate_cart_for_seller( - sku_id, seller_id, domain - ) - if update_product is False and not availability_details["is_available"]: - continue + print("Process product data") + active_sellers = [1] + num_cpus = os.cpu_count() + all_facebook_products = [] - product_dto = DataProcessor.extract_fields( - product_details, availability_details + with concurrent.futures.ThreadPoolExecutor(max_workers=num_cpus) as executor: + futures = [ + executor.submit( + DataProcessor.process_single_sku, + sku_id, + active_sellers, + service, + domain, + rules, + update_product, ) - params = {"seller_id": seller_id} - for rule in rules: - if not rule.apply(product_dto, **params): - break - else: - facebook_products.append(product_dto) + for sku_id in skus_ids + ] + time.sleep( + 15 + ) # TODO: Test whether by waiting this time the processes are terminated correctly + for future in concurrent.futures.as_completed(futures): + try: + all_facebook_products.extend(future.result()) + except Exception as e: + print(f"Exception in thread: {e}") + + return all_facebook_products + + @staticmethod + def process_single_sku( + sku_id, active_sellers, service, domain, rules, update_product + ): + facebook_products = [] + product_details = service.get_product_details(sku_id, domain) + for seller_id in active_sellers: + availability_details = service.simulate_cart_for_seller( + sku_id, seller_id, domain + ) + if update_product is False and not availability_details["is_available"]: + continue + + product_dto = DataProcessor.extract_fields( + product_details, availability_details + ) + params = {"seller_id": seller_id} + if all(rule.apply(product_dto, **params) for rule in rules): + facebook_products.append(product_dto) return facebook_products From 512324249fa2a6b314bc25616e13b82587a9ba8b Mon Sep 17 00:00:00 2001 From: elitonzky Date: Thu, 11 Jan 2024 11:03:30 -0300 Subject: [PATCH 2/3] Add the retry_on_exception decorator to more methods --- marketplace/clients/decorators.py | 6 ++-- marketplace/clients/flows/client.py | 3 ++ marketplace/clients/vtex/client.py | 7 ++-- .../product/product_facebook_manage.py | 32 ++----------------- marketplace/services/vtex/generic_service.py | 6 ++-- .../services/vtex/utils/data_processor.py | 4 +-- marketplace/wpp_products/tasks.py | 2 +- 7 files changed, 18 insertions(+), 42 deletions(-) diff --git a/marketplace/clients/decorators.py b/marketplace/clients/decorators.py index c0a596cb..8e617e59 100644 --- a/marketplace/clients/decorators.py +++ b/marketplace/clients/decorators.py @@ -1,10 +1,8 @@ import time import functools -from marketplace.clients.exceptions import CustomAPIException - -def retry_on_rate_limit(max_attempts=11, start_sleep_time=1, factor=2): +def retry_on_exception(max_attempts=11, start_sleep_time=1, factor=2): def decorator_retry(func): @functools.wraps(func) def wrapper(*args, **kwargs): @@ -13,7 +11,7 @@ def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except ( - CustomAPIException + Exception ) as e: # TODO: Map only timeout errors or errors from many requests print( f"Retrying... Attempt {attempts + 1} after {sleep_time} seconds, {str(e)}" diff --git a/marketplace/clients/flows/client.py b/marketplace/clients/flows/client.py index 3ae08b82..9848b0ae 100644 --- a/marketplace/clients/flows/client.py +++ b/marketplace/clients/flows/client.py @@ -1,6 +1,8 @@ """Client for connection with flows""" from django.conf import settings + from marketplace.clients.base import RequestClient +from marketplace.clients.decorators import retry_on_exception class InternalAuthentication(RequestClient): @@ -89,6 +91,7 @@ def update_status_catalog(self, flow_object_uuid, fba_catalog_id, is_active: boo ) return response + @retry_on_exception() def update_vtex_products(self, products, flow_object_uuid, dict_catalog): data = { "catalog": dict_catalog, diff --git a/marketplace/clients/vtex/client.py b/marketplace/clients/vtex/client.py index 83b46f42..934710ec 100644 --- a/marketplace/clients/vtex/client.py +++ b/marketplace/clients/vtex/client.py @@ -1,5 +1,5 @@ from marketplace.clients.base import RequestClient -from marketplace.clients.decorators import retry_on_rate_limit +from marketplace.clients.decorators import retry_on_exception class VtexAuthorization(RequestClient): @@ -46,6 +46,7 @@ def is_valid_credentials(self, domain): except Exception: return False + @retry_on_exception() def list_all_products_sku_ids(self, domain, page_size=1000): all_skus = [] page = 1 @@ -64,6 +65,7 @@ def list_all_products_sku_ids(self, domain, page_size=1000): return all_skus + @retry_on_exception() def list_active_sellers(self, domain): url = f"https://{domain}/api/seller-register/pvt/sellers" headers = self._get_headers() @@ -71,7 +73,7 @@ def list_active_sellers(self, domain): sellers_data = response.json() return [seller["id"] for seller in sellers_data["items"] if seller["isActive"]] - @retry_on_rate_limit() + @retry_on_exception() def get_product_details(self, sku_id, domain): url = ( f"https://{domain}/api/catalog_system/pvt/sku/stockkeepingunitbyid/{sku_id}" @@ -80,6 +82,7 @@ def get_product_details(self, sku_id, domain): response = self.make_request(url, method="GET", headers=headers) return response.json() + @retry_on_exception() def pub_simulate_cart_for_seller(self, sku_id, seller_id, domain): cart_simulation_url = f"https://{domain}/api/checkout/pub/orderForms/simulation" payload = {"items": [{"id": sku_id, "quantity": 1, "seller": seller_id}]} diff --git a/marketplace/services/product/product_facebook_manage.py b/marketplace/services/product/product_facebook_manage.py index 4449c55d..d33a66a7 100644 --- a/marketplace/services/product/product_facebook_manage.py +++ b/marketplace/services/product/product_facebook_manage.py @@ -1,6 +1,5 @@ from typing import List -from django.db import transaction from django.contrib.auth import get_user_model from marketplace.services.vtex.utils.data_processor import FacebookProductDTO @@ -11,34 +10,7 @@ class ProductFacebookManager: - def save_products_on_database( - self, products: List[FacebookProductDTO], catalog, product_feed - ): - product_instances = [ - Product( - facebook_product_id=dto.id, - title=dto.title, - description=dto.description, - availability=dto.availability, - condition=dto.condition, - price=dto.price, - link=dto.link, - image_link=dto.image_link, - brand=dto.brand, - sale_price=dto.sale_price, - catalog=catalog, - created_by=catalog.created_by, - feed=product_feed, - ) - for dto in products - ] - - with transaction.atomic(): - Product.objects.bulk_create(product_instances) - - return True - - def update_products_on_database( + def create_or_update_products_on_database( self, products: List[FacebookProductDTO], catalog, product_feed ): products_to_update = [] @@ -47,7 +19,7 @@ def update_products_on_database( for dto in products: try: product = Product.objects.get( - facebook_product_id=dto.id, catalog=catalog, feed=product_feed + facebook_product_id=dto.id, catalog=catalog ) # TODO: Optimize to make a single query at the bank product.title = dto.title product.description = dto.description diff --git a/marketplace/services/vtex/generic_service.py b/marketplace/services/vtex/generic_service.py index eb31d9b7..8e0b94c6 100644 --- a/marketplace/services/vtex/generic_service.py +++ b/marketplace/services/vtex/generic_service.py @@ -106,7 +106,9 @@ def first_product_insert(self, credentials: APICredentials, catalog: Catalog): ) products_csv = pvt_service.data_processor.products_to_csv(products) product_feed = self._send_products_to_facebook(products_csv, catalog) - self.product_manager.save_products_on_database(products, catalog, product_feed) + self.product_manager.create_or_update_products_on_database( + products, catalog, product_feed + ) self.app_manager.initial_sync_products_completed(catalog.vtex_app) return pvt_service.data_processor.convert_dtos_to_dicts_list(products) @@ -125,7 +127,7 @@ def webhook_product_insert( products_csv = pvt_service.data_processor.products_to_csv(products_dto) self._update_products_on_facebook(products_csv, catalog, product_feed) - self.product_manager.update_products_on_database( + self.product_manager.create_or_update_products_on_database( products_dto, catalog, product_feed ) return pvt_service.data_processor.convert_dtos_to_dicts_list(products_dto) diff --git a/marketplace/services/vtex/utils/data_processor.py b/marketplace/services/vtex/utils/data_processor.py index c47c8824..462671cc 100644 --- a/marketplace/services/vtex/utils/data_processor.py +++ b/marketplace/services/vtex/utils/data_processor.py @@ -78,10 +78,7 @@ def process_product_data( domain, rules, update_product=False, - max_workers=10, ): - print("Process product data") - active_sellers = [1] num_cpus = os.cpu_count() all_facebook_products = [] @@ -104,6 +101,7 @@ def process_product_data( for future in concurrent.futures.as_completed(futures): try: all_facebook_products.extend(future.result()) + print("total products extend to", len(all_facebook_products)) except Exception as e: print(f"Exception in thread: {e}") diff --git a/marketplace/wpp_products/tasks.py b/marketplace/wpp_products/tasks.py index 8f66f596..e29cd63a 100644 --- a/marketplace/wpp_products/tasks.py +++ b/marketplace/wpp_products/tasks.py @@ -162,9 +162,9 @@ def task_update_vtex_products(**kwargs): app_uuid = kwargs.get("app_uuid") webhook_data = kwargs.get("webhook_data") - vtex_app = App.objects.get(uuid=app_uuid, configured=True, code="vtex") try: + vtex_app = App.objects.get(uuid=app_uuid, configured=True, code="vtex") domain, app_key, app_token = vtex_service.get_vtex_credentials_or_raise( vtex_app ) From 995096dc994b9f3cdb6d973a1e338b4170be851d Mon Sep 17 00:00:00 2001 From: elitonzky Date: Thu, 11 Jan 2024 11:09:59 -0300 Subject: [PATCH 3/3] Changes order of execution of rules --- marketplace/services/vtex/generic_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/marketplace/services/vtex/generic_service.py b/marketplace/services/vtex/generic_service.py index 8e0b94c6..67c33b99 100644 --- a/marketplace/services/vtex/generic_service.py +++ b/marketplace/services/vtex/generic_service.py @@ -79,9 +79,9 @@ def configure(self, app, credentials: APICredentials, wpp_cloud_uuid) -> App: app.config["wpp_cloud_uuid"] = wpp_cloud_uuid app.config["initial_sync_completed"] = False app.config["rules"] = [ + "exclude_alcoholic_drinks", "calculate_by_weight", "currency_pt_br", - "exclude_alcoholic_drinks", "unifies_id_with_seller", ] app.configured = True