Merge pull request #1523 from OpenC3/python_redis_cluster_changes
Python Redis Cluster Updates (Enterprise)
ryanmelt authored Sep 18, 2024
2 parents b6e77d0 + 6d40d23 commit 96d62b8
Showing 7 changed files with 319 additions and 295 deletions.
2 changes: 1 addition & 1 deletion openc3-redis/Dockerfile
@@ -22,7 +22,7 @@ RUN if [[ $OPENC3_DEPENDENCY_REGISTRY == 'docker.io' ]]; then \
# Modify the redis user and group to be 1001
# The default alpine redis container uses 999
# See https://github.com/docker-library/redis/blob/master/7.2/alpine/Dockerfile
apk add shadow; \
apk add shadow bash; \
usermod -u 1001 redis; \
groupmod -g 1001 redis; \
# Remove gosu to eliminate a ton of CVEs
2 changes: 1 addition & 1 deletion openc3/python/openc3/utilities/bucket.py
@@ -36,7 +36,7 @@ def getClient(cls):
            my_module = importlib.import_module("." + OPENC3_CLOUD.lower() + "_bucket", "openc3.utilities")
        # If the file doesn't exist try the Enterprise module
        except ModuleNotFoundError:
            my_module = importlib.import_module("." + OPENC3_CLOUD.lower() + "_bucket", "openc3-enterprise.utilities")
            my_module = importlib.import_module("." + OPENC3_CLOUD.lower() + "_bucket", "openc3enterprise.utilities")
        return getattr(my_module, bucket_class)()

    def create(self, bucket):
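The bucket.py change swaps the Enterprise package name from "openc3-enterprise" to "openc3enterprise": the package argument to importlib.import_module must be an importable, hyphen-free name, so the old spelling could not resolve the Enterprise module. Below is a minimal sketch of the same core-then-Enterprise fallback pattern; the helper name and the construction of bucket_class are illustrative assumptions, not taken from this diff:

import importlib

def load_bucket_client(cloud: str):
    # Hypothetical helper: resolve e.g. "aws" to an AwsBucket instance, trying the
    # open source package first and the Enterprise package second.
    bucket_class = cloud.capitalize() + "Bucket"  # assumed naming convention, e.g. "AwsBucket"
    try:
        # Core location, e.g. openc3.utilities.aws_bucket
        my_module = importlib.import_module("." + cloud.lower() + "_bucket", "openc3.utilities")
    except ModuleNotFoundError:
        # Enterprise location; must be a valid package name ("openc3enterprise", not "openc3-enterprise")
        my_module = importlib.import_module("." + cloud.lower() + "_bucket", "openc3enterprise.utilities")
    return getattr(my_module, bucket_class)()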
241 changes: 3 additions & 238 deletions openc3/python/openc3/utilities/store.py
@@ -14,249 +14,14 @@
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

import redis
from redis.exceptions import TimeoutError
from openc3.utilities.connection_pool import ConnectionPool
from contextlib import contextmanager
import threading
from openc3.environment import *

if OPENC3_REDIS_CLUSTER:
    openc3_redis_cluster = True
else:
    openc3_redis_cluster = False

from openc3.utilities.store_implementation import Store, StoreConnectionPool, StoreMeta, EphemeralStore # noqa: F401

class StoreConnectionPool(ConnectionPool):
    @contextmanager
    def pipelined(self):
        if openc3_redis_cluster:
            yield  # TODO: Update keys to support pipelining in cluster
        else:
            with self.get() as redis:
                pipeline = redis.pipeline(transaction=False)
                thread_id = threading.get_native_id()
                self.pipelines[thread_id] = pipeline
                try:
                    yield
                finally:
                    pipeline.execute()
                    self.pipelines[thread_id] = None

    @contextmanager
    def get(self):
        thread_id = threading.get_native_id()
        if thread_id not in self.pipelines:
            self.pipelines[thread_id] = None
        pipeline = self.pipelines[thread_id]
        if pipeline:
            yield pipeline
        else:
            item = None
            with self.lock:
                if not self.pool.empty():
                    item = self.pool.get(False)
                elif self.count < self.pool_size:
                    item = self.ctor()
                    self.count += 1
                else:
                    item = self.pool.get()
            try:
                yield item
            finally:
                self.pool.put(item)


class StoreMeta(type):
    def __getattribute__(cls, func):
        if func == "instance" or func == "instance_mutex" or func == "my_instance":
            return super().__getattribute__(func)

        def method(*args, **kw_args):
            return getattr(cls.instance(), func)(*args, **kw_args)

        return method


class Store(metaclass=StoreMeta):
    # Variable that holds the singleton instance
    my_instance = None

    # Mutex used to ensure that only one instance is created
    instance_mutex = threading.Lock()

    # Get the singleton instance
    @classmethod
    def instance(cls, pool_size=100):
        if cls.my_instance:
            return cls.my_instance

        with cls.instance_mutex:
            cls.my_instance = cls(pool_size)
            return cls.my_instance

    # Delegate all unknown methods to redis through the @redis_pool
    def __getattr__(self, func):
        with self.redis_pool.get() as redis:

            def method(*args, **kwargs):
                return getattr(redis, func)(*args, **kwargs)

            return method

    def __init__(self, pool_size=10):
        self.redis_host = OPENC3_REDIS_HOSTNAME
        self.redis_port = OPENC3_REDIS_PORT
        self.redis_pool = StoreConnectionPool(self.build_redis, pool_size)
        self.topic_offsets = {}
        self.pipelines = {}

    if not openc3_redis_cluster:

        def build_redis(self):
            # NOTE: We can't use decode_response because it tries to decode the binary
            # packet buffer which does not work. Thus strings come back as bytes like
            # b"target_name" and we decode them using b"target_name".decode()
            return redis.Redis(
                host=self.redis_host,
                port=self.redis_port,
                username=OPENC3_REDIS_USERNAME,
                password=OPENC3_REDIS_PASSWORD,
            )

    ###########################################################################
    # Stream APIs
    ###########################################################################

    def get_oldest_message(self, topic):
        with self.redis_pool.get() as redis:
            result = redis.xrange(topic, count=1)
            if result and len(result) > 0:
                return result[0]
            else:
                return None

    def get_newest_message(self, topic):
        with self.redis_pool.get() as redis:
            # Default in xrevrange is range end '+', start '-' which means get all
            # elements from higher ID to lower ID and since we're limiting to 1
            # we get the last element. See https://redis.io/commands/xrevrange.
            result = redis.xrevrange(topic, count=1)
            if result and len(result) > 0:
                first = list(result[0])
                first[0] = first[0].decode()
                return first
            else:
                return (None, None)

    def get_last_offset(self, topic):
        with self.redis_pool.get() as redis:
            result = redis.xrevrange(topic, count=1)
            if result and result[0] and result[0][0]:
                return result[0][0].decode()
            else:
                return "0-0"

    def update_topic_offsets(self, topics):
        offsets = []
        for topic in topics:
            # Normally we will just be grabbing the topic offset
            # this allows xread to get everything past this point
            thread_id = threading.get_native_id()
            if thread_id not in self.topic_offsets:
                self.topic_offsets[thread_id] = {}
            topic_offsets = self.topic_offsets[thread_id]
            last_id = topic_offsets.get(topic)
            if last_id:
                offsets.append(last_id)
            else:
                # If there is no topic offset this is the first call.
                # Get the last offset ID so we'll start getting everything from now on
                offsets.append(self.get_last_offset(topic))
                topic_offsets[topic] = offsets[-1]
        return offsets

    if not openc3_redis_cluster:

        def read_topics(self, topics, offsets=None, timeout_ms=1000, count=None):
            if len(topics) == 0:
                return {}
            thread_id = threading.get_native_id()
            if thread_id not in self.topic_offsets:
                self.topic_offsets[thread_id] = {}
            topic_offsets = self.topic_offsets[thread_id]
            try:
                with self.redis_pool.get() as redis:
                    if not offsets:
                        offsets = self.update_topic_offsets(topics)
                    streams = {}
                    index = 0
                    for topic in topics:
                        streams[topic] = offsets[index]
                        index += 1
                    result = redis.xread(streams, block=timeout_ms, count=count)
                    if result and len(result) > 0:
                        for topic, messages in result:
                            for msg_id, msg_hash in messages:
                                if isinstance(topic, bytes):
                                    topic = topic.decode()
                                if isinstance(msg_id, bytes):
                                    msg_id = msg_id.decode()
                                topic_offsets[topic] = msg_id
                                yield topic, msg_id, msg_hash, redis
                    return result
            except TimeoutError:
                # Should return an empty hash not array - xread returns a hash
                return {}

    # Add new entry to the redis stream.
    # > https://www.rubydoc.info/github/redis/redis-rb/Redis:xadd
    #
    # @example Without options
    #   store.write_topic('MANGO__TOPIC', {'message' => 'something'})
    # @example With options
    #   store.write_topic('MANGO__TOPIC', {'message' => 'something'}, id: '0-0', maxlen: 1000, approximate: 'true')
    #
    # @param topic [String] the stream / topic
    # @param msg_hash [Hash] one or multiple field-value pairs
    #
    # @option opts [String] :id the entry id, default value is `*`, it means auto generation,
    #   if `nil` id is passed it will be changed to `*`
    # @option opts [Integer] :maxlen max length of entries, default value is `nil`, it means will grow forever
    # @option opts [String] :approximate whether to add `~` modifier of maxlen or not, default value is 'true'
    #
    # @return [String] the entry id
    def write_topic(self, topic, msg_hash, id="*", maxlen=None, approximate=True):
        if not id:
            id = "*"
        with self.redis_pool.get() as redis:
            return redis.xadd(topic, msg_hash, id=id, maxlen=maxlen, approximate=approximate)

    # Trims older entries of the redis stream if needed.
    # > https://www.rubydoc.info/github/redis/redis-rb/Redis:xtrim
    #
    # @example Without options
    #   store.trim_topic('MANGO__TOPIC', 1000)
    # @example With options
    #   store.trim_topic('MANGO__TOPIC', 1000, approximate: 'true', limit: 0)
    #
    # @param topic [String] the stream key
    # @param minid [Integer] Id to throw away data up to
    # @param approximate [Boolean] whether to add `~` modifier of maxlen or not
    # @param limit [Boolean] number of items to return from the call
    #
    # @return [Integer] the number of entries actually deleted
    def trim_topic(self, topic, minid, approximate=True, limit=0):
        with self.redis_pool.get() as redis:
            return redis.xtrim(name=topic, minid=minid, approximate=approximate, limit=limit)


class EphemeralStore(Store):
    # Variable that holds the singleton instance
    my_instance = None

    def __init__(self, pool_size=10):
        super().__init__(pool_size)
        self.redis_host = OPENC3_REDIS_EPHEMERAL_HOSTNAME
        self.redis_port = OPENC3_REDIS_EPHEMERAL_PORT
        self.redis_pool = StoreConnectionPool(self.build_redis, pool_size)
if openc3_redis_cluster:
    import openc3enterprise.utilities.store  # noqa: F401
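With this commit store.py becomes a thin shim: the open source classes now live in store_implementation.py and are re-exported here, and when OPENC3_REDIS_CLUSTER is set the Enterprise openc3enterprise.utilities.store module is imported, presumably substituting cluster-aware implementations. Callers are unaffected because they use the class-level API, which StoreMeta forwards to the singleton instance and, for unknown methods, on to the pooled redis client. A short usage sketch, assuming a reachable Redis; the topic and key names are made up for illustration:

from openc3.utilities.store import Store, EphemeralStore

# Class-level calls are routed by StoreMeta to the singleton instance
entry_id = Store.write_topic("EXAMPLE__TOPIC", {"message": "something"})
newest = Store.get_newest_message("EXAMPLE__TOPIC")

# Methods not defined on Store fall through __getattr__ to the redis client
Store.hset("example_hash", "key", "value")

# EphemeralStore exposes the same API bound to the ephemeral Redis instance
EphemeralStore.write_topic("EXAMPLE__EPHEMERAL", {"message": "volatile"})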
