Skip to content

Commit

Permalink
Fix IAP worker to prevent delivery duplication
Browse files Browse the repository at this point in the history
- Separate create tx and stage tx
    - Create transactions first and save
    - Then stage it after
- This prevents delivery duplication casud by DB save missing
  • Loading branch information
U-lis committed Dec 13, 2024
1 parent ef110c2 commit 24b15d4
Showing 1 changed file with 71 additions and 36 deletions.
107 changes: 71 additions & 36 deletions worker/worker/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union

from sqlalchemy import create_engine, select
from sqlalchemy import create_engine, select, func
from sqlalchemy.orm import Session, joinedload, scoped_session, sessionmaker

from common import logger
Expand All @@ -23,6 +23,8 @@
from common.utils.receipt import PlanetID
from common.utils.transaction import create_unsigned_tx, append_signature_to_unsigned_tx

STAGE = os.environ.get("STAGE")
REGION_NAME = os.environ.get("REGION_NAME", "us-east-2")
DB_URI = os.environ.get("DB_URI")
db_password = fetch_secrets(os.environ.get("REGION_NAME"), os.environ.get("SECRET_ARN"))["password"]
DB_URI = DB_URI.replace("[DB_PASSWORD]", db_password)
Expand All @@ -47,7 +49,8 @@ class SQSMessageRecord:
eventSourceARN: str
awsRegion: str

# Avoid TypeError when init dataclass. https://stackoverflow.com/questions/54678337/how-does-one-ignore-extra-arguments-passed-to-a-dataclass # noqa
# Avoid TypeError when init dataclass.
# https://stackoverflow.com/questions/54678337/how-does-one-ignore-extra-arguments-passed-to-a-dataclass
def __init__(self, **kwargs):
names = set([f.name for f in dataclasses.fields(self)])
for k, v in kwargs.items():
Expand All @@ -65,27 +68,18 @@ def __post_init__(self):
self.Records = [SQSMessageRecord(**x) for x in self.Records]


def process(sess: Session, message: SQSMessageRecord, nonce: int = None) -> Tuple[
Tuple[bool, str, Optional[str]], int, bytes
]:
stage = os.environ.get("STAGE", "development")
region_name = os.environ.get("REGION_NAME", "us-east-2")
logging.debug(f"STAGE: {stage} || REGION: {region_name}")
account = Account(fetch_kms_key_id(stage, region_name))
planet_id: PlanetID = PlanetID(bytes(message.body["planet_id"], 'utf-8'))

gql = GQL(GQL_DICT[planet_id], HEADLESS_GQL_JWT_SECRET)
if not nonce:
nonce = gql.get_next_nonce(account.address)
def create_tx(sess: Session, account: Account, receipt: Receipt) -> bytes:
if receipt.tx is not None:
return bytes.fromhex(receipt.tx)

product = sess.scalar(
select(Product)
.options(joinedload(Product.fav_list)).options(joinedload(Product.fungible_item_list))
.where(Product.id == message.body.get("product_id"))
.where(Product.id == receipt.product_id)
)

package_name = PackageName(message.body.get("package_name"))
avatar_address = Address(message.body.get("avatar_addr"))
package_name = PackageName(receipt.package_name)
avatar_address = Address(receipt.avatar_addr)
memo = json.dumps({"iap":
{"g_sku": product.google_sku,
"a_sku": product.apple_sku_k if package_name == PackageName.NINE_CHRONICLES_K
Expand All @@ -96,25 +90,35 @@ def process(sess: Session, message: SQSMessageRecord, nonce: int = None) -> Tupl
for item in product.fungible_item_list:
claim_data.append(FungibleAssetValue.from_raw_data(
ticker=item.fungible_item_id, decimal_places=0,
amount=item.amount * (5 if planet_id in (PlanetID.THOR, PlanetID.THOR_INTERNAL) else 1)
amount=item.amount * (5 if receipt.planet_id in (PlanetID.THOR, PlanetID.THOR_INTERNAL) else 1)
))
for fav in product.fav_list:
claim_data.append(FungibleAssetValue.from_raw_data(
ticker=fav.ticker, decimal_places=fav.decimal_places,
amount=fav.amount * (5 if planet_id in (PlanetID.THOR, PlanetID.THOR_INTERNAL) else 1)
amount=fav.amount * (5 if receipt.planet_id in (PlanetID.THOR, PlanetID.THOR_INTERNAL) else 1)
))

action = ClaimItems(claim_data=[{"avatarAddress": avatar_address, "fungibleAssetValues": claim_data}], memo=memo)

unsigned_tx = create_unsigned_tx(
planet_id=planet_id,
public_key=account.pubkey.hex(), address=account.address, nonce=nonce,
plain_value=action.plain_value, timestamp=datetime.datetime.utcnow() + datetime.timedelta(days=7)
planet_id=PlanetID(receipt.planet_id),
public_key=account.pubkey.hex(), address=account.address, nonce=receipt.nonce,
plain_value=action.plain_value,
timestamp=datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(days=7)
)

signature = account.sign_tx(unsigned_tx)
signed_tx = append_signature_to_unsigned_tx(unsigned_tx, signature)
return gql.stage(signed_tx), nonce, signed_tx
return signed_tx


def stage_tx(receipt: Receipt) -> Tuple[bool, str, Optional[str]]:
stage = os.environ.get("STAGE", "development")
region_name = os.environ.get("REGION_NAME", "us-east-2")
logging.debug(f"STAGE: {stage} || REGION: {region_name}")
gql = GQL(GQL_DICT[receipt.planet_id], HEADLESS_GQL_JWT_SECRET)

return gql.stage(bytes.fromhex(receipt.tx))


def handle(event, context):
Expand All @@ -131,32 +135,63 @@ def handle(event, context):

results = []
sess = scoped_session(sessionmaker(bind=engine))
account = Account(fetch_kms_key_id(STAGE, REGION_NAME))
gql_dict = {planet: GQL(url, HEADLESS_GQL_JWT_SECRET) for planet, url in GQL_DICT.items()}
db_nonce_dict = {
x.planet_id: x.nonce
for x in sess.execute(select(Receipt.planet_id, func.max(Receipt.nonce)).group_by(Receipt.planet_id)).all()
}
nonce_dict = {}
target_list = []

# Set nonce first before process
uuid_list = [x.body.get("uuid") for x in message.Records if x.body.get("uuid") is not None]
receipt_dict = {str(x.uuid): x for x in sess.scalars(select(Receipt).where(Receipt.uuid.in_(uuid_list)))}
nonce_dict = {}
for i, record in enumerate(message.Records):
try:
receipt = receipt_dict.get(record.body.get("uuid"))
logger.debug(f"UUID : {record.body.get('uuid')}")
receipt = receipt_dict.get(record.body.get("uuid"))
if not receipt:
success, msg, tx_id = False, f"{record.body.get('uuid')} is not exist in Receipt history", None
# Receipt not found
success, msg, tx_id = False, f"{receipt.uuid} is not exist in Receipt", None
logger.error(msg)
elif receipt.tx_id:
success, msg, tx_id = False, f"{record.body.get('uuid')} is already treated with Tx : {receipt.tx_id}", None
# Tx already staged
success, msg, tx_id = False, f"{receipt.uuid} is already treated with Tx : {receipt.tx_id}", None
logger.warning(msg)
elif receipt.tx:
# Tx already created
target_list.append((receipt, record))
logger.info(f"{receipt.uuid} already has created tx with nonce {receipt.nonce}")
else:
# Fresh receipt
receipt.tx_status = TxStatus.CREATED
(success, msg, tx_id), nonce_dict[receipt.planet_id], signed_tx = process(
sess, record, nonce=nonce_dict.get(receipt.planet_id, None)
receipt.nonce = max( # max nonce of
nonce_dict.get( # current handling nonce (or nonce in blockchain)
receipt.planet_id,
gql_dict[receipt.planet_id].get_next_nonce(account.address)
),
db_nonce_dict.get(receipt.planet_id, 0) # DB stored nonce
)
receipt.nonce = nonce_dict[receipt.planet_id]
if success:
nonce_dict[receipt.planet_id] += 1
receipt.tx_id = tx_id
receipt.tx_status = TxStatus.STAGED
receipt.tx = signed_tx.hex()
receipt.tx = create_tx(sess, account, receipt).hex()
nonce_dict[receipt.planet_id] = receipt.nonce + 1
target_list.append((receipt, record))
logger.info(f"{receipt.uuid}: Tx created with nonce: {receipt.nonce}")
sess.add(receipt)
sess.commit()
except Exception as e:
logger.error(f"Error occurred: {record.body.get('uuid')} :: {e}")
continue
sess.commit()

# Stage created tx
logger.info(f"Stage {len(target_list)} receipts")
for receipt, record in target_list:
try:
success, msg, tx_id = stage_tx(receipt)
receipt.tx_id = tx_id
receipt.tx_status = TxStatus.STAGED
sess.add(receipt)
sess.commit()

result = {
"sqs_message_id": record.messageId,
Expand Down

0 comments on commit 24b15d4

Please sign in to comment.