diff --git a/sner/server/scheduler/commands.py b/sner/server/scheduler/commands.py index 7690e442..c1ff9035 100644 --- a/sner/server/scheduler/commands.py +++ b/sner/server/scheduler/commands.py @@ -10,7 +10,7 @@ from flask import current_app from flask.cli import with_appcontext -from sner.server.scheduler.core import enumerate_network, QueueManager +from sner.server.scheduler.core import enumerate_network, QueueManager, SchedulerService from sner.server.scheduler.models import Queue @@ -90,3 +90,12 @@ def queue_prune_command(queue_name): QueueManager.prune(queue) sys.exit(0) + + +@command.command(name='readynet-recount', help='refresh readynets for current heatmap_hot_level') +@with_appcontext +def readynet_recount_command(): + """refresh readynets for current heatmap_hot_level""" + + SchedulerService.readynet_recount() + sys.exit(0) diff --git a/sner/server/scheduler/core.py b/sner/server/scheduler/core.py index f083a493..573bf5fb 100644 --- a/sner/server/scheduler/core.py +++ b/sner/server/scheduler/core.py @@ -564,3 +564,35 @@ def job_output(cls, job, retval, output): cls.heatmap_pop(cls.hashval(target)) cls.release_lock() + + @classmethod + def readynet_recount(cls): + """ + rescan targets and update readynets table for new heatmap hot level + """ + + cls.get_lock() + conn = db.session.connection() + + if current_app.config['SNER_HEATMAP_HOT_LEVEL']: + hot_hashvals = set(conn.execute( + select(Heatmap.hashval).filter(Heatmap.count >= current_app.config['SNER_HEATMAP_HOT_LEVEL']) + ).scalars().all()) + + # all heatmap hashvals over limit remove from readynet + conn.execute(delete(Readynet).filter(Readynet.hashval.in_(hot_hashvals))) + else: + hot_hashvals = set() + + # for all target hashvals except over limit insert as readynet for all queues + all_hashvals = set(conn.execute(select(func.distinct(Target.hashval))).scalars().all()) + for thashval in (all_hashvals - hot_hashvals): + for queue_id in conn.execute(select(func.distinct(Target.queue_id)).filter(Target.hashval == thashval)).scalars().all(): + conn.execute( + pg_insert(Readynet) + .values(queue_id=queue_id, hashval=thashval) + .on_conflict_do_nothing(constraint='readynet_pkey') + ) + + db.session.commit() + cls.release_lock() diff --git a/tests/server/scheduler/test_commands.py b/tests/server/scheduler/test_commands.py index 95176f98..e85790a3 100644 --- a/tests/server/scheduler/test_commands.py +++ b/tests/server/scheduler/test_commands.py @@ -83,3 +83,10 @@ def test_queue_prune_command(runner, job_completed): assert not Job.query.filter(Job.queue_id == job_completed.queue_id).all() assert not Path(job_completed.output_abspath).exists() + + +def test_readynet_recount_command(runner): + """test readynet_recount command""" + + result = runner.invoke(command, ['readynet-recount']) + assert result.exit_code == 0 diff --git a/tests/server/scheduler/test_core.py b/tests/server/scheduler/test_core.py index 67104243..bc7cce77 100644 --- a/tests/server/scheduler/test_core.py +++ b/tests/server/scheduler/test_core.py @@ -3,13 +3,14 @@ scheduler core tests """ -from ipaddress import ip_network +from ipaddress import ip_address, ip_network import pytest +from flask import current_app from sner.server.extensions import db from sner.server.scheduler.core import ExclMatcher, SchedulerService -from sner.server.scheduler.models import Excl, ExclFamily, Heatmap, Readynet +from sner.server.scheduler.models import Excl, ExclFamily, Heatmap, Job, Readynet def test_model_excl_validation(): @@ -84,6 +85,39 @@ def test_schedulerservice_readynetupdates(app, queue, target_factory): # pylint assert Readynet.query.count() == 1 +def test_schedulerservice_morereadynetupdates(app, queue, target_factory): # pylint: disable=unused-argument + """ + test scheduler service readynet manipulation + + used to analyze and reason about heatmap_pop readynet sql queries for update readynet lists. + using readynet updates `if heat < level` provides automatic updates for readynets when hot_level changes + in runtime, but produces extra queries for every returning job. result: on hot_level change + readynets map must be manually recounted. + """ + + current_app.config['SNER_HEATMAP_HOT_LEVEL'] = 5 + queue.group_size = 2 + + for addr in range(10): + tmp = str(ip_address(addr)) + target_factory.create(queue=queue, target=tmp, hashval=SchedulerService.hashval(tmp)) + db.session.commit() + + assignment1 = SchedulerService.job_assign(None, []) + assignment2 = SchedulerService.job_assign(None, []) + assignment3 = SchedulerService.job_assign(None, []) + + assert len(assignment3['targets']) == 1 + assert Readynet.query.count() == 0 + + SchedulerService.job_output(Job.query.get(assignment3['id']), 0, b'') + assert Readynet.query.count() == 1 + + SchedulerService.job_output(Job.query.get(assignment2['id']), 0, b'') + SchedulerService.job_output(Job.query.get(assignment1['id']), 0, b'') + assert Readynet.query.count() == 1 + + def test_schedulerservice_hashvalprocessing(app, queue, target_factory): # pylint: disable=unused-argument """test scheduler service hashvalsreadynet manipulation""" @@ -94,3 +128,33 @@ def test_schedulerservice_hashvalprocessing(app, queue, target_factory): # pyli assert assignment assert Heatmap.query.one().hashval == '127.0.0.0/24' + + +def test_schedulerservice_readynetrecount(app, queue, target_factory): # pylint: disable=unused-argument + """test scheduler service readynet_recount""" + + current_app.config['SNER_HEATMAP_HOT_LEVEL'] = 5 + queue.group_size = 2 + + for addr in range(10): + tmp = str(ip_address(addr)) + target_factory.create(queue=queue, target=tmp, hashval=SchedulerService.hashval(tmp)) + db.session.commit() + + SchedulerService.job_assign(None, []) + SchedulerService.job_assign(None, []) + + assignment3 = SchedulerService.job_assign(None, []) + assert len(assignment3['targets']) == 1 + assert Readynet.query.count() == 0 + + current_app.config['SNER_HEATMAP_HOT_LEVEL'] = 7 + SchedulerService.readynet_recount() + assert Readynet.query.count() == 1 + + assignment4 = SchedulerService.job_assign(None, []) + assert len(assignment4['targets']) == 2 + + current_app.config['SNER_HEATMAP_HOT_LEVEL'] = 3 + SchedulerService.readynet_recount() + assert Readynet.query.count() == 0