Skip to content

Commit

Permalink
[PUBLISH] Push items directly to publish queue when possible.
Browse files Browse the repository at this point in the history
This patch add a `publish:push` command which directly put items to
publish services queue without passing by the `published` collection.

SDESK-7448
  • Loading branch information
jerome-poisson committed Jan 1, 2025
1 parent 36445b8 commit bd2ce14
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 7 deletions.
4 changes: 3 additions & 1 deletion apps/publish/content/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ def update(self, id, updates, original):
self._update_archive(original, updates, should_insert_into_versions=auto_publish)
self.update_published_collection(published_item_id=original[ID_FIELD], updated=updated)

from apps.publish.enqueue import enqueue_published
from apps.publish.enqueue import enqueue_published, push_publish

push_publish.apply_async(str(id))

enqueue_published.apply_async()

Expand Down
58 changes: 57 additions & 1 deletion apps/publish/enqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from datetime import datetime
import logging
from typing import cast

from apps.archive.archive import ArchiveService
from superdesk.core import get_current_app
from superdesk.resource_fields import ID_FIELD, VERSION
import superdesk
Expand All @@ -31,7 +34,7 @@
from superdesk.utc import utcnow
from superdesk.profiling import ProfileManager
from apps.content import push_content_notification
from superdesk.errors import ConnectionTimeout
from superdesk.errors import ConnectionTimeout, PublishQueueError
from celery.exceptions import SoftTimeLimitExceeded
from superdesk.publish.publish_content import publish

Expand Down Expand Up @@ -232,11 +235,64 @@ def enqueue_items(self, published_items):
logger.error("Failed to publish the following items: {}".format(failed_items.keys()))


class PushContent(superdesk.Command):
"""Publish items directly to relevant services.
Example:
::
$ python manage.py publish:push item_id [content_type]
"""

def run(self, published_item_id_s: str, content_type: str | None = None) -> None:
"""Publish item directly in relevant service queue.
:param published_item_id_s: ID of the item to publish.
:param content_type: Type of the content.
:raises PublishQueueError.article_not_found_error: Could not find item with given ID.
"""
published_item_id = ObjectId(published_item_id_s)
archive_service = cast(ArchiveService, get_resource_service(ARCHIVE))
published_item = archive_service.find_one(req=None, _id=published_item_id)
if published_item is None:
logger.error("Can't find item with id {published_item_id_s!r}.")
raise PublishQueueError.article_not_found_error()
logger.info(
"Push publishing item with id: {} and item_id: {}".format(published_item_id, published_item.get("item_id"))
)

if published_item.get(ITEM_STATE) == CONTENT_STATE.SCHEDULED:
publish_schedule = published_item.get(PUBLISH_SCHEDULE)
if published_item is None:
logger.warning(f"Publish schedule is missing: {published_item=}")
return
if publish_schedule > utcnow():
logger.debug(f"Item is scheduled for a later date, we don't push it: {published_item}")
return

try:
get_enqueue_service(published_item[ITEM_OPERATION]).enqueue_item(published_item, content_type)
except Exception:
logger.exception(f"Can't enqueue item: {published_item}")
raise


superdesk.command("publish:enqueue", EnqueueContent())
superdesk.command("publish:push", PushContent())


@celery.task(soft_time_limit=600)
def enqueue_published():
"""Pick new items from ``published`` collection and enqueue it."""
with ProfileManager("publish:enqueue"):
EnqueueContent().run()


@celery.task(soft_time_limit=600)
def push_publish(published_item_id_s: str, content_type: str | None = None):
"""Push item directly in destination service queue."""
with ProfileManager("publish:push"):
PushContent().run(published_item_id_s, content_type)
112 changes: 107 additions & 5 deletions tests/publish/enqueue_service_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,27 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from datetime import datetime, timedelta
from unittest import mock

from bson import ObjectId

from apps.archive.archive import ArchiveService
from apps.archive.common import ITEM_OPERATION
from apps.packages.package_service import PackageService
from apps.publish.enqueue import enqueue_service, PushContent
from apps.publish.enqueue import enqueue_published
from apps.publish.enqueue.enqueue_service import EnqueueService
from content_api.publish.service import PublishService
from superdesk.errors import PublishQueueError
from superdesk.metadata.item import (
CONTENT_STATE,
ITEM_STATE,
PUBLISH_SCHEDULE,
)
from superdesk.resource_fields import ID_FIELD, VERSION
from superdesk.tests import TestCase
from content_api.publish.service import PublishService
from apps.publish.enqueue import enqueue_service
from apps.publish.enqueue.enqueue_service import EnqueueService
from apps.packages.package_service import PackageService
from apps.archive.archive import ArchiveService
from superdesk.utc import utcnow


def _fake_extend_subscriber_items(self, subscriber_items, subscribers, package_item, package_item_id, subscriber_codes):
Expand Down Expand Up @@ -301,3 +313,93 @@ async def test_queue_transmission_without_cache(self):
):
service.queue_transmission(doc, subscribers)
self.assertEqual(formatter.format.call_count, 2)


class PushContentTest(TestCase):
async def asyncSetUp(self):
await super().asyncSetUp()
self.test_id_1 = ObjectId()
self.test_id_2 = ObjectId()
self.test_id_3 = ObjectId()

self.app.data.insert(
"archive",
[
{
"_id": self.test_id_1,
"item_id": self.test_id_1,
"type": "text",
"headline": "test headline toto",
"version": 1,
"task": {},
ITEM_STATE: CONTENT_STATE.SCHEDULED,
ITEM_OPERATION: "publish",
PUBLISH_SCHEDULE: utcnow() + timedelta(hours=1),
},
{
"_id": self.test_id_2,
"item_id": "2",
"type": "text",
"headline": "test headline 2",
"version": 1,
"task": {},
ITEM_STATE: CONTENT_STATE.SCHEDULED,
ITEM_OPERATION: "publish",
PUBLISH_SCHEDULE: utcnow() - timedelta(hours=1),
},
{
"_id": self.test_id_3,
"item_id": "3",
"type": "text",
"headline": "test headline 3",
VERSION: 1,
"task": {},
ITEM_STATE: CONTENT_STATE.PUBLISHED,
ITEM_OPERATION: "publish",
},
],
)

@mock.patch.object(EnqueueService, "enqueue_item")
async def test_push_scheduled_item_in_future(self, mock_enqueue):
"""``enqueue_item`` is not called if the item is scheduled in the future."""
cmd = PushContent()
cmd.run(str(self.test_id_1))
mock_enqueue.assert_not_called()

@mock.patch.object(EnqueueService, "enqueue_item")
async def test_push_scheduled_item_in_past(self, mock_enqueue):
"""``enqueue_item`` is called if the item is scheduled but the publish schedule is passed."""
cmd = PushContent()
cmd.run(str(self.test_id_2))
mock_enqueue.assert_called_once()

@mock.patch("apps.publish.enqueue.enqueue_service.get_resource_service", return_value=None)
async def test_push_non_existent_item(self, mock_get_resource_service):
"""Exception is raise if item is not found."""
cmd = PushContent()
non_existent_id = ObjectId()
with self.assertRaises(PublishQueueError):
cmd.run(str(non_existent_id))

@mock.patch.object(EnqueueService, "enqueue_item")
async def test_push_with_content_type(self, mock_enqueue):
"""Content type is used when present."""
cmd = PushContent()
cmd.run(str(self.test_id_2), "test_content_type")
mock_enqueue.assert_called_once_with(mock.ANY, "test_content_type")

@mock.patch.object(EnqueueService, "enqueue_item")
async def test_push_publish(self, mock_enqueue):
"""Push publish calls ``enqueue_item`` when it's not scheduled."""
cmd = PushContent()
cmd.run(str(self.test_id_3))
mock_enqueue.assert_called_once()

@mock.patch.object(EnqueueService, "enqueue_item", side_effect=Exception("Test error"))
async def test_push_with_enqueue_error(self, mock_enqueue):
"""Exception is propagated when ``enqueue_item`` raises one."""
cmd = PushContent()
with self.assertRaises(Exception) as context:
cmd.run(str(self.test_id_2))
assert str(context.exception) == "Test error"

0 comments on commit bd2ce14

Please sign in to comment.