[9.0] [Security Solution][Siem migrations] Implement rate limit backoff (#211469) (#212177)

# Backport

This will backport the following commits from `main` to `9.0`:
- [[Security Solution][Siem migrations] Implement rate limit backoff
(#211469)](https://github.com/elastic/kibana/pull/211469)

<!--- Backport version: 9.6.6 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sorenlouv/backport)

Backport metadata:

- Source commit: `64426b2b4d99901a01ecef66a17db01049b05f1a` on `main`, by Sergi Massaneda <sergi.massaneda@elastic.co>, committed 2025-02-21T19:54:40Z
- Source pull request: https://github.com/elastic/kibana/pull/211469 (merged, label `v9.1.0`)
- Labels: `release_note:skip`, `v9.0.0`, `Team:Threat Hunting`, `backport:version`, `v8.18.0`, `v9.1.0`, `v8.19.0`
- Target `9.0` (label `v9.0.0`): not created
- Target `8.18` (label `v8.18.0`): merged in https://github.com/elastic/kibana/pull/212154 (commit `4bf719063c6015b1a68703cbcafb56a281a4b491`)
- Target `8.x` (label `v8.19.0`): not created

Source commit message:

## Summary

Implements an exponential backoff retry strategy when the LLM API throws
rate limit (`429`) errors.

### Backoff implementation

- The `run` method from the `RuleMigrationsTaskClient` has been moved to
the new `RuleMigrationTaskRunner` class.
- The settings for the backoff are defined in this class with:
```ts
/** Exponential backoff configuration to handle rate limit errors */
const RETRY_CONFIG = {
  initialRetryDelaySeconds: 1,
  backoffMultiplier: 2,
  maxRetries: 8,
  // max waiting time 4m15s (1*2^8 = 256s)
} as const;
```
- Only one rule will be retried at a time; the rest of the concurrent
rule translations blocked by the rate limit will wait for the API to
recover before attempting the translation again.

```ts
/** Executor sleep configuration
 * A sleep time applied at the beginning of each single rule translation in the execution pool,
 * The objective of this sleep is to spread the load of concurrent translations, and prevent hitting the rate limit repeatedly.
 * The sleep time applied is a random number between [0-value]. Every time we hit rate limit the value is increased by the multiplier, up to the limit.
 */
const EXECUTOR_SLEEP = {
  initialValueSeconds: 3,
  multiplier: 2,
  limitSeconds: 96, // 1m36s (5 increases)
} as const;
```

### Migration batching changes

```ts
/** Number of concurrent rule translations in the pool */
const TASK_CONCURRENCY = 10 as const;
/** Number of rules loaded in memory to be translated in the pool */
const TASK_BATCH_SIZE = 100 as const;
```

#### Before

- Batches of 15 rules were retrieved and executed in a `Promise.all`,
requiring all of them to be completed before proceeding to the next
batch.
- A "batch sleep" of 10s was executed at the end of each iteration.

#### In this PR

- Batches of 100 rules are retrieved and kept in memory. The execution
is performed in a task pool with a concurrency of 10 rules. This ensures
there are always 10 rules executing at a time.
- The "batch sleep" has been removed in favour of an "execution sleep"
of rand[1-3]s at the start of each single rule migration. This
individual sleep serves two goals:
  - Spread the load when the migration is first launched.
  - Prevent hitting the rate limit consistently: The sleep duration is
increased every time we hit a rate limit.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
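
The backoff settings quoted in the commit message can be illustrated with a minimal sketch. It is not the code from `RuleMigrationTaskRunner`: `withRateLimitBackoff`, `backoffDelaysSeconds`, `isRateLimitError` and the `status` field on the error are hypothetical names chosen for illustration, and the sketch assumes the delay starts at `initialRetryDelaySeconds` and is multiplied by `backoffMultiplier` after each retry. The "only one rule retries at a time" coordination described above is left out.

```typescript
/** Values copied from the RETRY_CONFIG quoted in the commit message. */
const RETRY_CONFIG = {
  initialRetryDelaySeconds: 1,
  backoffMultiplier: 2,
  maxRetries: 8,
} as const;

/** Assumption: rate limit errors expose an HTTP-like `status` of 429. */
function isRateLimitError(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { status?: number }).status === 429
  );
}

/** Delay in seconds before each retry: 1, 2, 4, ... (one entry per allowed retry). */
function backoffDelaysSeconds(): number[] {
  const delays: number[] = [];
  let delay: number = RETRY_CONFIG.initialRetryDelaySeconds;
  for (let retry = 0; retry < RETRY_CONFIG.maxRetries; retry++) {
    delays.push(delay);
    delay *= RETRY_CONFIG.backoffMultiplier;
  }
  return delays;
}

const defaultSleep = (seconds: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, seconds * 1000));

/** Hypothetical helper: retries `operation` on rate limit errors, rethrows anything else. */
async function withRateLimitBackoff<T>(
  operation: () => Promise<T>,
  sleep: (seconds: number) => Promise<void> = defaultSleep
): Promise<T> {
  const delays = backoffDelaysSeconds();
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      // Give up on non-rate-limit errors, or once every retry has been used.
      if (!isRateLimitError(error) || attempt >= delays.length) throw error;
      await sleep(delays[attempt]);
    }
  }
}
```

Under these assumptions the eight delays are 1, 2, 4, ..., 128 seconds, which add up to 255 seconds. That matches the "4m15s" maximum waiting time mentioned in the configuration comment.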

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
semd and elasticmachine authored Feb 24, 2025
1 parent bedc0d2 commit 6084d34
Showing 13 changed files with 921 additions and 329 deletions.
@@ -5,16 +5,16 @@
  * 2.0.
  */

-import { mockRuleMigrationsDataClient } from '../data/__mocks__/mocks';
-import { mockRuleMigrationsTaskClient } from '../task/__mocks__/mocks';
+import { createRuleMigrationsDataClientMock } from '../data/__mocks__/mocks';
+import { createRuleMigrationsTaskClientMock } from '../task/__mocks__/mocks';

 export const createRuleMigrationDataClient = jest
   .fn()
-  .mockImplementation(() => mockRuleMigrationsDataClient);
+  .mockImplementation(() => createRuleMigrationsDataClientMock());

 export const createRuleMigrationTaskClient = jest
   .fn()
-  .mockImplementation(() => mockRuleMigrationsTaskClient);
+  .mockImplementation(() => createRuleMigrationsTaskClientMock());

 export const createRuleMigrationClient = () => ({
   data: createRuleMigrationDataClient(),
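
The change above swaps a shared mock object for a factory call. One likely motivation, stated here as an assumption, is test isolation: a factory can hand each consumer fresh stubs, so recorded calls do not leak from one test to the next. The sketch below shows the difference without Jest; `createStub`, `sharedTaskClientMock` and `createTaskClientMock` are hypothetical stand-ins, not names from the repository.

```typescript
/** Tiny stand-in for jest.fn(): a function that counts its own calls. */
interface Stub {
  (): void;
  calls: number;
}

function createStub(): Stub {
  const stub = (() => {
    stub.calls += 1;
  }) as Stub;
  stub.calls = 0;
  return stub;
}

// Shared instance: every consumer receives the same stub object.
const sharedTaskClientMock = { start: createStub() };
const getSharedTaskClientMock = () => sharedTaskClientMock;

// Factory: every call builds a new object with new stubs.
const createTaskClientMock = () => ({ start: createStub() });

// "Test 1" calls start() once on each kind of mock.
getSharedTaskClientMock().start();
createTaskClientMock().start();

// "Test 2" asks for the mocks again before calling anything.
const sharedCallsSeenByTest2 = getSharedTaskClientMock().start.calls; // 1, leaked from test 1
const freshCallsSeenByTest2 = createTaskClientMock().start.calls; // 0, fresh stub
```

With Jest the leak can also be handled by resetting mocks between tests; the factory simply avoids relying on that.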
@@ -5,24 +5,28 @@
  * 2.0.
  */

+import type { RuleMigrationsDataIntegrationsClient } from '../rule_migrations_data_integrations_client';
+import type { RuleMigrationsDataLookupsClient } from '../rule_migrations_data_lookups_client';
+import type { RuleMigrationsDataPrebuiltRulesClient } from '../rule_migrations_data_prebuilt_rules_client';
+import type { RuleMigrationsDataResourcesClient } from '../rule_migrations_data_resources_client';
 import type { RuleMigrationsDataRulesClient } from '../rule_migrations_data_rules_client';

 // Rule migrations data rules client
 export const mockRuleMigrationsDataRulesClient = {
   create: jest.fn().mockResolvedValue(undefined),
-  get: jest.fn().mockResolvedValue([]),
+  get: jest.fn().mockResolvedValue({ data: [], total: 0 }),
   searchBatches: jest.fn().mockReturnValue({
     next: jest.fn().mockResolvedValue([]),
     all: jest.fn().mockResolvedValue([]),
   }),
-  takePending: jest.fn().mockResolvedValue([]),
+  saveProcessing: jest.fn().mockResolvedValue(undefined),
   saveCompleted: jest.fn().mockResolvedValue(undefined),
   saveError: jest.fn().mockResolvedValue(undefined),
   releaseProcessing: jest.fn().mockResolvedValue(undefined),
   updateStatus: jest.fn().mockResolvedValue(undefined),
   getStats: jest.fn().mockResolvedValue(undefined),
   getAllStats: jest.fn().mockResolvedValue([]),
-} as unknown as RuleMigrationsDataRulesClient;
+} as unknown as jest.Mocked<RuleMigrationsDataRulesClient>;
 export const MockRuleMigrationsDataRulesClient = jest
   .fn()
   .mockImplementation(() => mockRuleMigrationsDataRulesClient);
@@ -35,30 +39,42 @@ export const mockRuleMigrationsDataResourcesClient = {
     next: jest.fn().mockResolvedValue([]),
     all: jest.fn().mockResolvedValue([]),
   }),
-};
+} as unknown as jest.Mocked<RuleMigrationsDataResourcesClient>;
 export const MockRuleMigrationsDataResourcesClient = jest
   .fn()
   .mockImplementation(() => mockRuleMigrationsDataResourcesClient);

 export const mockRuleMigrationsDataIntegrationsClient = {
   populate: jest.fn().mockResolvedValue(undefined),
   retrieveIntegrations: jest.fn().mockResolvedValue([]),
-};
+} as unknown as jest.Mocked<RuleMigrationsDataIntegrationsClient>;

+export const mockRuleMigrationsDataPrebuiltRulesClient = {
+  populate: jest.fn().mockResolvedValue(undefined),
+  search: jest.fn().mockResolvedValue([]),
+} as unknown as jest.Mocked<RuleMigrationsDataPrebuiltRulesClient>;
+export const mockRuleMigrationsDataLookupsClient = {
+  create: jest.fn().mockResolvedValue(undefined),
+  indexData: jest.fn().mockResolvedValue(undefined),
+} as unknown as jest.Mocked<RuleMigrationsDataLookupsClient>;

 // Rule migrations data client
-export const mockRuleMigrationsDataClient = {
+export const createRuleMigrationsDataClientMock = () => ({
   rules: mockRuleMigrationsDataRulesClient,
   resources: mockRuleMigrationsDataResourcesClient,
   integrations: mockRuleMigrationsDataIntegrationsClient,
-};
+  prebuiltRules: mockRuleMigrationsDataPrebuiltRulesClient,
+  lookups: mockRuleMigrationsDataLookupsClient,
+});

 export const MockRuleMigrationsDataClient = jest
   .fn()
-  .mockImplementation(() => mockRuleMigrationsDataClient);
+  .mockImplementation(() => createRuleMigrationsDataClientMock());

 // Rule migrations data service
 export const mockIndexName = 'mocked_siem_rule_migrations_index_name';
 export const mockInstall = jest.fn().mockResolvedValue(undefined);
-export const mockCreateClient = jest.fn().mockReturnValue(mockRuleMigrationsDataClient);
+export const mockCreateClient = jest.fn(() => createRuleMigrationsDataClientMock());

 export const MockRuleMigrationsDataService = jest.fn().mockImplementation(() => ({
   createAdapter: jest.fn(),
@@ -151,45 +151,19 @@ export class RuleMigrationsDataRulesClient extends RuleMigrationsDataBaseClient
     }
   }

-  /**
-   * Retrieves `pending` rule migrations with the provided id and updates their status to `processing`.
-   * This operation is not atomic at migration level:
-   * - Multiple tasks can process different migrations simultaneously.
-   * - Multiple tasks should not process the same migration simultaneously.
-   */
-  async takePending(migrationId: string, size: number): Promise<StoredRuleMigration[]> {
+  /** Updates one rule migration status to `processing` */
+  async saveProcessing(id: string): Promise<void> {
     const index = await this.getIndexName();
     const profileId = await this.getProfileUid();
-    const query = this.getFilterQuery(migrationId, { status: SiemMigrationStatus.PENDING });
-
-    const storedRuleMigrations = await this.esClient
-      .search<RuleMigration>({ index, query, sort: '_doc', size })
-      .then((response) =>
-        this.processResponseHits(response, { status: SiemMigrationStatus.PROCESSING })
-      )
-      .catch((error) => {
-        this.logger.error(`Error searching rule migrations: ${error.message}`);
-        throw error;
-      });
-
-    await this.esClient
-      .bulk({
-        refresh: 'wait_for',
-        operations: storedRuleMigrations.flatMap(({ id, status }) => [
-          { update: { _id: id, _index: index } },
-          {
-            doc: { status, updated_by: profileId, updated_at: new Date().toISOString() },
-          },
-        ]),
-      })
-      .catch((error) => {
-        this.logger.error(
-          `Error updating for rule migrations status to processing: ${error.message}`
-        );
-        throw error;
-      });
-
-    return storedRuleMigrations;
+    const doc = {
+      status: SiemMigrationStatus.PROCESSING,
+      updated_by: profileId,
+      updated_at: new Date().toISOString(),
+    };
+    await this.esClient.update({ index, id, doc, refresh: 'wait_for' }).catch((error) => {
+      this.logger.error(`Error updating rule migration status to processing: ${error.message}`);
+      throw error;
+    });
   }

   /** Updates one rule migration with the provided data and sets the status to `completed` */
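
Replacing the batch-oriented `takePending` with a per-rule `saveProcessing` fits the pool model from the commit message, where each rule is presumably marked as `processing` only when a pool slot picks it up. The sketch below illustrates that model under stated assumptions: `runPool` and `ExecutorSleep` are hypothetical names, and only the constant values come from the commit message.

```typescript
/** Values copied from the constants quoted in the commit message. */
const TASK_CONCURRENCY = 10;
const EXECUTOR_SLEEP = {
  initialValueSeconds: 3,
  multiplier: 2,
  limitSeconds: 96,
} as const;

/** Hypothetical holder for the upper bound of the random per-rule sleep. */
class ExecutorSleep {
  private boundSeconds: number = EXECUTOR_SLEEP.initialValueSeconds;

  /** Current upper bound of the sleep, in seconds. */
  currentBoundSeconds(): number {
    return this.boundSeconds;
  }

  /** Random sleep duration between 0 and the current bound. */
  nextSeconds(random: () => number = Math.random): number {
    return random() * this.boundSeconds;
  }

  /** Called when a rate limit is hit: grow the bound, capped at the limit. */
  increase(): void {
    this.boundSeconds = Math.min(
      this.boundSeconds * EXECUTOR_SLEEP.multiplier,
      EXECUTOR_SLEEP.limitSeconds
    );
  }
}

/** Runs `worker` over `items`, keeping at most `concurrency` workers in flight. */
async function runPool<T>(
  items: readonly T[],
  concurrency: number,
  worker: (item: T) => Promise<void>
): Promise<void> {
  let nextIndex = 0;
  // Each lane pulls the next unclaimed item as soon as its previous one finishes.
  const lane = async (): Promise<void> => {
    while (nextIndex < items.length) {
      const item = items[nextIndex++];
      await worker(item);
    }
  };
  const lanes: Promise<void>[] = [];
  for (let i = 0; i < Math.min(concurrency, items.length); i++) {
    lanes.push(lane());
  }
  await Promise.all(lanes);
}
```

In this picture a worker would sleep for `nextSeconds()`, call something like `saveProcessing(rule.id)`, and then run the translation. Unlike a `Promise.all` over a fixed batch, a slow rule only occupies its own lane while the other lanes keep pulling work.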
@@ -5,7 +5,7 @@
  * 2.0.
  */

-export const mockRuleMigrationsTaskClient = {
+export const createRuleMigrationsTaskClientMock = () => ({
   start: jest.fn().mockResolvedValue({ started: true }),
   stop: jest.fn().mockResolvedValue({ stopped: true }),
   getStats: jest.fn().mockResolvedValue({
@@ -19,15 +19,15 @@ export const mockRuleMigrationsTaskClient = {
     },
   }),
   getAllStats: jest.fn().mockResolvedValue([]),
-};
+});

 export const MockRuleMigrationsTaskClient = jest
   .fn()
-  .mockImplementation(() => mockRuleMigrationsTaskClient);
+  .mockImplementation(() => createRuleMigrationsTaskClientMock());

 // Rule migrations task service
 export const mockStopAll = jest.fn();
-export const mockCreateClient = jest.fn().mockReturnValue(mockRuleMigrationsTaskClient);
+export const mockCreateClient = jest.fn(() => createRuleMigrationsTaskClientMock());

 export const MockRuleMigrationsTaskService = jest.fn().mockImplementation(() => ({
   createClient: mockCreateClient,
@@ -18,6 +18,11 @@ export interface RuleMigrationsRetrieverClients {
   savedObjects: SavedObjectsClientContract;
 }

+/**
+ * RuleMigrationsRetriever is a class that is responsible for retrieving all the necessary data during the rule migration process.
+ * It is composed of multiple retrievers that are responsible for retrieving specific types of data.
+ * Such as rule integrations, prebuilt rules, and rule resources.
+ */
 export class RuleMigrationsRetriever {
   public readonly resources: RuleResourceRetriever;
   public readonly integrations: IntegrationRetriever;