feat(anta): Limit concurrency #680

Open
wants to merge 53 commits into
base: main

Contributor

@carl-baillargeon carl-baillargeon commented May 16, 2024

Description

This PR improves the test runner by introducing a generator-based approach for managing test coroutines and setting a configurable limit on the number of concurrent tests.

  • Instead of loading all test coroutines into a list, the runner now uses a generator to yield tests. This approach prevents memory overload and improves performance when dealing with a large number of tests.

  • A limit on the number of concurrent tests is introduced to avoid overwhelming the runner. This limit is configurable with a (hidden) environment variable.

Implementation:
The generator yields test coroutines, ensuring that only a limited number of tests are scheduled and run concurrently.
Upon reaching the concurrency limit, the runner waits for some tests to complete before scheduling new ones from the generator.
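
The scheduling loop described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from this PR: `run_limited`, `Tracker`, `fake_test`, and `fake_tests` are hypothetical names, and using `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)` for the waiting step is an assumption about how "waits for some tests to complete" could be done.

```python
import asyncio
from collections.abc import AsyncIterator, Coroutine
from typing import Any


class Tracker:
    """Records how many fake tests run at the same time (hypothetical helper)."""

    def __init__(self) -> None:
        self.running = 0
        self.peak = 0


async def fake_test(index: int, tracker: Tracker) -> int:
    """Stand-in for a test coroutine; returns its own index."""
    tracker.running += 1
    tracker.peak = max(tracker.peak, tracker.running)
    await asyncio.sleep(0.01)
    tracker.running -= 1
    return index


async def fake_tests(count: int, tracker: Tracker) -> AsyncIterator[Coroutine[Any, Any, int]]:
    """Yield coroutine objects lazily instead of building a full list."""
    for index in range(count):
        yield fake_test(index, tracker)


async def run_limited(generator: AsyncIterator[Coroutine[Any, Any, int]], max_concurrency: int) -> list[int]:
    """Run coroutines from the generator with at most max_concurrency in flight."""
    if max_concurrency <= 0:
        msg = "Concurrency limit must be greater than 0."
        raise ValueError(msg)

    tests = generator.__aiter__()
    exhausted = False
    pending: set[asyncio.Task[int]] = set()
    results: list[int] = []

    while pending or not exhausted:
        # Pull a new coroutine only when a slot is free, so nothing is fetched and dropped.
        while len(pending) < max_concurrency and not exhausted:
            try:
                coro = await tests.__anext__()
            except StopAsyncIteration:
                exhausted = True
            else:
                pending.add(asyncio.create_task(coro))

        if not pending:
            break

        # Assumption: block until at least one task finishes, then refill the free slots.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        results.extend(task.result() for task in done)

    return results
```

Calling `asyncio.run(run_limited(fake_tests(10, Tracker()), max_concurrency=3))` runs all ten fake tests while never having more than three tasks scheduled at once.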

Fixes: #713, #832

Checklist:

  • Update FAQ to reference Scaling ANTA documentation
  • Update Scaling ANTA documentation
  • Add section on JSON catalogs vs YAML (faster)
  • Need to retest on digital twin @dlobato
  • Support None timeouts
  • Confirm if PoolTimeout default should be set to None
  • Update benchmark + unit tests
  • ci: add codspeed to benchmark ANTA #826
  • Confirm that the number of open file descriptors never exceeds device * max_connections (100 by default)

Quality Gate passed

Issues
0 New issues
0 Accepted issues

Measures
0 Security Hotspots
No data about Coverage
0.0% Duplication on New Code

See analysis details on SonarCloud

Contributor

github-actions bot commented Jul 4, 2024

This pull request has conflicts, please resolve those before we can evaluate the pull request.

Contributor

Conflicts have been resolved. A maintainer will review the pull request shortly.

Contributor

This pull request has conflicts, please resolve those before we can evaluate the pull request.

@mtache changed the title from "refactor(anta): Refactor runner to use a generator with a limit" to "feat(anta): Limit concurrency" on Oct 2, 2024
Contributor

github-actions bot commented Oct 2, 2024

Conflicts have been resolved. A maintainer will review the pull request shortly.

codspeed-hq bot commented Oct 2, 2024

CodSpeed Performance Report

Merging #680 will not alter performance

Comparing carl-baillargeon:refactor/runner_limit (17b244a) with main (bdc0e68)

Summary

✅ 22 untouched benchmarks
🆕 2 new benchmarks

Benchmarks breakdown

Benchmark | BASE | HEAD | Change
🆕 test_setup_tests[1-device] | N/A | 15.7 ms | N/A
🆕 test_setup_tests[2-devices] | N/A | 15.8 ms | N/A

Contributor

This pull request has conflicts, please resolve those before we can evaluate the pull request.

Contributor

github-actions bot commented Nov 4, 2024

Conflicts have been resolved. A maintainer will review the pull request shortly.

- Developing ANTA tests: advanced_usages/custom-tests.md
- ANTA as a Python Library: advanced_usages/as-python-lib.md
- Caching in ANTA: advanced_usages/caching.md
- Scaling ANTA: advanced_usages/scaling.md
Collaborator

Can we add a Settings section that renders the BaseSettings models in anta.settings?


On POSIX systems, this value is used to set the soft limit for the current process.
The value cannot exceed the system's hard limit.
"""
Collaborator

Missing ANTA_NOFILE attribute in the docstring.
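
The docstring excerpt above says the value sets the soft limit for the current process on POSIX systems and cannot exceed the hard limit. A minimal sketch of that mechanism with the standard-library `resource` module is shown below. The function names are hypothetical, and clamping to the hard limit (rather than raising an error) is an assumption, not necessarily what this PR does.

```python
import resource  # POSIX only


def clamp_to_hard_limit(requested: int, hard: int) -> int:
    """Return the soft limit to apply: the requested value, capped at the hard limit."""
    if hard == resource.RLIM_INFINITY:
        # No hard cap is reported, so the requested value is used as-is.
        return requested
    return min(requested, hard)


def set_nofile_soft_limit(requested: int) -> int:
    """Set the soft RLIMIT_NOFILE for this process and return the value applied."""
    _, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    target = clamp_to_hard_limit(requested, hard)
    # Only the soft limit changes; the hard limit is passed back unchanged.
    resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return target
```

Some platforms impose additional caps on the soft limit even when the reported hard limit is unlimited, so a real implementation would likely need to handle a failed `setrlimit` call as well.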



class MaxConcurrencySettings(BaseSettings):
    """Environment variable for configuring the maximum number of concurrent tests in the event loop."""
Collaborator

Missing ANTA_MAX_CONCURRENCY attribute in the docstring
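
For readers unfamiliar with settings driven by environment variables, here is a standard-library stand-in for what a `BaseSettings` model does for `ANTA_MAX_CONCURRENCY`. It is only a sketch: `read_max_concurrency` and `HYPOTHETICAL_DEFAULT` are illustrative names, the default value is a placeholder rather than ANTA's actual default, and the PR itself uses `BaseSettings` rather than manual parsing.

```python
import os
from collections.abc import Mapping
from typing import Optional

# Placeholder only; the real default is not stated in this conversation.
HYPOTHETICAL_DEFAULT = 1000


def read_max_concurrency(env: Optional[Mapping[str, str]] = None, default: int = HYPOTHETICAL_DEFAULT) -> int:
    """Read ANTA_MAX_CONCURRENCY from the environment and validate it."""
    source = os.environ if env is None else env
    raw = source.get("ANTA_MAX_CONCURRENCY")
    if raw is None:
        return default
    value = int(raw)  # Raises ValueError if the variable is not an integer
    if value <= 0:
        msg = "Concurrency limit must be greater than 0."
        raise ValueError(msg)
    return value
```

Passing a mapping explicitly, for example `read_max_concurrency({"ANTA_MAX_CONCURRENCY": "250"})`, keeps the function easy to test without touching the real process environment.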

class HttpxResourceLimitsSettings(BaseSettings):
    """Environment variables for configuring the underlying HTTPX client resource limits.

    The limits are set using the following environment variables:
Collaborator

Use attribute section to have a table in mkdocs?

Comment on lines +455 to +483
if self.max_concurrency <= 0:
    msg = "Concurrency limit must be greater than 0."
    raise RuntimeError(msg)

# NOTE: The `aiter` built-in function is not available in Python 3.9
tests = generator.__aiter__()  # pylint: disable=unnecessary-dunder-call
tests_ended = False
tests_pending: set[Task[TestResult]] = set()

while tests_pending or not tests_ended:
    # Add tests to the pending set until the limit is reached or no more tests are available
    while len(tests_pending) < self.max_concurrency and not tests_ended:
        try:
            # NOTE: The `anext` built-in function is not available in Python 3.9
            test = await tests.__anext__()  # pylint: disable=unnecessary-dunder-call
        except StopAsyncIteration:  # noqa: PERF203
            tests_ended = True
            logger.debug("All tests have been added to the pending set.")
        else:
            # Ensure the coroutine is scheduled to run and add it to the pending set
            tests_pending.add(asyncio.create_task(test))
            logger.debug("Added a test to the pending set: %s", test)

    if len(tests_pending) >= self.max_concurrency:
        logger.debug("Concurrency limit reached: %s tests running. Waiting for tests to complete.", self.max_concurrency)

    if not tests_pending:
        logger.debug("No pending tests and all tests have been processed. Exiting.")
        return
Collaborator

Suggested change
if self.max_concurrency <= 0:
    msg = "Concurrency limit must be greater than 0."
    raise RuntimeError(msg)
tests_ended = False
tests_pending: set[Task[TestResult]] = set()
while tests_pending or not tests_ended:
    async for test in generator:
        # Add tests to the pending set until the limit is reached or no more tests are available
        if len(tests_pending) < self.max_concurrency:
            # Ensure the coroutine is scheduled to run and add it to the pending set
            tests_pending.add(asyncio.create_task(test))
            logger.debug("Added a test to the pending set: %s", test)
        else:
            logger.debug("Concurrency limit reached: %s tests running. Waiting for tests to complete.", self.max_concurrency)
            break
    if len(tests_pending) < self.max_concurrency and not tests_ended:
        tests_ended = True
        logger.debug("All tests have been added to the pending set.")
    if not tests_pending:
        logger.debug("No pending tests and all tests have been processed. Exiting.")
        return

Contributor Author

It looks equivalent, right?

Collaborator

It is :)

Contributor Author

I didn't troubleshoot much but coroutines are not awaited properly with this code:

RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/home/cbaillar/git_projects/anta/anta/_runner.py:449: RuntimeWarning: coroutine 'VerifySSHStatus.test' was never awaited
  async for test in generator:
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/home/cbaillar/git_projects/anta/anta/_runner.py:449: RuntimeWarning: coroutine 'VerifyRoutingProtocolModel.test' was never awaited
  async for test in generator:
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/home/cbaillar/git_projects/anta/anta/_runner.py:449: RuntimeWarning: coroutine 'VerifyAcctConsoleMethods.test' was never awaited
  async for test in generator:
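
One possible explanation for these warnings, not confirmed in this thread: in the `async for ... break` version, the item that triggers the `else` branch has already been pulled from the generator when `break` runs, so it is never scheduled, and the next `async for` over the same generator resumes after it. The sketch below reproduces that effect with plain integers. All names are hypothetical and the loops are simplified stand-ins for the two variants discussed above.

```python
import asyncio
from collections.abc import AsyncIterator


async def numbers(count: int) -> AsyncIterator[int]:
    """Async generator standing in for the test generator."""
    for value in range(count):
        yield value


async def consume_with_break(count: int, limit: int) -> list[int]:
    """Fill batches with `async for` and `break` once the batch is full."""
    gen = numbers(count)
    seen: list[int] = []
    exhausted = False
    while not exhausted:
        batch: list[int] = []
        async for item in gen:
            if len(batch) < limit:
                batch.append(item)
            else:
                # `item` was already fetched from the generator and is dropped here.
                break
        else:
            # The loop ended without `break`, so the generator is exhausted.
            exhausted = True
        seen.extend(batch)
    return seen


async def consume_with_anext(count: int, limit: int) -> list[int]:
    """Fill batches by calling __anext__ only while there is room."""
    gen = numbers(count).__aiter__()
    seen: list[int] = []
    exhausted = False
    while not exhausted:
        batch: list[int] = []
        while len(batch) < limit and not exhausted:
            try:
                item = await gen.__anext__()
            except StopAsyncIteration:
                exhausted = True
            else:
                batch.append(item)
        seen.extend(batch)
    return seen
```

With `count=7` and `limit=2`, `consume_with_break` returns `[0, 1, 3, 4, 6]` while `consume_with_anext` returns every value from 0 to 6. If the dropped items were coroutine objects, as in the runner, each one would be garbage-collected without being awaited, which would be consistent with the `RuntimeWarning` lines above.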

@@ -37,6 +37,7 @@ def test_anta_dry_run(

    results = session_results[request.node.callspec.id]

    # TODO: Use AntaRunner in ANTA v2.0.0
Collaborator

Is this needed?
I think the anta.runner.main function uses the AntaRunner class already.

Contributor Author

We can consider removing main() altogether and using AntaRunner everywhere in v2.0.0.

@@ -69,6 +70,7 @@ def test_anta(

    results = session_results[request.node.callspec.id]

    # TODO: Use AntaRunner in ANTA v2.0.0
Collaborator

Same here.

Contributor Author

I put this TODO everywhere we are using main().

Collaborator

mtache left a comment

The default value code could be simplified and cleaned up.
Good idea to have this anta.settings module; we should use it for ANTA_DEBUG in another PR ;)

Contributor

github-actions bot commented Feb 7, 2025

This pull request has conflicts, please resolve those before we can evaluate the pull request.

Contributor

github-actions bot commented Feb 7, 2025

Conflicts have been resolved. A maintainer will review the pull request shortly.

3 participants