Revert "remove aioredis"
This reverts commit 4b37a4e.
gitcarbs committed Sep 17, 2024
1 parent eab8c9b commit a533723
Showing 4 changed files with 62 additions and 67 deletions.
2 changes: 1 addition & 1 deletion contrib-requirements.txt
@@ -1,4 +1,4 @@
-redis==4.3.4
+aioredis==1.3.1
 html2text==2019.8.11
 aiosmtplib==1.0.6
 pre-commit==1.18.2
4 changes: 2 additions & 2 deletions docs/source/contrib/cache.md
@@ -21,7 +21,7 @@ applications:
 This option is not recommended as they are not invalidating the memory objects.
-Its needed to add `redis` as a dependency on your project
+Its needed to add `aioredis` as a dependency on your project

 ### Configuration

@@ -37,7 +37,7 @@ cache:

 This option is the recommended one for more than one process running guillotina on the same DB.

-Its needed to add `redis` as a dependency on your project
+Its needed to add `aioredis` as a dependency on your project

 ### Configuration

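For orientation, the cache documentation touched above points at guillotina's application settings. Below is a minimal sketch of the kind of configuration it refers to, written as Python app settings. Only the `host`, `port` and `pool` keys are grounded in what `_connect()` reads from `app_settings["redis"]` in the driver diff further down; the `cache` block, the application list and all values are illustrative assumptions, not taken from this commit.

```python
# Hypothetical guillotina app settings enabling the redis-backed cache/pubsub.
# Only the "redis" keys (host, port, pool) are grounded in the driver code
# shown in this commit; the rest is an illustrative guess.
app_settings = {
    "applications": ["guillotina", "guillotina.contrib.redis"],
    "redis": {
        "host": "localhost",
        "port": 6379,
        # forwarded as **settings["pool"] to aioredis.create_pool()
        "pool": {"minsize": 5, "maxsize": 100},
    },
    "cache": {
        "updates_channel": "guillotina",  # assumed channel name
    },
}
```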
121 changes: 58 additions & 63 deletions guillotina/contrib/redis/driver.py
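The hunks below swap between two different client APIs: `redis>=4.2` ships an asyncio client under `redis.asyncio` with high-level command methods, while `aioredis==1.3.1` (restored by this revert) exposes a connection pool with a raw `execute()` call. A rough, hedged illustration of the difference follows; the connection details are made up and each branch needs the corresponding package installed.

```python
async def redis_asyncio_style():
    # Pre-revert dependency: redis>=4.2, imported under the aioredis alias.
    import redis.asyncio as aioredis
    client = aioredis.Redis(
        connection_pool=aioredis.ConnectionPool.from_url("redis://localhost:6379")
    )
    await client.set("greeting", "hello", ex=60)  # high-level keyword arguments
    value = await client.get("greeting")
    await client.close()
    return value


async def aioredis_13_style():
    # Post-revert dependency: aioredis==1.3.1, the package this commit restores.
    import aioredis
    pool = await aioredis.create_pool(("localhost", 6379))
    await pool.execute(b"SET", "greeting", "hello", b"EX", 60)  # raw protocol args
    value = await pool.execute(b"GET", "greeting")
    pool.close()
    await pool.wait_closed()
    return value
```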
@@ -1,15 +1,14 @@
 try:
-    import redis.asyncio as aioredis
+    import aioredis
+    import aioredis.errors
 except ImportError:
-    print("If you add guillotina.contrib.redis you need to add redis>4.2.0rc1 on your requirements")
+    print("If you add guillotina.contrib.redis you need to add aioredis on your requirements")
     raise

 from guillotina import app_settings
 from guillotina import metrics
 from guillotina.contrib.redis.exceptions import NoRedisConfigured
-from redis.asyncio.client import PubSub
-from redis.exceptions import ConnectionError
-from typing import Dict
+from typing import Any
 from typing import List
 from typing import Optional

@@ -35,13 +34,12 @@
     class watch(metrics.watch):
         def __init__(self, operation: str):
             super().__init__(
-                counter=REDIS_OPS,
-                histogram=REDIS_OPS_PROCESSING_TIME,
-                labels={"type": operation},
+                counter=REDIS_OPS, histogram=REDIS_OPS_PROCESSING_TIME, labels={"type": operation}
             )

+
 except ImportError:
-    watch = metrics.watch  # type: ignore
+    watch = metrics.dummy_watch  # type: ignore


 logger = logging.getLogger("guillotina.contrib.redis")
@@ -65,8 +63,6 @@ async def initialize(self, loop):
         while True:
             try:
                 await self._connect()
-                with watch("acquire_conn"):
-                    assert await self._pool.ping() is True
                 self.initialized = True
                 break
             except Exception:  # pragma: no cover
@@ -75,38 +71,44 @@ async def initialize(self, loop):
     @backoff.on_exception(backoff.expo, (OSError,), max_time=30, max_tries=4)
     async def _connect(self):
         settings = app_settings["redis"]
-        self._conn_pool = aioredis.ConnectionPool.from_url(f"redis://{settings['host']}:{settings['port']}")
-        self._pool = aioredis.Redis(connection_pool=self._conn_pool)
-        self._pubsub_channels: Dict[str, PubSub] = {}
+        with watch("create_pool"):
+            self._pool = await aioredis.create_pool(
+                (settings["host"], settings["port"]), **settings["pool"], loop=self._loop
+            )
+        with watch("acquire_conn"):
+            self._conn = await self._pool.acquire()
+        self._pubsub_subscriptor = aioredis.Redis(self._conn)

     async def finalize(self):
-        await self._conn_pool.disconnect()
+        if self._pool is not None:
+            self._pool.close()
+            await self._pool.wait_closed()
         self.initialized = False

     @property
     def pool(self):
         return self._pool

     async def info(self):
-        return await self._pool.execute_command(b"COMMAND", b"INFO", "get")
+        return await self._pool.execute(b"COMMAND", b"INFO", "get")

     # VALUE API

     async def set(self, key: str, data: str, *, expire: Optional[int] = None):
         if self._pool is None:
             raise NoRedisConfigured()
-        kwargs = {}
+        args: List[Any] = []
         if expire is not None:
-            kwargs["ex"] = expire
+            args[:] = [b"EX", expire]
         with watch("set"):
-            ok = await self._pool.set(key, data, **kwargs)
-            assert ok is True, ok
+            ok = await self._pool.execute(b"SET", key, data, *args)
+            assert ok == b"OK", ok

     async def get(self, key: str) -> str:
         if self._pool is None:
             raise NoRedisConfigured()
         with watch("get") as w:
-            val = await self._pool.get(key)
+            val = await self._pool.execute(b"GET", key)
             if not val:
                 w.labels["type"] = "get_miss"
             return val
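Both sides of the hunk above issue the same wire command for a set with expiry; only the client API differs (`set(key, data, ex=expire)` versus trailing `b"EX"` arguments passed to `execute()`). A small self-contained sketch of that mapping follows, using a fake pool so it runs without a Redis server; the helper names are invented for illustration.

```python
import asyncio
from typing import Any, List, Optional


class FakePool:
    """Stand-in for an aioredis 1.x pool: records raw commands instead of sending them."""

    def __init__(self) -> None:
        self.commands: List[tuple] = []

    async def execute(self, *args: Any) -> bytes:
        self.commands.append(args)
        return b"OK"


async def set_value(pool: FakePool, key: str, data: str, *, expire: Optional[int] = None) -> None:
    # Mirrors the restored set(): the keyword-only `expire` becomes a raw
    # trailing "EX <seconds>" pair instead of redis-py's set(..., ex=expire).
    args: List[Any] = []
    if expire is not None:
        args[:] = [b"EX", expire]
    ok = await pool.execute(b"SET", key, data, *args)
    assert ok == b"OK", ok


async def main() -> None:
    pool = FakePool()
    await set_value(pool, "greeting", "hello", expire=300)
    print(pool.commands)  # [(b'SET', 'greeting', 'hello', b'EX', 300)]


asyncio.run(main())
```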
@@ -115,68 +117,61 @@ async def delete(self, key):
         if self._pool is None:
             raise NoRedisConfigured()
         with watch("delete"):
-            await self._pool.delete(key)
-
-    async def expire(self, key: str, expire: int):
-        if self._pool is None:
-            raise NoRedisConfigured()
-        await self._pool.expire(key, expire)
-
-    async def keys_startswith(self, key: str):
-        if self._pool is None:
-            raise NoRedisConfigured()
-        return await self._pool.keys(f"{key}*")
+            await self._pool.execute(b"DEL", key)

     async def delete_all(self, keys: List[str]):
         if self._pool is None:
             raise NoRedisConfigured()
-        for key in keys:
-            try:
-                with watch("delete_many"):
-                    await self._pool.delete(key)
-                logger.debug("Deleted cache keys {}".format(keys))
-            except Exception:
-                logger.warning("Error deleting cache keys {}".format(keys), exc_info=True)
-
-    async def flushall(self, *, async_op: bool = False):
+        with watch("delete_many"):
+            for key in keys:
+                try:
+                    with watch("delete"):
+                        await self._pool.execute(b"DEL", key)
+                    logger.debug("Deleted cache keys {}".format(keys))
+                except Exception:
+                    logger.warning("Error deleting cache keys {}".format(keys), exc_info=True)
+
+    async def flushall(self, *, async_op: Optional[bool] = False):
         if self._pool is None:
             raise NoRedisConfigured()
+        ops = [b"FLUSHDB"]
+        if async_op:
+            ops.append(b"ASYNC")
         with watch("flush"):
-            await self._pool.flushdb(asynchronous=async_op)
+            await self._pool.execute(*ops)

     # PUBSUB API

     async def publish(self, channel_name: str, data: str):
         if self._pool is None:
             raise NoRedisConfigured()

         with watch("publish"):
-            await self._pool.publish(channel_name, data)
+            await self._pool.execute(b"publish", channel_name, data)

     async def unsubscribe(self, channel_name: str):
-        if self._pool is None:
+        if self._pubsub_subscriptor is None:
             raise NoRedisConfigured()
-
-        p = self._pubsub_channels.pop(channel_name, None)
-        if p:
-            try:
-                await p.unsubscribe(channel_name)
-            except ConnectionError:
-                logger.error(f"Error unsubscribing channel {channel_name}", exc_info=True)
-            finally:
-                await p.__aexit__(None, None, None)
+        await self._pubsub_subscriptor.unsubscribe(channel_name)

     async def subscribe(self, channel_name: str):
-        if self._pool is None:
+        if self._pubsub_subscriptor is None:
             raise NoRedisConfigured()
+        try:
+            (channel,) = await self._pubsub_subscriptor.subscribe(channel_name)
+        except aioredis.errors.ConnectionClosedError:  # pragma: no cover
+            # closed in middle
+            try:
+                self._pool.close(self._conn)
+            except Exception:
+                pass
+            self._conn = await self._pool.acquire()
+            self._pubsub_subscriptor = aioredis.Redis(self._conn)
+            (channel,) = await self._pubsub_subscriptor.subscribe(channel_name)

-        p: PubSub = await self._pool.pubsub().__aenter__()
-        self._pubsub_channels[channel_name] = p
-        await p.subscribe(channel_name)
-        return self._listener(p)
+        return self._listener(channel)

-    async def _listener(self, p: PubSub):
-        while True:
-            message = await p.get_message(ignore_subscribe_messages=True, timeout=1)
-            if message is not None:
-                yield message["data"]
+    async def _listener(self, channel: aioredis.Channel):
+        while await channel.wait_message():
+            msg = await channel.get()
+            yield msg
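The largest behavioural difference in this file is pub/sub: aioredis 1.x hands back `Channel` objects that are drained with `wait_message()`/`get()`, while the removed code polled a `PubSub` object via `get_message(ignore_subscribe_messages=True, timeout=1)`. A hedged sketch of the consumption pattern the restored `_listener()` relies on; the channel name and connection details are invented and it assumes `aioredis==1.3.1`.

```python
import aioredis  # aioredis==1.3.1, the dependency restored by this commit


async def consume(host: str = "localhost", port: int = 6379) -> None:
    pool = await aioredis.create_pool((host, port))
    conn = await pool.acquire()
    subscriber = aioredis.Redis(conn)
    # subscribe() returns one Channel per name; the driver unpacks it the same way.
    (channel,) = await subscriber.subscribe("guillotina-updates")
    try:
        # wait_message() returns False once the channel is closed or unsubscribed,
        # which is what ends the restored _listener() generator.
        while await channel.wait_message():
            message = await channel.get()
            print("received:", message)
    finally:
        await subscriber.unsubscribe("guillotina-updates")
        pool.release(conn)
        pool.close()
        await pool.wait_closed()
```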
2 changes: 1 addition & 1 deletion setup.py
@@ -97,7 +97,7 @@
             "sphinx-guillotina-theme",
             "sphinx-autodoc-typehints",
         ],
-        "redis": ["redis==4.3.0"],
+        "redis": ["aioredis==1.3.1"],
         "memcached": ["emcache"],
         "mailer": ["html2text>=2018.1.9", "aiosmtplib>=1.0.6"],
     },
