Skip to content

Commit

Permalink
Revision 2 to Monte Carlo Extension (#92)
Browse files Browse the repository at this point in the history
* Tinkering with MC worker Queue

* Tinkering with MC worker Queue

* Tinkering with MC worker Queue

* Tinkering with MC worker Queue

- adding timing to post-processing

* Tinkering with MC worker Queue

- adding timing to post-processing

* Tinkering with MC worker Queue

- adding timing to post-processing

* Tinkering with MC worker Queue

- adding timing to post-processing

* Tinkering with MC worker Queue

* Tinkering with MC worker Queue

* Tinkering with MC worker Queue

* Tinkering with MC worker Queue

- adding timing to post-processing

* Tinkering with MC worker Queue

- adding timing to post-processing

* Tinkering with MC worker Queue

- adding timing to post-processing

* Tinkering with MC worker Queue

- adding timing to post-processing

* MC running with new MC Worker

- queues are now swapping data vs. models
- table_writer.py broken up into data pulling portion/writing portion

* Fixed bug in data_brick.py that was not actually returning an obj.

* adjust workers for TRACE

* adding some additional logging, reverting worker settings to 6 workers at 20 threads ea.

* adding some additional logging, reverting worker settings to 6 workers at 20 threads ea.

* adding some additional logging, reverting worker settings to 6 workers at 20 threads ea.

* Clean Up work on MC

- commented some functions
- removed some erroneous "sizeof" measures

* Adding T/S log entries

* Increase size of work queue

- was hanging while putting in shutdown signals

* Rev 2.0 of MC complete

- works with Utopia
- logging cleaned up

* Rev 2.0 of MC complete

- works with Utopia
- logging cleaned up
  • Loading branch information
jeff-ws authored Jan 6, 2025
1 parent 1f7444d commit 9a25aa5
Show file tree
Hide file tree
Showing 10 changed files with 943 additions and 433 deletions.
2 changes: 1 addition & 1 deletion data_files/my_configs/monte_carlo_utopia.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,5 @@ activity_labels = []

[monte_carlo]
# a path from the PROJECT ROOT to the settings file that contains the run data.
run_settings = 'data_files/monte_carlo/run_settings_1.csv'
run_settings = 'data_files/monte_carlo/run_settings_4.csv'

12 changes: 8 additions & 4 deletions temoa/extensions/monte_carlo/MC_solver_options.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# A container for solver options
# the top level solver name in brackets should align with the solver name in the config.toml

num_workers = 6
num_workers = 11

[gurobi]

Method= 2 # Barrier ONLY
Threads= 20 # per solver instance
BarConvTol = 0.01 # Relative Barrier Tolerance primal-dual
FeasibilityTol= 1e-2 # pretty loose
BarConvTol = 1.0e-2 # Relative Barrier Tolerance primal-dual
FeasibilityTol= 1.0e-2 # pretty loose
Crossover= 0 # Disabled
TimeLimit= 18000 # 5 hrs

Expand All @@ -20,7 +20,11 @@ TimeLimit= 18000 # 5 hrs
# 'LPWarmStart': 2, # pass basis

[cbc]
primalT = 1e-3
dualT = 1e-3
# tbd

[appsi_highs]
# tbd
threads = 2
primal_feasibility_tolerance = 1e-3
dual_feasibility_tolerance = 1e-3
16 changes: 13 additions & 3 deletions temoa/extensions/monte_carlo/mc_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from logging import getLogger
from pathlib import Path

from pyomo.dataportal import DataPortal

from definitions import PROJECT_ROOT
from temoa.temoa_model.hybrid_loader import HybridLoader
from temoa.temoa_model.temoa_config import TemoaConfig
Expand Down Expand Up @@ -168,7 +170,7 @@ def row_parser(self, row_number: int, row: str) -> RowData:

class MCRun:
"""
The data (and more?) to support a model build + run
A Container class to hold the data (and more?) to support a model build + run
"""

def __init__(
Expand All @@ -191,8 +193,15 @@ def change_records(self) -> list[ChangeRecord]:
return res

@property
def model(self) -> TemoaModel:
def model_dp(self) -> tuple[str, DataPortal]:
"""tuple of the indexed name for the scenario, and the DP"""
name = f'{self.scenario_name}-{self.run_index}'
dp = HybridLoader.data_portal_from_data(self.data_store)
return name, dp

@property
def model(self) -> TemoaModel:
dp = self.model_dp
model = TemoaModel()
instance = model.create_instance(data=dp)
# update the name to indexed...
Expand Down Expand Up @@ -329,7 +338,8 @@ def run_generator(self) -> Generator[MCRun, None, None]:
"""
ts_gen = self.tweak_set_generator()
for run, tweaks in ts_gen:
logger.info('Making run %d from %d tweaks: %s', run, len(tweaks), tweaks)
logger.info('Making run %d from %d tweaks', run, len(tweaks))
logger.debug('Run %d tweaks: %s', run, tweaks)

# need to make a DEEP copy of the orig, which holds other dictionaries...
data_store = {k: v.copy() for k, v in self.data_store.items()}
Expand Down
82 changes: 64 additions & 18 deletions temoa/extensions/monte_carlo/mc_sequencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@
from multiprocessing import Queue
from pathlib import Path

from pyomo.dataportal import DataPortal

from definitions import PROJECT_ROOT, get_OUTPUT_PATH
from temoa.extensions.modeling_to_generate_alternatives.worker import Worker
from temoa.extensions.monte_carlo.mc_run import MCRunFactory
from temoa.extensions.monte_carlo.mc_worker import MCWorker
from temoa.temoa_model.data_brick import DataBrick
from temoa.temoa_model.hybrid_loader import HybridLoader
from temoa.temoa_model.table_writer import TableWriter
from temoa.temoa_model.temoa_config import TemoaConfig
from temoa.temoa_model.temoa_model import TemoaModel

logger = getLogger(__name__)

Expand Down Expand Up @@ -113,11 +115,13 @@ def start(self):

# 4. Set up the workers
num_workers = self.num_workers
work_queue = Queue(1) # restrict the queue to hold just 1 models in it max
result_queue = Queue(
work_queue: Queue[tuple[str, DataPortal] | str] = Queue(
num_workers + 1
) # must be able to hold all shutdowns at once (could be changed later to not lock on insertion...)
result_queue: Queue[DataBrick | str] = Queue(
num_workers + 1
) # must be able to hold a shutdown signal from all workers at once!
log_queue = Queue(50)
log_queue = Queue()
# make workers
workers = []
kwargs = {
Expand All @@ -129,8 +133,8 @@ def start(self):
if not s_path.exists():
s_path.mkdir()
for i in range(num_workers):
w = Worker(
model_queue=work_queue,
w = MCWorker(
dp_queue=work_queue,
results_queue=result_queue,
log_root_name=__name__,
log_queue=log_queue,
Expand All @@ -148,34 +152,53 @@ def start(self):
mc_run = next(run_gen)
# capture the "tweaks"
self.writer.write_tweaks(iteration=mc_run.run_index, change_records=mc_run.change_records)
instance = mc_run.model
run_name, dp = mc_run.model_dp
iter_counter = 0
while more_runs:
try:
work_queue.put(instance, block=False) # put a log on the fire, if room
logger.info('Putting an instance in the work queue')
tic = datetime.now()
work_queue.put((run_name, dp), block=False) # put a log on the fire, if room
toc = datetime.now()

logger.info(
'Put a DataPortal in the work queue in %0.2f seconds',
(toc - tic).total_seconds(),
)
try:
tic = datetime.now()
mc_run = next(run_gen)
toc = datetime.now()
logger.info(
'Made mc_run from generator in %0.2f seconds', (toc - tic).total_seconds()
)
# capture the "tweaks"
self.writer.write_tweaks(
iteration=mc_run.run_index, change_records=mc_run.change_records
)
instance = mc_run.model
# ready the next one
run_name, dp = mc_run.model_dp
except StopIteration:
logger.debug('Pulled last run from run generator')
logger.info('Pulled last DP from run generator')
more_runs = False
except queue.Full:
# print('work queue is full')
pass
# see if there is a result ready to pick up, if not, pass
try:
tic = datetime.now()
next_result = result_queue.get_nowait()
toc = datetime.now()
logger.info(
'Pulled DataBrick from result_queue in %0.2f seconds',
(toc - tic).total_seconds(),
)
except queue.Empty:
next_result = None
# print('no result')
if next_result is not None:
self.process_solve_results(next_result)
logger.info('Solve count: %d', self.solve_count)
self.solve_count += 1
logger.info('Solve count: %d', self.solve_count)
if self.verbose or not self.config.silent:
print(f'MC Solve count: {self.solve_count}')
# pull anything from the logging queue and log it...
Expand All @@ -188,28 +211,45 @@ def start(self):
break
time.sleep(0.1) # prevent hyperactivity...

# check the queues...
if iter_counter % 6000 == 0: # about every 10 minutes...post the queue sizes
try:
logger.info('Work queue size: %d', work_queue.qsize())
logger.info('Result queue size: %d', result_queue.qsize())
except NotImplementedError:
pass
# not implemented on OSX
finally:
iter_counter = 0
iter_counter += 1

# 7. Shut down the workers and then the logging queue
if self.verbose:
print('shutting it down')
for _ in workers:
if self.verbose:
print('shutdown sent')
work_queue.put('ZEBRA') # shutdown signal
logger.debug('Put "ZEBRA" on work queue (shutdown signal)')

# 7b. Keep pulling results from the queue to empty it out
empty = 0
logger.debug('Starting the waiting process to wrap up...')
while True:
# print(f'{empty}-', end='')
# logger.debug('Polling result queue...')
try:
next_result = result_queue.get_nowait()
if next_result == 'COYOTE': # shutdown signal
logger.debug('Got COYOTE (shutdown received)')
empty += 1
except queue.Empty:
next_result = None
if next_result is not None and next_result != 'COYOTE':
logger.debug('bagged a result post-shutdown')
self.process_solve_results(next_result)
logger.info('Solve count: %d', self.solve_count)
self.solve_count += 1
logger.info('Solve count: %d', self.solve_count)
if self.verbose or not self.config.silent:
print(f'MC Solve count: {self.solve_count}')
while True:
Expand All @@ -228,6 +268,7 @@ def start(self):

log_queue.close()
log_queue.join_thread()
logger.debug('All queues closed')
if self.verbose:
print('log queue closed')
work_queue.close()
Expand All @@ -239,15 +280,20 @@ def start(self):
if self.verbose:
print('result queue joined')

def process_solve_results(self, instance: TemoaModel):
def process_solve_results(self, brick: DataBrick):
"""write the results as required"""
# get the instance number from the model name, if provided
if '-' not in instance.name:
if '-' not in brick.name:
raise ValueError(
'Instance name does not appear to contain a -idx value. The manager should be tagging/updating this'
)
idx = int(instance.name.split('-')[-1])
idx = int(brick.name.split('-')[-1])
if idx in self.seen_instance_indices:
raise ValueError(f'Instance index {idx} already seen. Likely coding error')
self.seen_instance_indices.add(idx)
self.writer.write_mc_results(M=instance, iteration=idx)
tic = datetime.now()
self.writer.write_mc_results(brick=brick, iteration=idx)
toc = datetime.now()
logger.info(
'Processed results for %s in %0.2f seconds', brick.name, (toc - tic).total_seconds()
)
Loading

0 comments on commit 9a25aa5

Please sign in to comment.