Skip to content

Commit

Permalink
feat: brownie modules (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmarek-kindred authored Jun 6, 2024
1 parent 559846b commit f3c851e
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 43 deletions.
1 change: 1 addition & 0 deletions brownie/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ KAFKA_CLIENT_ID="$K8S_NAMESPACE:$SERVICE_NAME"
KAFKA_USERNAME=admin
KAFKA_PASSWORD=admin

ENABLED_MODULES=postgres,kafka
BROWNIE_NODE_OPTIONS=--max-heap-size=256
# Pattern with group wich extracts timestamp made of 14 digits: yyyyMMddHHmmss prefixed with "ts"
TIMESTAMP_PATTRN="^.*pit.*_(ts\d{14,14}).*"
Expand Down
1 change: 1 addition & 0 deletions brownie/deployment/helm/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ kind: ConfigMap
metadata:
name: {{ include "application.name" . }}
data:
ENABLED_MODULES: {{ .Values.ENABLED_MODULES | quote }}
SERVICE_NAME: {{ .Values.SERVICE_NAME | quote }}
BROWNIE_NODE_OPTIONS: {{ .Values.BROWNIE_NODE_OPTIONS | quote }}
DRY_RUN: {{ .Values.DRY_RUN | quote }}
Expand Down
1 change: 1 addition & 0 deletions brownie/scripts/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ HELM_OVERWRITES="\
--set KAFKA_PORT=${KAFKA_PORT} \
--set KAFKA_USERNAME=${KAFKA_USERNAME} \
--set KAFKA_PASSWORD=${KAFKA_PASSWORD} \
--set ENABLED_MODULES=${ENABLED_MODULES } \
--set BROWNIE_DEPLOY_DEV_SECRET_STORE=${BROWNIE_DEPLOY_DEV_SECRET_STORE} \
--set BROWNIE_NODE_OPTIONS=${BROWNIE_NODE_OPTIONS} \
--set pod.repository=${REGISTRY_URL}/${SERVICE_NAME} \
Expand Down
42 changes: 25 additions & 17 deletions brownie/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,34 @@ export const readParams = (): Config => {

logger.info("Application started with arguments: \n%s", JSON.stringify(Object.fromEntries(params), null, 2))

const db: PgConfig = new PgConfig(
fnReadValue(params, PgConfig.PARAM_PGHOST),
fnReadValue(params, PgConfig.PARAM_PGPORT),
fnReadValue(params, PgConfig.PARAM_PGDATABASE),
fnReadValue(params, PgConfig.PARAM_PGUSER),
fnReadValue(params, PgConfig.PARAM_PGPASSWORD)
)
const enabledModules = fnReadValue(params, Config.PARAM_ENABLED_MODULES)
let pgConfig: PgConfig | null = null
if (Config.isModuleEnabled(enabledModules, PgConfig.MODULE_NAME)) {
pgConfig = new PgConfig(
fnReadValue(params, PgConfig.PARAM_PGHOST),
fnReadValue(params, PgConfig.PARAM_PGPORT),
fnReadValue(params, PgConfig.PARAM_PGDATABASE),
fnReadValue(params, PgConfig.PARAM_PGUSER),
fnReadValue(params, PgConfig.PARAM_PGPASSWORD)
)
}

const kafka: KafkaConfig = new KafkaConfig(
fnReadValue(params, KafkaConfig.PARAM_BROKERS),
fnReadValue(params, KafkaConfig.PARAM_CLIENT_ID),
fnReadValue(params, KafkaConfig.PARAM_USERNAME, false),
fnReadValue(params, KafkaConfig.PARAM_PASSWORD, false),
fnReadValue(params, KafkaConfig.PARAM_PORT, false),
fnReadValue(params, KafkaConfig.PARAM_SASL_MECHANISM, false)
)
let kafkaConfig: KafkaConfig | null = null
if (Config.isModuleEnabled(enabledModules, KafkaConfig.MODULE_NAME)) {
kafkaConfig = new KafkaConfig(
fnReadValue(params, KafkaConfig.PARAM_BROKERS),
fnReadValue(params, KafkaConfig.PARAM_CLIENT_ID),
fnReadValue(params, KafkaConfig.PARAM_USERNAME, false),
fnReadValue(params, KafkaConfig.PARAM_PASSWORD, false),
fnReadValue(params, KafkaConfig.PARAM_PORT, false),
fnReadValue(params, KafkaConfig.PARAM_SASL_MECHANISM, false)
)
}

return new Config(
db,
kafka,
enabledModules,
pgConfig,
kafkaConfig,
new RegExp(fnReadValue(params, Config.PARAM_TIMESTAMP_PATTERN, true, Config.DEFAULT_TIMESTAMP_PATTERN)),
Config.parseRetention(fnReadValue(params, Config.PARAM_RETENTION_PERIOD, true, Config.DEFAULT_RETENTION_PERIOD)),
fnReadValue(params, Config.PARAM_DRY_RUN, false, false) === "true"
Expand Down
14 changes: 12 additions & 2 deletions brownie/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { logger } from "./logger.js"

export class PgConfig {
static MODULE_NAME: string = "postgresql"

static PARAM_PGHOST: string = "--pghost"
static PARAM_PGPORT: string = "--pgport"
static PARAM_PGDATABASE: string = "--pgdatabase"
Expand All @@ -22,6 +24,8 @@ export class PgConfig {
}

export class KafkaConfig {
static MODULE_NAME: string = "kafka"

static PARAM_BROKERS: string = "--kafka-brokers"
static PARAM_PORT: string = "--kafka-port"
static PARAM_CLIENT_ID: string = "--kafka-client-id"
Expand Down Expand Up @@ -58,15 +62,17 @@ export class KafkaConfig {
}

export class Config {
static PARAM_ENABLED_MODULES: string = "--enabled-modules"
static PARAM_DRY_RUN: string = "--dry-run"
static PARAM_RETENTION_PERIOD: string = "--retention-period"
static PARAM_TIMESTAMP_PATTERN: string = "--timestamp-pattern"
static DEFAULT_TIMESTAMP_PATTERN: RegExp = /^.*pit.*(ts[0-9]{14,14}).*$/
static DEFAULT_RETENTION_PERIOD: string = "3days"

constructor(
readonly pg: PgConfig,
readonly kafka: KafkaConfig,
readonly enabledModules: string,
readonly pg: PgConfig | null,
readonly kafka: KafkaConfig | null,
readonly timestampPattern: RegExp,
readonly retentionMinutes: number,
readonly dryRun: boolean
Expand All @@ -84,4 +90,8 @@ export class Config {

throw new Error(`Invalid format for retention. Expected "<digit><unit>", got: ${ value }`)
}

static isModuleEnabled = (modules: string, module: string): boolean => {
return (modules.indexOf(module) !== -1)
}
}
46 changes: 25 additions & 21 deletions brownie/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,18 @@ const main = async () => {

logger.info("main(), Parsed configuration: \n%s", JSON.stringify({ ...cleanedConfig, timestampPattern: config.timestampPattern.toString() }, null, 2))

logger.info("Cleaning old databases...")
const cleanedDbsCount = await cleanOldDatabases(config)
if (cleanedDbsCount > 0) {
logger.info("Dropped %s database%s", cleanedDbsCount, cleanedDbsCount > 1 ? "s" : "")
} else {
logger.info("There are no databases to clean")
if (cfg.Config.isModuleEnabled(config.enabledModules, cfg.PgConfig.MODULE_NAME)) {
await cleanPostgresDatabases(config)
}
logger.info("\n\n")

logger.info("Cleaning kafka topics...")
const cleanedTopicsCount = await cleanTopics(config)
if (cleanedTopicsCount > 0) {
logger.info("Deleted %s topics%s", cleanedTopicsCount, cleanedTopicsCount > 1 ? "s" : "")
} else {
logger.info("There are no topics to delete")
if (cfg.Config.isModuleEnabled(config.enabledModules, cfg.KafkaConfig.MODULE_NAME)) {
await cleanTopics(config)
}
}

const cleanOldDatabases = async (config: cfg.Config): Promise<number> => {
const cleanPostgresDatabases = async (config: cfg.Config) => {
logger.info("Cleaning old databases...")

let cleanedCount = 0
const pgClient = new pg.Client({
user: config.pg.username,
Expand Down Expand Up @@ -68,25 +61,25 @@ const cleanOldDatabases = async (config: cfg.Config): Promise<number> => {
}

try {
logger.info("cleanOldDatabases(): Deleting the expired database: %s", dbname)
logger.info("cleanPostgresDatabases(): Deleting the expired database: %s", dbname)

if (config.dryRun) {
logger.info("cleanOldDatabases(): Database has NOT been dropped (dry run mode): %s", dbname)
logger.info("cleanPostgresDatabases(): Database has NOT been dropped (dry run mode): %s", dbname)
} else {
await pgClient.query({
name: `drop-db-${ dbname }`,
text: `DROP DATABASE "${ dbname }"`
})

logger.info("cleanOldDatabases(): Database has been dropped: %s", dbname)
logger.info("cleanPostgresDatabases(): Database has been dropped: %s", dbname)
}
cleanedCount++

const sleep = new Promise(resolve => setTimeout(resolve, 2_000))
await sleep

} catch (e) {
logger.error("cleanOldDatabases(): Unable to drop database '%s'. Error: %s", dbname, e.message)
logger.error("cleanPostgresDatabases(): Unable to drop database '%s'. Error: %s", dbname, e.message)
if (e.cause) logger.error(e.cause)
if (e.stack) logger.error("Stack:\n%s", e.stack)
}
Expand All @@ -95,10 +88,17 @@ const cleanOldDatabases = async (config: cfg.Config): Promise<number> => {
pgClient?.end()
}

return cleanedCount
if (cleanedCount > 0) {
logger.info("Dropped %s database%s", cleanedCount, cleanedCount > 1 ? "s" : "")
} else {
logger.info("There are no databases to clean")
}
logger.info("\n\n")
}

const cleanTopics = async (config: cfg.Config): Promise<number> => {
const cleanTopics = async (config: cfg.Config) => {
logger.info("Cleaning kafka topics...")

let cleanedCount = 0

const kafkaConfig: KafkaJsConfig = {
Expand Down Expand Up @@ -158,7 +158,11 @@ const cleanTopics = async (config: cfg.Config): Promise<number> => {
await kafkaAdmin.disconnect()
}

return cleanedCount
if (cleanedCount > 0) {
logger.info("Deleted %s topics%s", cleanedCount, cleanedCount > 1 ? "s" : "")
} else {
logger.info("There are no topics to delete")
}
}

main()
Expand Down
32 changes: 29 additions & 3 deletions brownie/test/bootstrap.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { PgConfig, KafkaConfig, Config } from "../src/config.js"
import { fnReadValue, readParams } from "../src/bootstrap.js"
import { logger } from "../src/logger.js"

const modulesParams = [ Config.PARAM_ENABLED_MODULES, `${KafkaConfig.MODULE_NAME}, ${PgConfig.MODULE_NAME}` ]
const requiredParams = [
PgConfig.PARAM_PGHOST, "127.0.0.1",
PgConfig.PARAM_PGPORT, 1000,
Expand Down Expand Up @@ -88,7 +89,7 @@ describe("bootstrap with correct configs", () => {

it("readParams() should return populated config", () => {
const allParams: Array<string | number> = ["skip-first", ""]
mockParams(sandbox, process, 'argv', allParams.concat(requiredParams, optionalParams))
mockParams(sandbox, process, 'argv', allParams.concat(modulesParams, requiredParams, optionalParams))
sandbox.stub(process, 'env').value({})

const config = readParams()
Expand All @@ -99,7 +100,7 @@ describe("bootstrap with correct configs", () => {
sandbox.stub(process, 'argv').value([ PgConfig.PARAM_PGDATABASE, "" ])

const allParams: Array<string | number> = []
mockParams(sandbox, process, 'env', allParams.concat(requiredParams, optionalParams))
mockParams(sandbox, process, 'env', allParams.concat(modulesParams, requiredParams, optionalParams))

const config = readParams()
evaluatePopulated(config)
Expand All @@ -108,7 +109,7 @@ describe("bootstrap with correct configs", () => {
it("readParams() should use default values for optional params", () => {
sandbox.stub(process, 'env').value({})
const params: Array<string | number> = ["skip-first", ""]
mockParams(sandbox, process, 'argv', params.concat(requiredParams))
mockParams(sandbox, process, 'argv', params.concat(modulesParams, requiredParams))

const config = readParams()
evaluatePopulated(
Expand All @@ -121,6 +122,31 @@ describe("bootstrap with correct configs", () => {
)
})

it("readParams() should not load configs for disabled kafka", () => {
sandbox.stub(process, 'env').value({})
const params: Array<string | number> = ["skip-first", ""]
mockParams(sandbox, process, 'argv', params.concat([ Config.PARAM_ENABLED_MODULES, PgConfig.MODULE_NAME ], requiredParams))

const config = readParams()
console.log(config)
chai.expect(config.enabledModules).eq(PgConfig.MODULE_NAME)
chai.expect(config.pg).be.not.null
chai.expect(config.kafka).be.null
})

it("readParams() should not load configs for disabled postgres", () => {
sandbox.stub(process, 'env').value({})
const params: Array<string | number> = ["skip-first", ""]
mockParams(sandbox, process, 'argv', params.concat([ Config.PARAM_ENABLED_MODULES, KafkaConfig.MODULE_NAME ], requiredParams))

const config = readParams()
console.log(config)
chai.expect(config.enabledModules).eq(KafkaConfig.MODULE_NAME)
chai.expect(config.kafka).be.not.null
chai.expect(config.pg).be.null

})

afterEach(() => {
sandbox.restore()
})
Expand Down
6 changes: 6 additions & 0 deletions brownie/test/config.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ describe("Tests for Config", () => {
chai.expect(() => Config.parseRetention("2")).to.throw("Invalid format for retention. Expected \"<digit><unit>\", got: 2")
})

it("isModuleEnabled", () => {
chai.expect(Config.isModuleEnabled("abc, xyz", "abc")).be.true
chai.expect(Config.isModuleEnabled("abc, xyz", "xyz")).be.true
chai.expect(Config.isModuleEnabled("abc, xyz", "klm")).be.false
})

})

0 comments on commit f3c851e

Please sign in to comment.