Performance analysis of cascade operations #660
For the environment configuration, I'm using a modified version of the agents index event generator utility (

The groups' generation is set to a maximum of 500 groups, from which each agent is assigned 128 different groups:

# Generate 500 unique group names
unique_groups = [f'group{i}' for i in range(500)]
random.shuffle(unique_groups)

And the OS distribution is calculated based on the number of events to generate; in this case we will use 50k events:

def generate_random_data(number):
    data = []
    num_windows = int(0.5 * number)
    num_macos = int(0.15 * number)
    num_linux = number - num_windows - num_macos
    ...

Some other modifications were made to meet these requirements. I'm sharing the complete script below.

Event generator script:

#!/bin/python3
import datetime
import json
import logging
import random
import requests
import urllib3
# Constants and Configuration
LOG_FILE = 'generate_data.log'
GENERATED_DATA_FILE = 'generatedData.json'
DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# Default values
INDEX_NAME = "wazuh-agents"
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
DEFAULT_IP = "127.0.0.1"
DEFAULT_PORT = "9200"
# Configure logging
logger = logging.getLogger("DataGenerator")
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler = logging.FileHandler(LOG_FILE)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Suppress warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Generate 500 unique group names
unique_groups = [f'group{i}' for i in range(500)]
random.shuffle(unique_groups)
def generate_random_date():
"""Generate a random date within the last 10 days."""
start_date = datetime.datetime.now()
end_date = start_date - datetime.timedelta(days=10)
random_date = start_date + (end_date - start_date) * random.random()
return random_date.strftime(DATE_FORMAT)
def generate_random_groups():
"""Return a list of randomly sampled groups."""
return random.sample(unique_groups, 128)
def generate_random_host(agent_type):
"""Generate a random host configuration."""
os_families = {
'linux': ['debian', 'ubuntu', 'centos', 'redhat'],
'windows': ['windows'],
'macos': ['macos', 'ios']
}
family = random.choice(os_families[agent_type])
version = f'{random.randint(0, 99)}.{random.randint(0, 99)}'
return {
'architecture': random.choice(['x86_64', 'arm64']),
'boot': {'id': f'boot{random.randint(0, 9999)}'},
'cpu': {'usage': random.uniform(0, 100)},
'disk': {
'read': {'bytes': random.randint(0, 1_000_000)},
'write': {'bytes': random.randint(0, 1_000_000)}
},
'domain': f'domain{random.randint(0, 999)}',
'geo': {
'city_name': random.choice(['San Francisco', 'New York', 'Berlin', 'Tokyo']),
'continent_code': random.choice(['NA', 'EU', 'AS']),
'continent_name': random.choice(['North America', 'Europe', 'Asia']),
'country_iso_code': random.choice(['US', 'DE', 'JP']),
'country_name': random.choice(['United States', 'Germany', 'Japan']),
'location': {
'lat': round(random.uniform(-90.0, 90.0), 6),
'lon': round(random.uniform(-180.0, 180.0), 6)
},
'postal_code': f'{random.randint(10000, 99999)}',
'region_name': f'Region {random.randint(0, 999)}',
'timezone': random.choice(['PST', 'EST', 'CET', 'JST'])
},
'hostname': f'host{random.randint(0, 9999)}',
'id': f'hostid{random.randint(0, 9999)}',
'ip': ".".join(str(random.randint(1, 255)) for _ in range(4)),
'mac': ":".join(f'{random.randint(0, 255):02x}' for _ in range(6)),
'os': {
'family': family,
'full': f'{family} {version}',
'kernel': f'kernel{random.randint(0, 999)}',
'name': family,
'platform': agent_type,
'type': agent_type,
'version': version
},
'uptime': random.randint(0, 1_000_000)
}
def generate_random_agent(agent_type):
"""Generate a random agent configuration."""
agent_id = random.randint(0, 99999)
return {
'id': f'agent{agent_id}',
'name': f'Agent{agent_id}',
'type': agent_type,
'version': f'v{random.randint(0, 9)}-stable',
'status': random.choice(['active', 'inactive']),
'last_login': generate_random_date(),
'groups': generate_random_groups(),
'key': f'key{agent_id}',
'host': generate_random_host(agent_type)
}
def generate_random_data(number):
"""Generate a list of random agent events."""
data = []
num_windows = int(0.5 * number)
num_macos = int(0.15 * number)
num_linux = number - num_windows - num_macos
for _ in range(num_windows):
data.append({'agent': generate_random_agent('windows')})
for _ in range(num_macos):
data.append({'agent': generate_random_agent('macos')})
for _ in range(num_linux):
data.append({'agent': generate_random_agent('linux')})
return data
def inject_events(cluster_url, username, password, data):
"""Send generated data to the indexer."""
url = f'{cluster_url}/{INDEX_NAME}/_doc'
session = requests.Session()
session.auth = (username, password)
session.verify = False
headers = {'Content-Type': 'application/json'}
try:
for event_data in data:
response = session.post(url, json=event_data, headers=headers)
if response.status_code != 201:
logger.error(f'Failed to inject event. Status Code: {response.status_code}')
logger.error(response.text)
break
logger.info('Data injection completed successfully.')
except requests.RequestException as e:
logger.error(f'Error during data injection: {e}')
def save_generated_data(data):
"""Save generated data to a file."""
try:
with open(GENERATED_DATA_FILE, 'w') as outfile:
json.dump(data, outfile, indent=2)
logger.info("Generated data saved successfully.")
except IOError as e:
logger.error(f"Error saving data to file: {e}")
def get_user_input(prompt, default):
"""Get user input with a default fallback."""
return input(f"{prompt} (default: '{default}'): ") or default
def main():
"""Main function to generate and inject data."""
try:
number = int(input("How many events do you want to generate? "))
except ValueError:
logger.error("Invalid input. Please enter a valid number.")
return
ip = get_user_input("Enter the IP of your Indexer", DEFAULT_IP)
port = get_user_input("Enter the port of your Indexer", DEFAULT_PORT)
username = get_user_input("Username", DEFAULT_USERNAME)
password = get_user_input("Password", DEFAULT_PASSWORD)
logger.info(f"Generating {number} events...")
cluster_url = f"http://{ip}:{port}"
data = generate_random_data(number)
save_generated_data(data)
inject_events(cluster_url, username, password, data)
if __name__ == "__main__":
main()
The documents are generated with a modified version of the

Script to use for the events generation:

import datetime
import json
import uuid
import requests
import logging
# Configuration
INDEX_AGENTS = "wazuh-agents"
INDEX_DOCUMENT = "wazuh-states-inventory-packages"
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
DEFAULT_IP = "127.0.0.1"
DEFAULT_PORT = "9200"
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler(
"agent_documents.log"), logging.StreamHandler()]
)
def get_agents_ids(cluster_url, user, password) -> list:
"""Fetch agent IDs from the Wazuh index."""
agents_url = f"{cluster_url}/{INDEX_AGENTS}/_search"
try:
response = requests.get(agents_url, auth=(user, password))
response.raise_for_status()
agents_data = response.json()
agents = [hit['_source']['agent'] for hit in agents_data.get('hits', {}).get('hits', [])]
logging.info(f"Retrieved {len(agents)} agents.")
return agents
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching agents: {e}")
return []
def prepare_bulk_payload(documents):
payload = ""
for doc in documents:
metadata = {"index": {"_index": INDEX_DOCUMENT, "_id": doc.get('agent', {}).get('id')}}
payload += json.dumps(metadata) + "\n"
payload += json.dumps(doc) + "\n"
return payload
def send_documents(cluster_url, user, password, agents, num_documents):
"""Send restart documents to a list of agent IDs."""
    documents_url = f"{cluster_url}/{INDEX_DOCUMENT}/_bulk"
print(agents)
documents = [
{
"agent": {
"id": agent.get('id'),
"name": agent.get('name'),
"groups": agent.get('groups'),
"type": agent.get('type'),
"version": agent.get('version'),
"host": {
"architecture": agent.get('host', {}).get('architecture'),
"hostname": agent.get('host', {}).get('hostname'),
"ip": agent.get('host', {}).get('ip'),
"os": {
"name": agent.get('host', {}).get('os', {}).get('name'),
"type": agent.get('host', {}).get('os', {}).get('type'),
"version": agent.get('host', {}).get('os', {}).get('version')
}
},
},
"@timestamp": datetime.datetime.now().isoformat(),
"package": {
"architecture": agent.get('host', {}).get('architecture'),
"description": "tmux is a \"terminal multiplexer.\" It enables a number of terminals (or \
windows) to be accessed and controlled from a single terminal. tmux is \
intended to be a simple, modern, BSD-licensed alternative to programs such \
as GNU Screen.",
"installed": "1738151465",
"name": "tmux",
"path": " ",
"size": 1166902,
"type": "rpm",
"version": "3.2a-5.el9"
}
}
for agent in agents
for _ in range(num_documents)
]
if not documents:
logging.warning("No documents generated to send.")
return
headers = {'Content-Type': 'application/json'}
try:
response = requests.post(documents_url,
data=prepare_bulk_payload(documents),
headers=headers,
auth=(user, password))
response.raise_for_status()
logging.info(f"Successfully sent {len(documents)} documents.")
except requests.exceptions.RequestException as e:
logging.error(f"Error sending documents: {e}")
logging.error("response: %s", response.text)
return documents
def main():
"""Main function to retrieve agents and send documents."""
ip = input(f"Enter the IP of your Indexer (default: '{DEFAULT_IP}'): ") or DEFAULT_IP
port = input(f"Enter the port of your Indexer (default: '{DEFAULT_PORT}'): ") or DEFAULT_PORT
username = input(f"Enter username (default: '{DEFAULT_USERNAME}'): ") or DEFAULT_USERNAME
password = input(f"Enter password (default: '{DEFAULT_PASSWORD}'): ") or DEFAULT_PASSWORD
cluster_url = f"http://{ip}:{port}"
try:
num_documents = int(
input("Enter the number of documents to generate for each agent: "))
except ValueError:
logging.error("Invalid input. Please enter a valid number.")
return
agent_ids = get_agents_ids(cluster_url, username, password)
if agent_ids:
send_documents(cluster_url, username, password, agent_ids, num_documents)
else:
logging.warning("No agents found to send documents to.")
if __name__ == "__main__":
    main()
Currently facing some issues while trying to run the
@QU3B1M the request needs a body
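For context, the remark above presumably refers to the agents _search call in get_agents_ids, which is sent without a body and therefore returns only the default 10 hits. A minimal sketch of what passing a body could look like (the match_all query and the size of 10000 are assumptions, not details from this thread):

import requests

def get_agents_with_body(cluster_url, user, password):
    # Hypothetical variant of get_agents_ids(): send an explicit request body
    # so the search can return up to 10,000 agents instead of the default 10.
    agents_url = f"{cluster_url}/wazuh-agents/_search"
    body = {"query": {"match_all": {}}, "size": 10000}
    response = requests.get(agents_url, json=body, auth=(user, password), verify=False)
    response.raise_for_status()
    hits = response.json().get("hits", {}).get("hits", [])
    return [hit["_source"]["agent"] for hit in hits]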
Due to the large amount of documents required, I've tried different approaches to generate that amount of data without the system crashing. The approach currently used is to index the documents in batches, although it takes some extra time. The generation of 300.000 documents for each agent (

So, the number of generated events is reduced, and for the generation we are using this code that lets us generate events in parallel.
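The parallel generator itself is not reproduced in this thread; the sketch below only illustrates the batching idea described above (the batch size, worker count, and the generate_document helper are assumptions, not the actual code used):

import concurrent.futures
import json
import requests

BATCH_SIZE = 5_000   # assumed batch size, not taken from the thread
WORKERS = 4          # assumed number of parallel workers

def generate_document(agent):
    # Placeholder for the real per-agent document builder used for the tests.
    return {"agent": {"id": agent.get("id"), "groups": agent.get("groups")}}

def bulk_index(cluster_url, auth, docs):
    # Send one batch through the _bulk API instead of one request per document.
    payload = ""
    for doc in docs:
        payload += json.dumps({"index": {"_index": "wazuh-states-inventory-packages"}}) + "\n"
        payload += json.dumps(doc) + "\n"
    response = requests.post(f"{cluster_url}/_bulk", data=payload,
                             headers={"Content-Type": "application/json"},
                             auth=auth, verify=False)
    response.raise_for_status()

def generate_and_index(cluster_url, auth, agents, docs_per_agent):
    # Build documents in parallel, then index them in fixed-size batches.
    with concurrent.futures.ThreadPoolExecutor(max_workers=WORKERS) as pool:
        docs = list(pool.map(generate_document, agents * docs_per_agent))
    for i in range(0, len(docs), BATCH_SIZE):
        bulk_index(cluster_url, auth, docs[i:i + BATCH_SIZE])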
Requested a cloud environment to perform the tests; currently working on the performance tests execution.
Test results on cloud environment

First execution with 3,000 documents per agent

Indexer documents setup
Status of the wazuh-states-inventory-packages index:

% curl "https://172.31.40.38:9200/_cat/indices/wazuh-states-inventory-packages/" -u admin:admin -k
green open wazuh-states-inventory-packages 94segLMyQRKBRs521pSW9Q 1 0 150000000 0 71.8gb 71.8gb

Documents update

Each group was updated with a variation of the following request:

% curl -X POST "https://<CLUSTER_IP>:9200/wazuh-states-inventory-packages/_update_by_query?conflicts=proceed&slices=auto&wait_for_active_shards=all" -u admin:admin -k -H "Content-Type: application/json" -d '{
"profile": true,
"timeout": "60m",
"query": {
"match": {
"agent.groups": "XXX"
}
},
"script": {
"source": "ctx._source.agent.groups = params.newValue",
"lang": "painless",
"params": {
"newValue": "new_XXX"
}
}
}'
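Long _update_by_query runs like these can be followed through the standard tasks API. As a side note (not part of the procedure described in this thread), a small sketch of polling for running update-by-query tasks:

import requests

def list_update_by_query_tasks(cluster_url, auth):
    # Lists currently running tasks, filtered to *byquery actions
    # (update-by-query and delete-by-query).
    response = requests.get(f"{cluster_url}/_tasks",
                            params={"detailed": "true", "actions": "*byquery"},
                            auth=auth, verify=False)
    response.raise_for_status()
    return response.json().get("nodes", {})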
Windows documents update
Linux documents update
macOS documents update
Cluster status during the

The size of the
Important

The number of documents was reduced by more than 10x since the last execution to avoid reproducing the previously experienced system crashes.

Environment

3x Wazuh Indexer 5.0 nodes running on AWS EC2 instances with the same specs:
Indexer documents setup
Nodes

% curl -k -u admin:admin https://172.31.37.89:9200/_cat/nodes?v
ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role node.roles cluster_manager name
172.31.40.38 37 75 0 0.00 0.00 0.00 dimr cluster_manager,data,ingest,remote_cluster_client - node-3
172.31.37.89 21 75 0 0.00 0.02 0.01 dimr cluster_manager,data,ingest,remote_cluster_client * node-2
172.31.43.69 52 73 21 0.30 1.17 1.77 dimr cluster_manager,data,ingest,remote_cluster_client - node-1

Packages index before the update

% curl -k -u admin:admin https://172.31.37.89:9200/_cat/indices/wazuh-states-inventory-packages/
green open wazuh-states-inventory-packages bM3m6eeGRJmXhiND6H2F4w 1 0 10000000 0 5.3gb 5.3gb

Documents update

For the documents' groups update, we used the _update_by_query API with the following body:

{
"profile": true,
"query": {
"match": {
"agent.groups": "[windows|linux|macos]"
}
},
"script": {
"source": "ctx._source.agent.groups = params.newValue",
"lang": "painless",
"params": {
"newValue": "[new_group]"
}
}
}

Results:
Note

All the queries were executed using the parameters
We consider this issue complete, as the performance analysis has concluded. The poor performance results led us to explore other alternatives, such as Index Transforms. See #694.
Description
As a preliminary step towards migrating Wazuh's RBAC from the Server to the Indexer, we need to be aware of the performance of the Indexer on cascade operations involving changes to agents' groups.
A single agent can generate hundreds to thousands of events that end up in indices. These documents (events) are tied to a single agent, comprising a many-to-one relationship: a document in an index can only belong to one agent. In order to depict this relationship in the indices, every document contains the agent.id as a primary key that allows these entities to be correlated. Every document also has the field agent.groups to:

The main drawback of this design is that when any agent changes its groups, all the data belonging to that agent up to that moment needs to be updated with the agent's new groups.
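To make the cascade concrete, a group change on one agent translates into an update-by-query over every document of that agent. The sketch below is only illustrative (the index argument, the match query on agent.id, and the helper itself are assumptions consistent with the schema above, not an existing implementation):

import requests

def propagate_group_change(cluster_url, auth, index, agent_id, new_groups):
    # Illustrative only: rewrite agent.groups on every document belonging to
    # the given agent. With thousands of documents per agent, a single group
    # change fans out into that many update operations.
    body = {
        "query": {"match": {"agent.id": agent_id}},
        "script": {
            "source": "ctx._source.agent.groups = params.newGroups",
            "lang": "painless",
            "params": {"newGroups": new_groups},
        },
    }
    response = requests.post(f"{cluster_url}/{index}/_update_by_query",
                             params={"conflicts": "proceed", "slices": "auto"},
                             json=body, auth=auth, verify=False)
    response.raise_for_status()
    return response.json()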
To better understand the problem, let's imagine an environment with 50K agents and 20K documents per agent.
Update:
Over a month, such an environment would hold 1 billion documents (50K agents × 20K documents per agent). On a hypothetical, but possible, update of every agent's groups, the Indexer would need to perform 1 billion update operations as a result.
Environment details
Plan