Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase timeout and disable Postgres notify on send #27

Merged
merged 5 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/smpp_gateway/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(
submit_sm_params: dict,
set_priority_flag: bool,
mt_messages_per_second: int,
event_loop_timeout: int,
*args,
**kwargs,
):
Expand All @@ -81,6 +82,7 @@ def __init__(
self.submit_sm_params = submit_sm_params
self.set_priority_flag = set_priority_flag
self.mt_messages_per_second = mt_messages_per_second
self.event_loop_timeout = event_loop_timeout
super().__init__(*args, **kwargs)
self._pg_conn = pg_listen(self.backend.name)

Expand Down Expand Up @@ -176,11 +178,13 @@ def receive_pg_notify(self):
self.send_mt_messages()

def send_mt_messages(self):
limit = self.mt_messages_per_second * self.timeout
limit = self.mt_messages_per_second * self.event_loop_timeout
smses = get_mt_messages_to_send(limit=limit, backend=self.backend)
if len(smses) == 0:
return
logger.info(f"Found {len(smses)} messages to send in {self.timeout} seconds")
logger.info(
f"Found {len(smses)} messages to send in {self.event_loop_timeout} seconds"
)
submit_sm_resps = []
for sms in smses:
params = {**self.submit_sm_params, **sms["params"]}
Expand Down Expand Up @@ -237,7 +241,7 @@ def listen(self, ignore_error_codes=None, auto_send_enquire_link=True):
while True:
# When either main socket has data or _pg_conn has data, select.select will return
rlist, _, _ = select.select(
[self._socket, self._pg_conn], [], [], self.timeout
[self._socket, self._pg_conn], [], [], self.event_loop_timeout
)
if not rlist and auto_send_enquire_link:
self.logger.debug("Socket timeout, listening again")
Expand Down
13 changes: 13 additions & 0 deletions src/smpp_gateway/management/commands/smpp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ def add_arguments(self, parser):
type=int,
default=os.environ.get("SMPPLIB_MT_MESSAGES_PER_SECOND", 20),
)
parser.add_argument(
"--socket-timeout",
type=int,
default=os.environ.get("SMPPLIB_SOCKET_TIMEOUT", 30),
)
parser.add_argument(
"--event-loop-timeout",
type=int,
default=os.environ.get("SMPPLIB_EVENT_LOOP_TIMEOUT", 5),
help="Timeout for listening for Postgres notifications and new "
"incoming messages. This is also the time between enquire_link "
"PDUs sent to the SMPP server when there is no other traffic.",
)
parser.add_argument(
"--database-url",
default=os.environ.get("DATABASE_URL"),
Expand Down
8 changes: 5 additions & 3 deletions src/smpp_gateway/outgoing.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import logging

from django.db import connection
from django.utils import timezone
from rapidsms.backends.base import BackendBase

from smpp_gateway.models import MTMessage
from smpp_gateway.queries import pg_notify
from smpp_gateway.utils import grouper

logger = logging.getLogger(__name__)
Expand All @@ -16,6 +16,8 @@ class SMPPGatewayBackend(BackendBase):
# Optional additional params from:
# https://github.com/python-smpplib/python-smpplib/blob/d9d91beb2d7f37915b13a064bb93f907379342ec/smpplib/command.py#L652-L700
OPTIONAL_PARAMS = ("source_addr",)
# The minimum priority_flag value for which to send a Postgres notification
minimum_notify_priority_flag = MTMessage.PriorityFlag.LEVEL_2.value

def configure(self, **kwargs):
self.send_group_size = kwargs.get("send_group_size", 100)
Expand Down Expand Up @@ -48,5 +50,5 @@ def send(self, id_, text, identities, context=None):
MTMessage.objects.bulk_create(
[MTMessage(**kwargs) for kwargs in kwargs_group]
)
with connection.cursor() as curs:
curs.execute(f"NOTIFY {self.model.name}")
if context.get("priority_flag", 0) >= self.minimum_notify_priority_flag:
pg_notify(self.model.name)
7 changes: 7 additions & 0 deletions src/smpp_gateway/smpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def get_smpplib_client(
submit_sm_params: dict,
set_priority_flag: bool,
mt_messages_per_second: int,
socket_timeout: int,
event_loop_timeout: int,
hc_check_uuid: str,
hc_ping_key: str,
hc_check_slug: str,
Expand All @@ -38,10 +40,13 @@ def get_smpplib_client(
submit_sm_params,
set_priority_flag,
mt_messages_per_second,
event_loop_timeout,
# Pass-through arguments to smpplib.client.Client:
host,
port,
allow_unknown_opt_params=True,
sequence_generator=sequence_generator,
timeout=socket_timeout,
)
return client

Expand Down Expand Up @@ -73,6 +78,8 @@ def start_smpp_client(options):
json.loads(options["submit_sm_params"]),
options["set_priority_flag"],
options["mt_messages_per_second"],
options["socket_timeout"],
options["event_loop_timeout"],
options["hc_check_uuid"],
options["hc_ping_key"],
options["hc_check_slug"],
Expand Down
10 changes: 10 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def test_received_mo_message(self):
{}, # submit_sm_params
False, # set_priority_flag
20, # mt_messages_per_second
30, # socket_timeout
5, # event_loop_timeout
"", # hc_check_uuid
"", # hc_ping_key
"", # hc_check_slug
Expand Down Expand Up @@ -63,6 +65,8 @@ def test_received_message_receipt(self):
{}, # submit_sm_params
False, # set_priority_flag
20, # mt_messages_per_second
30, # socket_timeout
5, # event_loop_timeout
"", # hc_check_uuid
"", # hc_ping_key
"", # hc_check_slug
Expand Down Expand Up @@ -106,6 +110,8 @@ def test_received_null_short_message(self):
{}, # submit_sm_params
False, # set_priority_flag
20, # mt_messages_per_second
30, # socket_timeout
5, # event_loop_timeout
"", # hc_check_uuid
"", # hc_ping_key
"", # hc_check_slug
Expand Down Expand Up @@ -145,6 +151,8 @@ def test_message_sent_handler():
{}, # submit_sm_params
False, # set_priority_flag
20, # mt_messages_per_second
30, # socket_timeout
5, # event_loop_timeout
"", # hc_check_uuid
"", # hc_ping_key
"", # hc_check_slug
Expand Down Expand Up @@ -183,6 +191,8 @@ def get_client_and_message(
submit_sm_params or {},
set_priority_flag,
20, # mt_messages_per_second
30, # socket_timeout
5, # event_loop_timeout
"", # hc_check_uuid
"", # hc_ping_key
"", # hc_check_slug
Expand Down
48 changes: 47 additions & 1 deletion tests/test_router.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from unittest.mock import patch

from django.conf import settings
from django.test import TestCase
from django.test.utils import override_settings

from smpp_gateway.models import MTMessage
from smpp_gateway.router import PriorityBlockingRouter

from .factories import ConnectionFactory
Expand All @@ -15,7 +19,9 @@
)
class PriorityBlockingRouterTest(TestCase):
def setUp(self):
self.router = PriorityBlockingRouter(apps=[], backends={})
self.router = PriorityBlockingRouter(
apps=[], backends=settings.INSTALLED_BACKENDS
)
self.connection = ConnectionFactory()

def test_new_incoming_message(self):
Expand Down Expand Up @@ -70,3 +76,43 @@ def test_outgoing_message_extra_backend_context_has_priority_flag(self):
)
context = msg.extra_backend_context()
self.assertEqual(context["priority_flag"], msg.default_priority_flag.value)

def test_no_postgres_notification_for_low_priority_messages(self):
"""Tests that a Postgres NOTIFY is not done for messages where the
priority_flag is less than 2.
"""
for priority in MTMessage.PriorityFlag.values[:2]:
msg = self.router.new_outgoing_message(
text="foo",
connections=[self.connection],
fields={"priority_flag": priority},
)
with patch("smpp_gateway.outgoing.pg_notify") as mock_pg_notify:
self.router.send_to_backend(
backend_name="smppsim",
id_=msg.id,
text=msg.text,
identities=[self.connection.identity],
context=msg.fields,
)
mock_pg_notify.assert_not_called()

def test_postgres_notification_for_high_priority_messages(self):
"""Tests that a Postgres NOTIFY is done for messages where the
priority_flag is at least 2.
"""
for priority in MTMessage.PriorityFlag.values[2:]:
msg = self.router.new_outgoing_message(
text="foo",
connections=[self.connection],
fields={"priority_flag": priority},
)
with patch("smpp_gateway.outgoing.pg_notify") as mock_pg_notify:
self.router.send_to_backend(
backend_name="smppsim",
id_=msg.id,
text=msg.text,
identities=[self.connection.identity],
context=msg.fields,
)
mock_pg_notify.assert_called_with("smppsim")
Loading