From 0f999d488c743d8f205b5374155bf9a91155dc5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Skr=C3=B8vseth?= Date: Mon, 22 Jul 2024 11:15:48 +0200 Subject: [PATCH] Fix Redis --- server/src/auth/cache/cache-gauge.ts | 12 ++- server/src/auth/cache/cache.ts | 52 +++++++--- server/src/auth/cache/interface.ts | 9 -- server/src/auth/cache/memory-cache.ts | 58 ++++++----- server/src/auth/cache/redis-cache.ts | 142 +++++++++++++++++++++++--- server/src/auth/cache/types.ts | 5 + server/src/auth/get-auth-client.ts | 3 + server/src/config/config.ts | 8 -- server/src/init.ts | 2 - server/src/plugins/health.ts | 34 ++++-- 10 files changed, 241 insertions(+), 84 deletions(-) delete mode 100644 server/src/auth/cache/interface.ts create mode 100644 server/src/auth/cache/types.ts diff --git a/server/src/auth/cache/cache-gauge.ts b/server/src/auth/cache/cache-gauge.ts index 1674e5246..10310536e 100644 --- a/server/src/auth/cache/cache-gauge.ts +++ b/server/src/auth/cache/cache-gauge.ts @@ -3,21 +3,27 @@ import { Counter, Gauge, Histogram } from 'prom-client'; const labelNames = ['hit'] as const; -export const cacheRedisGauge = new Counter({ +export const redisCacheGauge = new Counter({ name: 'obo_redis_cache', help: 'Number of requests to the Redis OBO cache. "hit" is the type of hit: "miss", "invalid", "hit" or "expired".', labelNames, registers: [proxyRegister], }); -export const cacheGauge = new Counter({ +export const redisCacheSizeGauge = new Gauge({ + name: 'obo_redis_cache_size', + help: 'Number of OBO tokens in the Redis cache.', + registers: [proxyRegister], +}); + +export const memoryCacheGauge = new Counter({ name: 'obo_cache', help: 'Number of requests to the OBO cache. "hit" is the type of hit: "miss", "redis", "hit", or "expired".', labelNames, registers: [proxyRegister], }); -export const cacheSizeGauge = new Gauge({ +export const memoryCacheSizeGauge = new Gauge({ name: 'obo_cache_size', help: 'Number of OBO tokens in the cache.', registers: [proxyRegister], diff --git a/server/src/auth/cache/cache.ts b/server/src/auth/cache/cache.ts index 1a108c977..d8e77d574 100644 --- a/server/src/auth/cache/cache.ts +++ b/server/src/auth/cache/cache.ts @@ -1,5 +1,4 @@ -import { OboCacheInterface } from '@app/auth/cache/interface'; -import { oboMemoryCache } from '@app/auth/cache/memory-cache'; +import { OboMemoryCache } from '@app/auth/cache/memory-cache'; import { OboRedisCache } from '@app/auth/cache/redis-cache'; import { optionalEnvString } from '@app/config/env-var'; @@ -7,24 +6,40 @@ const REDIS_URI = optionalEnvString('REDIS_URI_OBO_CACHE'); const REDIS_USERNAME = optionalEnvString('REDIS_USERNAME_OBO_CACHE'); const REDIS_PASSWORD = optionalEnvString('REDIS_PASSWORD_OBO_CACHE'); -class OboTieredCache implements OboCacheInterface { - private oboRedisCache: OboRedisCache; +class OboTieredCache { + #oboRedisCache: OboRedisCache; + #oboMemoryCache: OboMemoryCache | null = null; + #isReady = false; constructor(redisUri: string, redisUsername: string, redisPassword: string) { - this.oboRedisCache = new OboRedisCache(redisUri, redisUsername, redisPassword); + this.#oboRedisCache = new OboRedisCache(redisUri, redisUsername, redisPassword); + this.#init(); + } + + async #init() { + await this.#oboRedisCache.init(); + const allTokenMessages = await this.#oboRedisCache.getAll(); + const oboMemoryCache = new OboMemoryCache(allTokenMessages); + this.#oboMemoryCache = oboMemoryCache; + this.#oboRedisCache.addTokenListener(({ key, token, expiresAt }) => oboMemoryCache.set(key, token, expiresAt)); + this.#isReady = true; } public async get(key: string): Promise { - const memoryHit = await oboMemoryCache.get(key); + if (this.#oboMemoryCache === null) { + return null; + } + + const memoryHit = this.#oboMemoryCache.get(key); if (memoryHit !== null) { return memoryHit.token; } - const redisHit = await this.oboRedisCache.get(key); + const redisHit = await this.#oboRedisCache.get(key); if (redisHit !== null) { - oboMemoryCache.set(key, redisHit.token, redisHit.expiresAt); + this.#oboMemoryCache.set(key, redisHit.token, redisHit.expiresAt); return redisHit.token; } @@ -33,19 +48,30 @@ class OboTieredCache implements OboCacheInterface { } public async set(key: string, token: string, expiresAt: number): Promise { - await Promise.all([oboMemoryCache.set(key, token, expiresAt), this.oboRedisCache.set(key, token, expiresAt)]); + this.#oboMemoryCache?.set(key, token, expiresAt); + await this.#oboRedisCache.set(key, token, expiresAt); + } + + public get isReady(): boolean { + return this.#isReady && this.#oboRedisCache.isReady; } } class OboSimpleCache { - public async get(key: string): Promise { - const memoryHit = await oboMemoryCache.get(key); + #oboMemoryCache = new OboMemoryCache([]); + + public get(key: string): string | null { + const memoryHit = this.#oboMemoryCache.get(key); return memoryHit?.token ?? null; } - public async set(key: string, token: string, expiresAt: number): Promise { - await oboMemoryCache.set(key, token, expiresAt); + public set(key: string, token: string, expiresAt: number): void { + this.#oboMemoryCache.set(key, token, expiresAt); + } + + public get isReady(): boolean { + return true; } } diff --git a/server/src/auth/cache/interface.ts b/server/src/auth/cache/interface.ts deleted file mode 100644 index 7b50bfcc0..000000000 --- a/server/src/auth/cache/interface.ts +++ /dev/null @@ -1,9 +0,0 @@ -export interface OboCacheTierInterface { - get(key: string): Promise<{ token: string; expiresAt: number } | null>; - set(key: string, token: string, expiresAt: number): Promise; -} - -export interface OboCacheInterface { - get(key: string): Promise; - set(key: string, token: string, expiresAt: number): Promise; -} diff --git a/server/src/auth/cache/memory-cache.ts b/server/src/auth/cache/memory-cache.ts index 4733b2ed9..a33520f63 100644 --- a/server/src/auth/cache/memory-cache.ts +++ b/server/src/auth/cache/memory-cache.ts @@ -1,27 +1,33 @@ -import { cacheGauge, cacheSizeGauge } from '@app/auth/cache/cache-gauge'; -import { OboCacheTierInterface } from '@app/auth/cache/interface'; +import { memoryCacheGauge, memoryCacheSizeGauge } from '@app/auth/cache/cache-gauge'; +import { TokenMessage } from '@app/auth/cache/types'; import { getLogger } from '@app/logger'; -const log = getLogger('obo-cache'); +const log = getLogger('obo-memory-cache'); type Value = [string, number]; -export class OboMemoryCache implements OboCacheTierInterface { - private cache: Map = new Map(); +export class OboMemoryCache { + #cache: Map; + + constructor(tokenMessages: TokenMessage[]) { + this.#cache = new Map( + tokenMessages.map((tokenMessage) => [tokenMessage.key, [tokenMessage.token, tokenMessage.expiresAt]]), + ); + + log.info({ msg: `Created OBO memory cache with ${tokenMessages.length} tokens.` }); - constructor() { /** * Clean OBO token cache every 10 minutes. * OBO tokens expire after 1 hour. */ - setInterval(() => this.clean(), 10 * 60 * 1000); // 10 minutes. + setInterval(() => this.#clean(), 10 * 60 * 1_000); } - public async get(key: string) { - const value = this.cache.get(key); + public get(key: string) { + const value = this.#cache.get(key); if (value === undefined) { - cacheGauge.inc({ hit: 'miss' }); + memoryCacheGauge.inc({ hit: 'miss' }); return null; } @@ -29,43 +35,43 @@ export class OboMemoryCache implements OboCacheTierInterface { const [token, expiresAt] = value; if (expiresAt <= now()) { - cacheGauge.inc({ hit: 'expired' }); - this.cache.delete(key); - cacheSizeGauge.set(this.cache.size); + memoryCacheGauge.inc({ hit: 'expired' }); + this.#cache.delete(key); + memoryCacheSizeGauge.set(this.#cache.size); return null; } - cacheGauge.inc({ hit: 'hit' }); + memoryCacheGauge.inc({ hit: 'hit' }); return { token, expiresAt }; } - public async set(key: string, token: string, expiresAt: number) { - this.cache.set(key, [token, expiresAt]); - cacheSizeGauge.set(this.cache.size); + public set(key: string, token: string, expiresAt: number) { + this.#cache.set(key, [token, expiresAt]); + memoryCacheSizeGauge.set(this.#cache.size); } - private all() { - return Array.from(this.cache.entries()); + #all() { + return Array.from(this.#cache.entries()); } - private clean() { - const before = this.cache.size; + #clean() { + const before = this.#cache.size; const timestamp = now(); - const deleted: number = this.all() + const deleted: number = this.#all() .map(([key, [, expires_at]]) => { if (expires_at <= timestamp) { - return this.cache.delete(key); + return this.#cache.delete(key); } return false; }) .filter((d) => d).length; - const after = this.cache.size; - cacheSizeGauge.set(after); + const after = this.#cache.size; + memoryCacheSizeGauge.set(after); if (deleted === 0) { log.debug({ msg: `Cleaned the OBO token cache. No expired tokens found. Cache had ${before} tokens.` }); @@ -80,5 +86,3 @@ export class OboMemoryCache implements OboCacheTierInterface { } const now = () => Math.ceil(Date.now() / 1_000); - -export const oboMemoryCache = new OboMemoryCache(); diff --git a/server/src/auth/cache/redis-cache.ts b/server/src/auth/cache/redis-cache.ts index c32eea835..dcff43f97 100644 --- a/server/src/auth/cache/redis-cache.ts +++ b/server/src/auth/cache/redis-cache.ts @@ -1,17 +1,85 @@ import { RedisClientType, createClient } from 'redis'; import { getLogger } from '@app/logger'; -import { OboCacheTierInterface } from '@app/auth/cache/interface'; -import { cacheGauge, cacheRedisGauge } from '@app/auth/cache/cache-gauge'; +import { memoryCacheGauge, redisCacheGauge, redisCacheSizeGauge } from '@app/auth/cache/cache-gauge'; +import { TokenMessage } from '@app/auth/cache/types'; -const log = getLogger('redis-obo-cache'); +const log = getLogger('obo-redis-cache'); -export class OboRedisCache implements OboCacheTierInterface { - private client: RedisClientType; +export type TokenListener = (message: TokenMessage) => void; + +const TOKEN_CHANNEL = 'obo-token'; + +export class OboRedisCache { + #subscribeClient: RedisClientType; + #publishClient: RedisClientType; + #dataClient: RedisClientType; + + #listeners: TokenListener[] = []; constructor(url: string, username: string, password: string) { - this.client = createClient({ url, username, password }); - this.client.on('error', (error) => log.error({ msg: 'Redis Client Error', error })); - this.client.connect(); + this.#dataClient = createClient({ url, username, password, pingInterval: 3_000 }); + this.#dataClient.on('error', (error) => log.error({ msg: 'Redis Data Client Error', error })); + + this.#subscribeClient = this.#dataClient.duplicate(); + this.#subscribeClient.on('error', (error) => log.error({ msg: 'Redis Subscribe Client Error', error })); + + this.#publishClient = this.#dataClient.duplicate(); + this.#publishClient.on('error', (error) => log.error({ msg: 'Redis Publish Client Error', error })); + } + + public async init() { + await Promise.all([this.#subscribeClient.connect(), this.#publishClient.connect(), this.#dataClient.connect()]); + + await this.#subscribeClient.subscribe(TOKEN_CHANNEL, (json) => { + try { + const parsed: unknown = JSON.parse(json); + + if (!isTokenMessage(parsed)) { + log.warn({ msg: 'Invalid token message' }); + + return; + } + + for (const listener of this.#listeners) { + listener(parsed); + } + } catch (error) { + log.warn({ msg: 'Failed to parse token message', error }); + } + }); + + this.#refreshCacheSizeMetric(); + } + + #refreshCacheSizeMetric = async () => { + const count = await this.#dataClient.dbSize(); + redisCacheSizeGauge.set(count); + }; + + public async getAll(): Promise { + const keys = await this.#dataClient.keys('*'); + const tokens = await this.#dataClient.mGet(keys); + + const promises = tokens.map(async (token, index) => { + if (token === null) { + return null; + } + + const key = keys[index]; + + if (key === undefined) { + return null; + } + const ttl = await this.#dataClient.ttl(key); + + if (ttl === -2 || ttl === -1 || ttl === 0) { + return null; + } + + return { token, key, expiresAt: now() + ttl }; + }); + + return Promise.all(promises).then((results) => results.filter(isNotNull)); } public async get(key: string) { @@ -21,36 +89,78 @@ export class OboRedisCache implements OboCacheTierInterface { * Returns -1 if the key exists but has no associated expire. * @see https://redis.io/docs/latest/commands/ttl/ */ - const [token, ttl] = await Promise.all([this.client.get(key), this.client.ttl(key)]); + const [token, ttl] = await Promise.all([this.#dataClient.get(key), this.#dataClient.ttl(key)]); if (token === null || ttl === -2) { - cacheRedisGauge.inc({ hit: 'miss' }); + redisCacheGauge.inc({ hit: 'miss' }); return null; } if (ttl === -1) { - cacheRedisGauge.inc({ hit: 'invalid' }); - this.client.del(key); + redisCacheGauge.inc({ hit: 'invalid' }); + this.#dataClient.del(key); + this.#refreshCacheSizeMetric(); return null; } if (ttl === 0) { - cacheRedisGauge.inc({ hit: 'expired' }); + redisCacheGauge.inc({ hit: 'expired' }); return null; } - cacheRedisGauge.inc({ hit: 'hit' }); - cacheGauge.inc({ hit: 'redis' }); + redisCacheGauge.inc({ hit: 'hit' }); + memoryCacheGauge.inc({ hit: 'redis' }); return { token, expiresAt: now() + ttl }; } public async set(key: string, token: string, expiresAt: number) { - await this.client.set(key, token, { EXAT: expiresAt }); + const json = JSON.stringify({ key, token, expiresAt } satisfies TokenMessage); + this.#publishClient.publish(TOKEN_CHANNEL, json); + + await this.#dataClient.set(key, token, { EXAT: expiresAt }); + + this.#refreshCacheSizeMetric(); + } + + public addTokenListener(listener: TokenListener) { + this.#listeners.push(listener); + } + + public removeTokenListener(listener: TokenListener) { + const index = this.#listeners.indexOf(listener); + + if (index !== -1) { + this.#listeners.splice(index, 1); + } + } + + public get isReady() { + return ( + this.#dataClient.isReady && + this.#dataClient.isOpen && + this.#publishClient.isReady && + this.#publishClient.isOpen && + this.#subscribeClient.isReady && + this.#subscribeClient.isOpen && + this.#subscribeClient.isPubSubActive + ); } } const now = () => Math.floor(Date.now() / 1_000); + +const isNotNull = (value: T | null): value is T => value !== null; + +const isTokenMessage = (message: unknown): message is TokenMessage => { + if (typeof message !== 'object' || message === null) { + return false; + } + + const { key, token, expiresAt } = message as TokenMessage; + + return typeof key === 'string' && typeof token === 'string' && typeof expiresAt === 'number'; +}; diff --git a/server/src/auth/cache/types.ts b/server/src/auth/cache/types.ts new file mode 100644 index 000000000..64f8864af --- /dev/null +++ b/server/src/auth/cache/types.ts @@ -0,0 +1,5 @@ +export interface TokenMessage { + key: string; + token: string; + expiresAt: number; +} diff --git a/server/src/auth/get-auth-client.ts b/server/src/auth/get-auth-client.ts index 894626503..002d98c5b 100644 --- a/server/src/auth/get-auth-client.ts +++ b/server/src/auth/get-auth-client.ts @@ -1,6 +1,7 @@ import { BaseClient, Issuer } from 'openid-client'; import { AZURE_APP_CLIENT_ID, AZURE_APP_JWK, AZURE_APP_WELL_KNOWN_URL } from '@app/config/config'; import { getLogger } from '@app/logger'; +import { isLocal } from '@app/config/env'; const log = getLogger('auth-client'); @@ -31,3 +32,5 @@ export const getAzureADClient = async () => { throw error; } }; + +export const getIsAzureClientReady = () => isLocal || azureADClient !== null; diff --git a/server/src/config/config.ts b/server/src/config/config.ts index ccb64d5c2..f2696e613 100644 --- a/server/src/config/config.ts +++ b/server/src/config/config.ts @@ -29,11 +29,3 @@ export const PROXY_VERSION = requiredEnvString('VERSION', defaultValue); export const PORT = requiredEnvString('PORT', '8080'); export const NAIS_CLUSTER_NAME = requiredEnvString('NAIS_CLUSTER_NAME', defaultValue); export const START_TIME = Date.now(); - -let isReady = false; - -export const setIsReady = () => { - isReady = true; -}; - -export const getIsReady = () => isReady; diff --git a/server/src/init.ts b/server/src/init.ts index b71fb7b62..a251c4b65 100644 --- a/server/src/init.ts +++ b/server/src/init.ts @@ -1,5 +1,4 @@ import { isDeployed } from '@app/config/env'; -import { setIsReady } from '@app/config/config'; import { getLogger } from '@app/logger'; import { EmojiIcons, sendToSlack } from '@app/slack'; import { getAzureADClient } from '@app/auth/get-auth-client'; @@ -18,7 +17,6 @@ export const init = async () => { const time = getDuration(start); log.info({ msg: `Azure AD client initialized in ${formatDuration(time)}`, data: { time } }); } - setIsReady(); } catch (e) { await resetClientsAndUniqueUsersMetrics(); diff --git a/server/src/plugins/health.ts b/server/src/plugins/health.ts index 6aeb2c9b1..f5577be9e 100644 --- a/server/src/plugins/health.ts +++ b/server/src/plugins/health.ts @@ -1,17 +1,39 @@ -import { getIsReady } from '@app/config/config'; +import { oboCache } from '@app/auth/cache/cache'; +import { getIsAzureClientReady } from '@app/auth/get-auth-client'; +import { getLogger } from '@app/logger'; import fastifyPlugin from 'fastify-plugin'; export const HEALTH_PLUGIN_ID = 'health'; +const log = getLogger('liveness'); + export const healthPlugin = fastifyPlugin( (app, _, pluginDone) => { app.get('/isAlive', (__, reply) => reply.status(200).type('text/plain').send('Alive')); - app.get('/isReady', async (__, reply) => - getIsReady() - ? reply.status(200).type('text/plain').send('Ready') - : reply.status(503).type('text/plain').send('Not Ready'), - ); + app.get('/isReady', async (__, reply) => { + const isAzureClientReady = getIsAzureClientReady(); + + if (!oboCache.isReady && !isAzureClientReady) { + log.info({ msg: 'OBO Cache and Azure Client not ready' }); + + return reply.status(503).type('text/plain').send('OBO Cache and Azure Client not ready'); + } + + if (!oboCache.isReady) { + log.info({ msg: 'OBO Cache not ready' }); + + return reply.status(503).type('text/plain').send('OBO Cache not ready'); + } + + if (!isAzureClientReady) { + log.info({ msg: 'Azure Client not ready' }); + + return reply.status(503).type('text/plain').send('Azure Client not ready'); + } + + return reply.status(200).type('text/plain').send('Ready'); + }); pluginDone(); },