Skip to content

Commit

Permalink
Add async support to ingest handlers
Browse files Browse the repository at this point in the history
SDESK-7472
  • Loading branch information
eos87 committed Jan 16, 2025
1 parent 154dedf commit 283dc70
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 22 deletions.
4 changes: 2 additions & 2 deletions apps/rules/routing_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def on_delete(self, doc):
if self.backend.find_one("ingest_providers", req=None, routing_scheme=doc[ID_FIELD]):
raise SuperdeskApiError.forbiddenError(_("Routing scheme is applied to channel(s). It cannot be deleted."))

def apply_routing_scheme(self, ingest_item, provider, routing_scheme):
async def apply_routing_scheme(self, ingest_item, provider, routing_scheme):
"""Applies routing scheme and applies appropriate action (fetch, publish) to the item
:param item: ingest item to which routing scheme needs to applied.
Expand Down Expand Up @@ -225,7 +225,7 @@ def apply_routing_scheme(self, ingest_item, provider, routing_scheme):
% (item_id, routing_scheme.get("name"), rule.get("name"))
)

rule_handler.apply_rule(rule, ingest_item, routing_scheme)
await rule_handler.apply_rule(rule, ingest_item, routing_scheme)
if rule.get("actions", {}).get("exit", False):
logger.info(
"Exiting routing scheme. Item: %s . Routing Scheme: %s. "
Expand Down
13 changes: 6 additions & 7 deletions apps/rules/rule_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from typing import Dict, Any
from typing import Dict, Any, Awaitable
import logging

from quart_babel import lazy_gettext
from quart_babel.speaklater import LazyString
from quart_babel import lazy_gettext, LazyString

from superdesk.core import get_app_config
from superdesk.resource_fields import ID_FIELD
Expand All @@ -30,10 +29,10 @@ class RoutingRuleHandler:
supported_configs: Dict[str, bool]
default_values: Dict[str, Any]

def can_handle(self, rule, ingest_item, routing_scheme) -> bool:
async def can_handle(self, rule, ingest_item, routing_scheme) -> Awaitable[bool]:
raise NotImplementedError()

def apply_rule(self, rule, ingest_item, routing_scheme):
async def apply_rule(self, rule, ingest_item, routing_scheme):
raise NotImplementedError()


Expand Down Expand Up @@ -121,12 +120,12 @@ class DeskFetchPublishRoutingRuleHandler(RoutingRuleHandler):
},
}

def can_handle(self, rule, ingest_item, routing_scheme):
async def can_handle(self, rule, ingest_item, routing_scheme):
return ingest_item.get(ITEM_TYPE) in (
MEDIA_TYPES + (CONTENT_TYPE.TEXT, CONTENT_TYPE.PREFORMATTED, CONTENT_TYPE.COMPOSITE)
)

def apply_rule(self, rule, ingest_item, routing_scheme):
async def apply_rule(self, rule, ingest_item, routing_scheme):
if rule.get("actions", {}).get("preserve_desk", False) and ingest_item.get("task", {}).get("desk"):
desk = get_resource_service("desks").find_one(req=None, _id=ingest_item["task"]["desk"])
if ingest_item.get("task", {}).get("stage"):
Expand Down
14 changes: 8 additions & 6 deletions superdesk/io/commands/update_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async def update_provider(provider, rule_set=None, routing_scheme=None, sync=Fal
logger.warning("lock expired while updating provider %s", provider[ID_FIELD])
return
items = generator.send(failed)
failed = ingest_items(items, provider, feeding_service, rule_set, routing_scheme)
failed = await ingest_items(items, provider, feeding_service, rule_set, routing_scheme)
update_last_item_updated(update, items)

if not update.get(LAST_ITEM_ARRIVED) or update[LAST_ITEM_ARRIVED] < datetime.now(tz=pytz.utc):
Expand Down Expand Up @@ -512,7 +512,7 @@ def ingest_cancel(item, feeding_service):
ingest_service.patch(relative["_id"], update)


def ingest_items(items, provider, feeding_service, rule_set=None, routing_scheme=None):
async def ingest_items(items, provider, feeding_service, rule_set=None, routing_scheme=None):
all_items = filter_expired_items(provider, items)
items_dict = {doc[GUID_FIELD]: doc for doc in all_items}
items_in_package = []
Expand All @@ -524,7 +524,7 @@ def ingest_items(items, provider, feeding_service, rule_set=None, routing_scheme
]

for item in [doc for doc in all_items if doc.get(ITEM_TYPE) != CONTENT_TYPE.COMPOSITE]:
ingested, ids = ingest_item(
ingested, ids = await ingest_item(
item,
provider,
feeding_service,
Expand All @@ -550,7 +550,7 @@ def ingest_items(items, provider, feeding_service, rule_set=None, routing_scheme
ref["residRef"] = items_dict.get(ref["residRef"], {}).get(ID_FIELD)
if item[GUID_FIELD] in failed_items:
continue
ingested, ids = ingest_item(item, provider, feeding_service, rule_set, routing_scheme)
ingested, ids = await ingest_item(item, provider, feeding_service, rule_set, routing_scheme)
if ingested:
created_ids = created_ids + ids
else:
Expand All @@ -569,7 +569,7 @@ def ingest_items(items, provider, feeding_service, rule_set=None, routing_scheme
return failed_items


def ingest_item(item, provider, feeding_service, rule_set=None, routing_scheme=None, expiry=None):
async def ingest_item(item, provider, feeding_service, rule_set=None, routing_scheme=None, expiry=None):
items_ids = []
try:
ingest_collection = get_ingest_collection(feeding_service, item)
Expand Down Expand Up @@ -715,7 +715,9 @@ def ingest_item(item, provider, feeding_service, rule_set=None, routing_scheme=N

if routing_scheme and new_version:
routed = ingest_service.find_one(_id=item[ID_FIELD], req=None)
superdesk.get_resource_service("routing_schemes").apply_routing_scheme(routed, provider, routing_scheme)
await superdesk.get_resource_service("routing_schemes").apply_routing_scheme(
routed, provider, routing_scheme
)

except Exception as ex:
logger.exception(ex)
Expand Down
14 changes: 7 additions & 7 deletions superdesk/tests/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ def step_create_new_macro(context, macro_name):
@async_run_until_complete
async def step_impl_fetch_from_provider_ingest(context, provider_name, guid):
async with context.app.test_request_context(context.app.config["URL_PREFIX"]):
fetch_from_provider(context, provider_name, guid)
await fetch_from_provider(context, provider_name, guid)


@when('we fetch from "{provider_name}" ingest "{guid}" (mocking with "{mock_file}")')
Expand All @@ -639,7 +639,7 @@ async def step_impl_fetch_from_provider_ingest_with_mocking(context, provider_na

with responses.RequestsMock() as rsps:
apply_mock_file(rsps, mock_file, fixture_path=get_provider_file_path(provider))
fetch_from_provider(context, provider_name, guid)
await fetch_from_provider(context, provider_name, guid)


@when('we run update_ingest command for "{provider_name}"')
Expand Down Expand Up @@ -746,7 +746,7 @@ async def step_impl_fetch_from_provider_ingest_using_routing(context, provider_n
_id = apply_placeholders(context, context.text)
routing_scheme = get_resource_service("routing_schemes").find_one(_id=_id, req=None)
embed_routing_scheme_rules(routing_scheme)
fetch_from_provider(context, provider_name, guid, routing_scheme)
await fetch_from_provider(context, provider_name, guid, routing_scheme)


@when('we ingest and fetch "{provider_name}" "{guid}" to desk "{desk}" stage "{stage}" using routing_scheme')
Expand All @@ -758,7 +758,7 @@ async def step_impl_fetch_from_provider_ingest_using_routing_with_desk(context,
stage_id = apply_placeholders(context, stage)
routing_scheme = get_resource_service("routing_schemes").find_one(_id=_id, req=None)
embed_routing_scheme_rules(routing_scheme)
fetch_from_provider(context, provider_name, guid, routing_scheme, desk_id, stage_id)
await fetch_from_provider(context, provider_name, guid, routing_scheme, desk_id, stage_id)


@when('we ingest with routing scheme "{provider_name}" "{guid}"')
Expand All @@ -768,7 +768,7 @@ async def step_impl_ingest_with_routing_scheme(context, provider_name, guid):
_id = apply_placeholders(context, context.text)
routing_scheme = get_resource_service("routing_schemes").find_one(_id=_id, req=None)
embed_routing_scheme_rules(routing_scheme)
fetch_from_provider(context, provider_name, guid, routing_scheme)
await fetch_from_provider(context, provider_name, guid, routing_scheme)


def get_provider_file_path(provider, filename=""):
Expand All @@ -778,7 +778,7 @@ def get_provider_file_path(provider, filename=""):
return os.path.join(provider.get("config", {}).get("path", ""), filename)


def fetch_from_provider(context, provider_name, guid, routing_scheme=None, desk_id=None, stage_id=None):
async def fetch_from_provider(context, provider_name, guid, routing_scheme=None, desk_id=None, stage_id=None):
ingest_provider_service = get_resource_service("ingest_providers")
provider = ingest_provider_service.find_one(name=provider_name, req=None)
provider["routing_scheme"] = routing_scheme
Expand Down Expand Up @@ -824,7 +824,7 @@ def fetch_from_provider(context, provider_name, guid, routing_scheme=None, desk_

item["task"] = {"desk": ObjectId(desk_id), "stage": ObjectId(stage_id)}

failed = context.ingest_items(
failed = await context.ingest_items(
items, provider, provider_service, rule_set=rule_set, routing_scheme=provider.get("routing_scheme")
)
assert len(failed) == 0, failed
Expand Down

0 comments on commit 283dc70

Please sign in to comment.