Skip to content

Commit

Permalink
refactor(chalice): refactored db-drivers
Browse files Browse the repository at this point in the history
refactor(scripts): defined ch-dataPort
  • Loading branch information
tahayk committed Dec 9, 2024
1 parent 79ed719 commit c6238da
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 34 deletions.
54 changes: 21 additions & 33 deletions api/chalicelib/utils/ch_client_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,18 @@

import clickhouse_connect
from clickhouse_connect.driver.query import QueryContext
from clickhouse_connect.driver.exceptions import DatabaseError
from decouple import config

logger = logging.getLogger(__name__)

_CH_CONFIG = {"host": config("ch_host"),
"user": config("ch_user", default="default"),
"password": config("ch_password", default=""),
"port": config("ch_port_http", cast=int),
"client_name": config("APP_NAME", default="PY")}
CH_CONFIG = dict(_CH_CONFIG)

settings = {}
if config('ch_timeout', cast=int, default=-1) > 0:
logging.info(f"CH-max_execution_time set to {config('ch_timeout')}s")
Expand All @@ -18,14 +27,6 @@
logging.info(f"CH-receive_timeout set to {config('ch_receive_timeout')}s")
settings = {**settings, "receive_timeout": config('ch_receive_timeout', cast=int)}

logger.info("-- CH config --")
logger.info(f'host={config("ch_host")}')
logger.info(f'database={config("ch_database", default="default")}')
logger.info(f'user={config("ch_user", default="default")}')
logger.info(f'password={config("ch_password", default="")}')
logger.info(f'port={config("ch_port_http", cast=int)}')
logger.info(f'settings={settings}')

extra_args = {}
if config("CH_COMPRESSION", cast=bool, default=True):
extra_args["compression"] = "lz4"
Expand All @@ -47,21 +48,17 @@ def wrapper(*args, **kwargs):


class ClickHouseConnectionPool:
def __init__(self, min_size, max_size, settings):
def __init__(self, min_size, max_size):
self.min_size = min_size
self.max_size = max_size
self.pool = Queue()
self.lock = threading.Lock()
self.total_connections = 0
self.settings = settings

# Initialize the pool with min_size connections
for _ in range(self.min_size):
client = clickhouse_connect.get_client(host=config("ch_host"),
client = clickhouse_connect.get_client(**CH_CONFIG,
database=config("ch_database", default="default"),
user=config("ch_user", default="default"),
password=config("ch_password", default=""),
port=config("ch_port_http", cast=int),
settings=settings,
**extra_args)
self.pool.put(client)
Expand All @@ -75,15 +72,10 @@ def get_connection(self):
except Empty:
with self.lock:
if self.total_connections < self.max_size:
client = clickhouse_connect.get_client(
host=config("ch_host"),
database=config("ch_database", default="default"),
user=config("ch_user", default="default"),
password=config("ch_password", default=""),
port=config("ch_port_http", cast=int),
settings=settings,
**extra_args
)
client = clickhouse_connect.get_client(**CH_CONFIG,
database=config("ch_database", default="default"),
settings=settings,
**extra_args)
self.total_connections += 1
return client
# If max_size reached, wait until a connection is available
Expand Down Expand Up @@ -119,12 +111,11 @@ def make_pool():
except Exception as error:
logger.error("Error while closing all connexions to CH", error)
try:
CH_pool = ClickHouseConnectionPool(min_size=config("PG_MINCONN", cast=int, default=4),
max_size=config("PG_MAXCONN", cast=int, default=8),
settings=settings)
CH_pool = ClickHouseConnectionPool(min_size=config("CH_MINCONN", cast=int, default=4),
max_size=config("CH_MAXCONN", cast=int, default=8))
if CH_pool is not None:
logger.info("Connection pool created successfully for CH")
except Exception as error:
except ConnectionError as error:
logger.error("Error while connecting to CH", error)
if RETRY < RETRY_MAX:
RETRY += 1
Expand All @@ -140,15 +131,12 @@ class ClickHouseClient:

def __init__(self, database=None):
if self.__client is None:
if config('CH_POOL', cast=bool, default=True):
if database is None and config('CH_POOL', cast=bool, default=True):
self.__client = CH_pool.get_connection()
else:
self.__client = clickhouse_connect.get_client(host=config("ch_host"),
self.__client = clickhouse_connect.get_client(**CH_CONFIG,
database=database if database else config("ch_database",
default="default"),
user=config("ch_user", default="default"),
password=config("ch_password", default=""),
port=config("ch_port_http", cast=int),
settings=settings,
**extra_args)
self.__client.execute = transform_result(self.__client.query)
Expand All @@ -173,7 +161,7 @@ def __exit__(self, *args):


async def init():
logger.info(f">CH_POOL:{config('CH_POOL', default=None)}")
logger.info(f">use CH_POOL:{config('CH_POOL', default=True)}")
if config('CH_POOL', cast=bool, default=True):
make_pool()

Expand Down
2 changes: 1 addition & 1 deletion api/chalicelib/utils/pg_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def recreate_cursor(self, rollback=False):


async def init():
logger.info(f">PG_POOL:{config('PG_POOL', default=None)}")
logger.info(f">use PG_POOL:{config('PG_POOL', default=True)}")
if config('PG_POOL', cast=bool, default=True):
make_pool()

Expand Down
1 change: 1 addition & 0 deletions scripts/helmcharts/vars.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ clickhouse: &clickhouse
password: ""
service:
webPort: 9000
dataPort: 8123

# For enterpriseEdition
quickwit: &quickwit
Expand Down

0 comments on commit c6238da

Please sign in to comment.