Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PUBLISH]Cache Formatter.format result when suitable. #2757

Merged
merged 3 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 56 additions & 24 deletions apps/publish/enqueue/enqueue_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from eve.utils import ParsedRequest
from quart_babel import gettext as _

import superdesk
from superdesk.core import get_current_app, get_app_config
from superdesk.resource_fields import ID_FIELD, VERSION
from superdesk.flask import g
Expand All @@ -29,7 +30,7 @@
from superdesk.notification import push_notification
from superdesk.publish import SUBSCRIBER_TYPES
from superdesk.publish.publish_queue import PUBLISHED_IN_PACKAGE
from superdesk.publish.formatters import get_formatter
from superdesk.publish.formatters import Formatter, get_formatter
from apps.publish.content.utils import filter_digital, filter_non_digital
from apps.publish.content.common import BasePublishService
from apps.archive.common import get_user, get_utc_schedule
Expand Down Expand Up @@ -435,16 +436,23 @@ def get_destinations(self, subscriber):
return destinations

@elasticapm.capture_span()
def queue_transmission(self, doc, subscribers, subscriber_codes=None, associations=None, sent=False):
def queue_transmission(
self,
doc: dict,
subscribers: list,
subscriber_codes: dict | None = None,
associations: dict | None = None,
sent: bool = False,
) -> tuple[list, bool]:
"""Method formats and then queues the article for transmission to the passed subscribers.

::Important Note:: Format types can repeat across subscribers. However, a formatted item cannot simply be
generated once per format type and shared between all the subscribers, because each formatted item must carry
a published sequence number generated for its subscriber.

:param dict doc: document to queue for transmission
:param list subscribers: List of subscriber dict.
:return : (list, bool) tuple of list of missing formatters and boolean flag. True if queued else False
:param doc: document to queue for transmission
:param subscribers: List of subscriber dict.
:return : tuple of list of missing formatters and boolean flag. True if queued else False
"""
if associations is None:
associations = {}
Expand All @@ -469,6 +477,8 @@ def queue_transmission(self, doc, subscribers, subscriber_codes=None, associatio
no_formatters = []
filtered_document = self.filter_document(doc)
app = get_current_app()
cache: dict[tuple[Formatter, str], list[dict]] = {}

for subscriber in subscribers:
try:
if (
Expand Down Expand Up @@ -496,23 +506,45 @@ def queue_transmission(self, doc, subscribers, subscriber_codes=None, associatio
continue

formatter.set_destination(destination, subscriber)
formatted_docs = formatter.format(
self.filter_document(doc) if embed_package_items else filtered_document.copy(),
subscriber,
subscriber_codes.get(subscriber[ID_FIELD]),
)
if formatter.use_cache:
formatted_docs = cache.get((formatter, doc[ID_FIELD]))
else:
formatted_docs = None

if formatted_docs is None:
# Either caching is not available for this formatter, or it's the first time that we format
# this document.
formatted_docs = []
format_ret = formatter.format(
self.filter_document(doc) if embed_package_items else filtered_document.copy(),
subscriber,
subscriber_codes.get(subscriber[ID_FIELD]),
)

for idx, publish_data in enumerate(formatted_docs):
if not isinstance(publish_data, dict):
pub_seq_num, formatted_doc = publish_data
formatted_docs[idx] = {
"published_seq_num": pub_seq_num,
"formatted_item": formatted_doc,
}
else:
assert (
"published_seq_num" in publish_data and "formatted_item" in publish_data
), "missing keys in publish_data"
for publish_data in format_ret:
if not isinstance(publish_data, dict):
pub_seq_num, formatted_doc = publish_data
formatted_docs.append(
{
"published_seq_num": pub_seq_num,
"formatted_item": formatted_doc,
}
)
else:
assert (
"published_seq_num" in publish_data and "formatted_item" in publish_data
), "missing keys in publish_data"
formatted_docs.append(publish_data)

if formatter.use_cache:
cache[(formatter, doc[ID_FIELD])] = formatted_docs
else:
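# We have cached documents, but we still need to update ``published_seq_num``.
# NOTE(review): the loop below writes the key "publish_seq_num" (not "published_seq_num") and rebinds
# ``doc``, which is also the name of this method's document parameter -- confirm both are intended.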
resource_service = superdesk.get_resource_service("subscribers")
published_seq_num = resource_service.generate_sequence_number(subscriber) # type: ignore

for doc in formatted_docs:
doc["publish_seq_num"] = published_seq_num

for publish_queue_item in formatted_docs:
publish_queue_item["item_id"] = doc["item_id"]
Expand Down Expand Up @@ -968,14 +1000,14 @@ def _get_codes(self, item):
return []

@staticmethod
def filter_document(doc):
def filter_document(doc: dict) -> dict:
"""
Filter document:
1. Remove fields that should not be there given its profile.
2. Remove `None` valued renditions.

:param dict doc: document to filter
:return: dict filtered document
:param doc: document to filter
:return: filtered document
"""

# remove fields that should not be there given its profile.
Expand Down
21 changes: 16 additions & 5 deletions superdesk/publish/formatters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import logging

from lxml import etree
from typing import List, Optional, Type
from typing import List, Type
from superdesk.metadata.item import ITEM_TYPE, CONTENT_TYPE, FORMATS, FORMAT
from superdesk.etree import parse_html
from superdesk.text_utils import get_text
Expand All @@ -29,7 +29,9 @@ class Formatter:
# If name is set it will be visible in UI.
# Set to `None` for base classes which are
# extended later and shouldn't be used on its own.
name: Optional[str]
name: str | None
#: If set, formatted article will be re-used between destinations and subscribers.
use_cache: bool = True

def __init__(self) -> None:
self.can_preview = False
Expand All @@ -41,8 +43,16 @@ def __init_subclass__(cls, **kwargs) -> None:
super().__init_subclass__(**kwargs)
formatters.append(cls)

def format(self, article, subscriber, codes=None):
"""Formats the article and returns the transformed string"""
def format(self, article: dict, subscriber: dict, codes: list | None = None) -> list[tuple[int, str] | dict]:
"""Formats the article.

:param article: Article to format.
:param subscriber: Subscriber to the article.
:param codes: Selector codes.
:return: list of formatted article, either as a tuple of publish sequence number
and formatted article, or as a dict
:raises FormatterError: if the formatter fails to format an article
"""
raise NotImplementedError()

def export(self, article, subscriber, codes=None):
Expand Down Expand Up @@ -136,10 +146,11 @@ def set_destination(self, destination=None, subscriber=None):
self.subscriber = subscriber


def get_formatter(format_type: str, article):
def get_formatter(format_type: str, article: dict) -> Formatter | None:
for formatter_instance in get_all_formatters():
if formatter_instance.can_format(format_type, article):
return formatter_instance
return None


def get_all_formatters() -> List[Formatter]:
Expand Down
5 changes: 3 additions & 2 deletions superdesk/publish/formatters/email_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# at https://www.sourcefabric.org/superdesk/license

import json
from typing import Any
import superdesk

from superdesk.publish.formatters import Formatter
Expand Down Expand Up @@ -55,11 +56,11 @@ def _inject_dateline(self, formatted_article):
formatted_article["body_html"] = sd_etree.to_string(body_html_elem)

# TODO-ASYNC: Support async formatters in publishing code
async def format(self, article, subscriber, codes=None):
async def format(self, article: dict, subscriber: dict, codes: list | None = None) -> list[tuple[int, str] | dict]: # type: ignore
formatted_article = deepcopy(article)
remove_all_embeds(formatted_article)
pub_seq_num = superdesk.get_resource_service("subscribers").generate_sequence_number(subscriber)
doc = {}
doc: dict[str, Any] = {}
try:
if formatted_article.get(FORMAT) == FORMATS.HTML:
if formatted_article.get("dateline", {}).get("text"):
Expand Down
2 changes: 1 addition & 1 deletion superdesk/publish/formatters/idml_formatter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self):
super(self.__class__, self).__init__()
self.format_type = "idml"

def format(self, article, subscriber, codes=None):
def format(self, article: dict, subscriber: dict, codes: list | None = None) -> list[tuple[int, str] | dict]:
try:
publish_seq_num = superdesk.get_resource_service("subscribers").generate_sequence_number(subscriber)
idml_bytes = Converter().create_idml(article)
Expand Down
6 changes: 3 additions & 3 deletions superdesk/publish/formatters/imatrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ def _transform_to_ninjs(self, article, subscriber, recursive=True):
"concepts": self._format_concepts(article),
"headline": get_text(article["headline"]),
"preamble": get_text(article["abstract"], lf_on_block=True).strip() if article.get("abstract") else "",
"dateline": article["dateline"]["text"]
if article.get("dateline") and article["dateline"].get("text")
else "",
"dateline": (
article["dateline"]["text"] if article.get("dateline") and article["dateline"].get("text") else ""
),
"body": [line.strip() for line in get_text(article["body_html"], lf_on_block=True).split("\n") if line],
}

Expand Down
5 changes: 4 additions & 1 deletion superdesk/publish/formatters/newsml_1_2_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ class NewsML12Formatter(Formatter):
NewsML 1.2 Formatter
"""

# We can't cache due to the use of publish_sequence_no in formatted output.
use_cache = False

XML_ROOT = '<?xml version="1.0"?><!DOCTYPE NewsML SYSTEM "http://www.provider.com/dtd/NewsML_1.2.dtd">'
newml_content_type = {
CONTENT_TYPE.PICTURE: "Photo",
Expand All @@ -48,7 +51,7 @@ class NewsML12Formatter(Formatter):
name = "NewsML 1.2"
type = "newsml12"

def format(self, article, subscriber, codes=None):
def format(self, article: dict, subscriber: dict, codes: list | None = None) -> list[tuple[int, str] | dict]:
"""
Create article in NewsML1.2 format

Expand Down
4 changes: 3 additions & 1 deletion superdesk/publish/formatters/newsml_g2_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class NewsMLG2Formatter(Formatter):

name = "NewsML G2"
type = "newsmlg2"
# ``published_seq_num`` is used in the formatted output, so we can't use the cache.
use_cache = False

_message_nsmap = {
None: "http://iptc.org/std/nar/2006-10-01/",
Expand All @@ -69,7 +71,7 @@ class NewsMLG2Formatter(Formatter):
def _format_date(self, date):
return date.strftime("%Y-%m-%dT%H:%M:%S+00:00")

def format(self, article, subscriber, codes=None):
def format(self, article: dict, subscriber: dict, codes: list | None = None) -> list[tuple[int, str] | dict]:
"""Create article in NewsML G2 format

:param dict article:
Expand Down
2 changes: 1 addition & 1 deletion superdesk/publish/formatters/ninjs_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def __init__(self):
self.can_export = True
self.internal_renditions = get_app_config("NINJS_COMMON_RENDITIONS", []) + ["original"]

def format(self, article, subscriber, codes=None):
def format(self, article: dict, subscriber: dict, codes: list | None = None) -> list[tuple[int, str] | dict]:
try:
pub_seq_num = superdesk.get_resource_service("subscribers").generate_sequence_number(subscriber)

Expand Down
5 changes: 4 additions & 1 deletion superdesk/publish/formatters/nitf_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class NITFFormatter(Formatter):
Format items to `NITF <https://iptc.org/standards/nitf/>`_ version *3.6*.
"""

# We can't use cache due to use of subscriber for formatting.
use_cache = False

XML_ROOT = '<?xml version="1.0"?>'
ENCODING = "UTF-8"

Expand Down Expand Up @@ -135,7 +138,7 @@ def __init__(self):
"style": {"nitf": EraseElement}, # <style> may be there in case of bad paste
}

def format(self, article, subscriber, codes=None):
def format(self, article: dict, subscriber: dict, codes: list | None = None) -> list[tuple[int, str] | dict]:
try:
pub_seq_num = superdesk.get_resource_service("subscribers").generate_sequence_number(subscriber)

Expand Down
67 changes: 67 additions & 0 deletions tests/publish/enqueue_service_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

from unittest import mock

from superdesk.resource_fields import ID_FIELD, VERSION
from superdesk.tests import TestCase
from content_api.publish.service import PublishService
from apps.publish.enqueue import enqueue_service
from apps.publish.enqueue.enqueue_service import EnqueueService
from apps.packages.package_service import PackageService
from apps.archive.archive import ArchiveService
Expand Down Expand Up @@ -234,3 +236,68 @@ async def test_content_api_package_publishing(self, content_api_publish):
# Mock.assert_called_once is only available in Python 3.6
# so we emulate it by counting the number of calls
assert content_api_publish.call_count == 1

async def test_queue_transmission_with_cache(self):
    """A formatter with ``use_cache`` enabled must be asked to format only once.

    The same document is queued for two subscribers that share a single
    destination; ``get_formatter`` and ``get_destinations`` are patched so
    that both subscribers resolve to the same mocked formatter.
    """
    service = EnqueueService()
    item = {
        ID_FIELD: "test_id",
        "type": "text",
        VERSION: 1,
        "item_id": "test_id",
        "unique_name": "test_unique_name",
        "headline": "test_headline",
        "priority": 1,
    }
    targets = [
        {"_id": "sub1", "name": "Subscriber 1", "priority": 1},
        {"_id": "sub2", "name": "Subscriber 2", "priority": 2},
    ]

    # Mock formatter advertising cache support and returning one dict-style result.
    caching_formatter = mock.Mock(use_cache=True)
    caching_formatter.format.return_value = [
        {"published_seq_num": 1, "formatted_item": "formatted_content"},
    ]
    destinations = [{"format": "ninjs", "delivery_type": "ftp"}]

    with (
        mock.patch.object(enqueue_service, "get_formatter", return_value=caching_formatter),
        mock.patch.object(EnqueueService, "get_destinations", return_value=destinations),
    ):
        # There should be only the first call, to format the document
        service.queue_transmission(item, targets)
        caching_formatter.format.assert_called_once()

async def test_queue_transmission_without_cache(self):
    """A formatter with ``use_cache`` disabled must format once per subscriber.

    The same document is queued for two subscribers that share a single
    destination; with caching off, ``format`` is expected to be called twice.
    """
    service = EnqueueService()
    item = {
        ID_FIELD: "test_id",
        "type": "text",
        VERSION: 1,
        "item_id": "test_id",
        "unique_name": "test_unique_name",
        "headline": "test_headline",
        "priority": 1,
    }
    targets = [
        {"_id": "sub1", "name": "Subscriber 1", "priority": 1},
        {"_id": "sub2", "name": "Subscriber 2", "priority": 2},
    ]

    # Mock formatter that opts out of caching and returns one dict-style result.
    plain_formatter = mock.Mock(use_cache=False)
    plain_formatter.format.return_value = [
        {"published_seq_num": 1, "formatted_item": "formatted_content"},
    ]
    destinations = [{"format": "ninjs", "delivery_type": "ftp"}]

    with (
        mock.patch.object(enqueue_service, "get_formatter", return_value=plain_formatter),
        mock.patch.object(EnqueueService, "get_destinations", return_value=destinations),
    ):
        service.queue_transmission(item, targets)
        self.assertEqual(plain_formatter.format.call_count, 2)
Loading