Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python Redis Cluster Updates (Enterprise) #1523

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openc3-redis/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ RUN if [[ $OPENC3_DEPENDENCY_REGISTRY == 'docker.io' ]]; then \
# Modify the redis user and group to be 1001
# The default alpine redis container uses 999
# See https://github.com/docker-library/redis/blob/master/7.2/alpine/Dockerfile
apk add shadow; \
apk add shadow bash; \
usermod -u 1001 redis; \
groupmod -g 1001 redis; \
# Remove gosu to eliminate a ton of CVEs
Expand Down
2 changes: 1 addition & 1 deletion openc3/python/openc3/utilities/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def getClient(cls):
my_module = importlib.import_module("." + OPENC3_CLOUD.lower() + "_bucket", "openc3.utilities")
# If the file doesn't exist try the Enterprise module
except ModuleNotFoundError:
my_module = importlib.import_module("." + OPENC3_CLOUD.lower() + "_bucket", "openc3-enterprise.utilities")
my_module = importlib.import_module("." + OPENC3_CLOUD.lower() + "_bucket", "openc3enterprise.utilities")
return getattr(my_module, bucket_class)()

def create(self, bucket):
Expand Down
241 changes: 3 additions & 238 deletions openc3/python/openc3/utilities/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,249 +14,14 @@
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

import redis
from redis.exceptions import TimeoutError
from openc3.utilities.connection_pool import ConnectionPool
from contextlib import contextmanager
import threading
from openc3.environment import *

if OPENC3_REDIS_CLUSTER:
openc3_redis_cluster = True
else:
openc3_redis_cluster = False

from openc3.utilities.store_implementation import Store, StoreConnectionPool, StoreMeta, EphemeralStore # noqa: F401

class StoreConnectionPool(ConnectionPool):
@contextmanager
def pipelined(self):
if openc3_redis_cluster:
yield # TODO: Update keys to support pipelining in cluster
else:
with self.get() as redis:
pipeline = redis.pipeline(transaction=False)
thread_id = threading.get_native_id()
self.pipelines[thread_id] = pipeline
try:
yield
finally:
pipeline.execute()
self.pipelines[thread_id] = None

@contextmanager
def get(self):
thread_id = threading.get_native_id()
if thread_id not in self.pipelines:
self.pipelines[thread_id] = None
pipeline = self.pipelines[thread_id]
if pipeline:
yield pipeline
else:
item = None
with self.lock:
if not self.pool.empty():
item = self.pool.get(False)
elif self.count < self.pool_size:
item = self.ctor()
self.count += 1
else:
item = self.pool.get()
try:
yield item
finally:
self.pool.put(item)


class StoreMeta(type):
def __getattribute__(cls, func):
if func == "instance" or func == "instance_mutex" or func == "my_instance":
return super().__getattribute__(func)

def method(*args, **kw_args):
return getattr(cls.instance(), func)(*args, **kw_args)

return method


class Store(metaclass=StoreMeta):
# Variable that holds the singleton instance
my_instance = None

# Mutex used to ensure that only one instance is created
instance_mutex = threading.Lock()

# Get the singleton instance
@classmethod
def instance(cls, pool_size=100):
if cls.my_instance:
return cls.my_instance

with cls.instance_mutex:
cls.my_instance = cls(pool_size)
return cls.my_instance

# Delegate all unknown methods to redis through the @redis_pool
def __getattr__(self, func):
with self.redis_pool.get() as redis:

def method(*args, **kwargs):
return getattr(redis, func)(*args, **kwargs)

return method

def __init__(self, pool_size=10):
self.redis_host = OPENC3_REDIS_HOSTNAME
self.redis_port = OPENC3_REDIS_PORT
self.redis_pool = StoreConnectionPool(self.build_redis, pool_size)
self.topic_offsets = {}
self.pipelines = {}

if not openc3_redis_cluster:

def build_redis(self):
# NOTE: We can't use decode_response because it tries to decode the binary
# packet buffer which does not work. Thus strings come back as bytes like
# b"target_name" and we decode them using b"target_name".decode()
return redis.Redis(
host=self.redis_host,
port=self.redis_port,
username=OPENC3_REDIS_USERNAME,
password=OPENC3_REDIS_PASSWORD,
)

###########################################################################
# Stream APIs
###########################################################################

def get_oldest_message(self, topic):
with self.redis_pool.get() as redis:
result = redis.xrange(topic, count=1)
if result and len(result) > 0:
return result[0]
else:
return None

def get_newest_message(self, topic):
with self.redis_pool.get() as redis:
# Default in xrevrange is range end '+', start '-' which means get all
# elements from higher ID to lower ID and since we're limiting to 1
# we get the last element. See https://redis.io/commands/xrevrange.
result = redis.xrevrange(topic, count=1)
if result and len(result) > 0:
first = list(result[0])
first[0] = first[0].decode()
return first
else:
return (None, None)

def get_last_offset(self, topic):
with self.redis_pool.get() as redis:
result = redis.xrevrange(topic, count=1)
if result and result[0] and result[0][0]:
return result[0][0].decode()
else:
return "0-0"

def update_topic_offsets(self, topics):
offsets = []
for topic in topics:
# Normally we will just be grabbing the topic offset
# this allows xread to get everything past this point
thread_id = threading.get_native_id()
if thread_id not in self.topic_offsets:
self.topic_offsets[thread_id] = {}
topic_offsets = self.topic_offsets[thread_id]
last_id = topic_offsets.get(topic)
if last_id:
offsets.append(last_id)
else:
# If there is no topic offset this is the first call.
# Get the last offset ID so we'll start getting everything from now on
offsets.append(self.get_last_offset(topic))
topic_offsets[topic] = offsets[-1]
return offsets

if not openc3_redis_cluster:

def read_topics(self, topics, offsets=None, timeout_ms=1000, count=None):
if len(topics) == 0:
return {}
thread_id = threading.get_native_id()
if thread_id not in self.topic_offsets:
self.topic_offsets[thread_id] = {}
topic_offsets = self.topic_offsets[thread_id]
try:
with self.redis_pool.get() as redis:
if not offsets:
offsets = self.update_topic_offsets(topics)
streams = {}
index = 0
for topic in topics:
streams[topic] = offsets[index]
index += 1
result = redis.xread(streams, block=timeout_ms, count=count)
if result and len(result) > 0:
for topic, messages in result:
for msg_id, msg_hash in messages:
if isinstance(topic, bytes):
topic = topic.decode()
if isinstance(msg_id, bytes):
msg_id = msg_id.decode()
topic_offsets[topic] = msg_id
yield topic, msg_id, msg_hash, redis
return result
except TimeoutError:
# Should return an empty hash not array - xread returns a hash
return {}

# Add new entry to the redis stream.
# > https://www.rubydoc.info/github/redis/redis-rb/Redis:xadd
#
# @example Without options
# store.write_topic('MANGO__TOPIC', {'message' => 'something'})
# @example With options
# store.write_topic('MANGO__TOPIC', {'message' => 'something'}, id: '0-0', maxlen: 1000, approximate: 'true')
#
# @param topic [String] the stream / topic
# @param msg_hash [Hash] one or multiple field-value pairs
#
# @option opts [String] :id the entry id, default value is `*`, it means auto generation,
# if `nil` id is passed it will be changed to `*`
# @option opts [Integer] :maxlen max length of entries, default value is `nil`, it means will grow forever
# @option opts [String] :approximate whether to add `~` modifier of maxlen or not, default value is 'true'
#
# @return [String] the entry id
def write_topic(self, topic, msg_hash, id="*", maxlen=None, approximate=True):
if not id:
id = "*"
with self.redis_pool.get() as redis:
return redis.xadd(topic, msg_hash, id=id, maxlen=maxlen, approximate=approximate)

# Trims older entries of the redis stream if needed.
# > https://www.rubydoc.info/github/redis/redis-rb/Redis:xtrim
#
# @example Without options
# store.trim_topic('MANGO__TOPIC', 1000)
# @example With options
# store.trim_topic('MANGO__TOPIC', 1000, approximate: 'true', limit: 0)
#
# @param topic [String] the stream key
# @param minid [Integer] Id to throw away data up to
# @param approximate [Boolean] whether to add `~` modifier of maxlen or not
# @param limit [Boolean] number of items to return from the call
#
# @return [Integer] the number of entries actually deleted
def trim_topic(self, topic, minid, approximate=True, limit=0):
with self.redis_pool.get() as redis:
return redis.xtrim(name=topic, minid=minid, approximate=approximate, limit=limit)


class EphemeralStore(Store):
# Variable that holds the singleton instance
my_instance = None

def __init__(self, pool_size=10):
super().__init__(pool_size)
self.redis_host = OPENC3_REDIS_EPHEMERAL_HOSTNAME
self.redis_port = OPENC3_REDIS_EPHEMERAL_PORT
self.redis_pool = StoreConnectionPool(self.build_redis, pool_size)
if openc3_redis_cluster:
import openc3enterprise.utilities.store # noqa: F401
Loading
Loading