diff --git a/pyproject.toml b/pyproject.toml index 97afece..0e8110a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,27 +7,28 @@ name = "blop" description = "Beamline optimization with machine learning" readme = { file = "README.rst", content-type = "text/x-rst" } authors = [ - { name = "Brookhaven National Laboratory", email = "tmorris@bnl.gov" } + { name = "Brookhaven National Laboratory", email = "tmorris@bnl.gov" }, ] maintainers = [ - { name = "Brookhaven National Laboratory", email = "tmorris@bnl.gov" } + { name = "Brookhaven National Laboratory", email = "tmorris@bnl.gov" }, ] requires-python = ">=3.9" dependencies = [ - "area-detector-handlers", - "bluesky", - "botorch", - "databroker", - "gpytorch", - "h5py", - "matplotlib", - "numpy", - "ophyd", - "ortools", - "scipy", - "tables", - "torch", + "area-detector-handlers", + "bluesky", + "botorch", + "databroker", + "gpytorch", + "h5py", + "matplotlib", + "numpy", + "ophyd", + "ortools", + "scipy", + "tables", + "torch", + "tiled", ] classifiers = [ @@ -44,39 +45,33 @@ dynamic = ["version"] [project.optional-dependencies] -sirepo = [ - "sirepo-bluesky" -] +sirepo = ["sirepo-bluesky"] -napari = [ - "napari" -] +napari = ["napari"] -gui = [ - "nicegui" -] +gui = ["nicegui"] dev = [ - "black", - "pytest-codecov", - "coverage", - "flake8", - "furo", - "isort", - "nbstripout", - "pre-commit", - "pre-commit-hooks", - "pytest", - "sphinx", - "twine", - "ipython", - "jupyter", - "matplotlib", - "nbsphinx", - "numpydoc", - "pandoc", - "sphinx-copybutton", - "sphinx_rtd_theme", + "black", + "pytest-codecov", + "coverage", + "flake8", + "furo", + "isort", + "nbstripout", + "pre-commit", + "pre-commit-hooks", + "pytest", + "sphinx", + "twine", + "ipython", + "jupyter", + "matplotlib", + "nbsphinx", + "numpydoc", + "pandoc", + "sphinx-copybutton", + "sphinx_rtd_theme", ] diff --git a/src/blop/agent.py b/src/blop/agent.py index c3fcdce..83dfc43 100644 --- a/src/blop/agent.py +++ b/src/blop/agent.py @@ -3,9 +3,10 @@ import pathlib import time as ttime import warnings +from abc import ABC, abstractmethod from collections import OrderedDict from collections.abc import Mapping -from typing import Callable, Optional, Sequence, Tuple +from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union import bluesky.plan_stubs as bps # noqa F401 import bluesky.plans as bp # noqa F401 @@ -22,6 +23,7 @@ from botorch.models.model_list_gp_regression import ModelListGP from botorch.models.transforms.input import Normalize from databroker import Broker +from numpy.typing import ArrayLike from ophyd import Signal from . import plotting, utils @@ -61,14 +63,32 @@ def _validate_dofs_and_objs(dofs: DOFList, objs: ObjectiveList): ) -class Agent: +class BlueskyAdaptiveBaseAgent(ABC): + """Placeholder for inheritance for clarity, while bluesky-adaptive deps is a moving target. + E.g. some versions of databroker v2 will break the current implementation. + """ + + @abstractmethod + def measurement_plan(self, point: ArrayLike) -> Tuple[str, List, dict]: ... # noqa: E704 + + @staticmethod + @abstractmethod + def unpack_run(run) -> Tuple[Union[float, ArrayLike], Union[float, ArrayLike]]: ... # noqa: E704 + + @abstractmethod + def tell(self, x, y) -> Dict[str, ArrayLike]: ... # noqa: E704 + + @abstractmethod + def ask(self, batch_size: int) -> Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]: ... 
# noqa: E704
+
+
+class BaseAgent:
     def __init__(
         self,
+        *,
         dofs: Sequence[DOF],
         objectives: Sequence[Objective],
-        db: Broker = None,
-        detectors: Sequence[Signal] = None,
-        acquistion_plan=default_acquisition_plan,
+        acquistion_plan: Optional[Union[Callable, str]] = default_acquisition_plan,
         digestion: Callable = default_digestion_function,
         digestion_kwargs: dict = {},
         verbose: bool = False,
@@ -77,63 +97,41 @@ def __init__(
         model_inactive_objectives: bool = False,
         tolerate_acquisition_errors: bool = False,
         sample_center_on_init: bool = False,
-        trigger_delay: float = 0,
-        train_every: int = 1,
     ):
-        """
-        A Bayesian optimization agent.
+        """A base class for Bayesian optimization agents.

         Parameters
         ----------
-        dofs : iterable of DOF objects
+        dofs : Sequence[DOF]
            The degrees of freedom that the agent can control, which determine the output of the model.
-        objectives : iterable of Objective objects
+        objectives : Sequence[Objective]
            The objectives which the agent will try to optimize.
-        detectors : iterable of ophyd objects
-            Detectors to trigger during acquisition.
-        acquisition_plan : optional
-            A plan that samples the beamline for some given inputs.
-        digestion :
-            A function to digest the output of the acquisition, taking a DataFrame as an argument.
-        digestion_kwargs :
-            Some kwargs for the digestion function.
-        db : optional
-            A databroker instance.
-        verbose : bool
-            To be verbose or not.
-        tolerate_acquisition_errors : bool
-            Whether to allow errors during acquistion. If `True`, errors will be caught as warnings.
-        sample_center_on_init : bool
-            Whether to sample the center of the DOF limits when the agent has no data yet.
-        trigger_delay : float
-            How many seconds to wait between moving DOFs and triggering detectors.
+        acquistion_plan : Callable or str, optional
+            A plan that samples the beamline for some given inputs, by default default_acquisition_plan.
+            Called directly by Agent; BlueskyAdaptiveAgent uses only its __name__.
+        digestion : Callable, optional
+            A function to digest the output of the acquisition, taking a DataFrame as an argument,
+            by default default_digestion_function.
+        digestion_kwargs : dict, optional
+            Some kwargs for the digestion function, by default {}.
+        verbose : bool, optional
+            To be verbose or not, by default False.
+        enforce_all_objectives_valid : bool, optional
+            If `True`, treat a data point as valid only when every objective is valid there;
+            partially valid points are masked out of model training, by default True.
+        exclude_pruned : bool, optional
+            If `True`, exclude points flagged in the table's "prune" column from model training,
+            by default True.
+        model_inactive_objectives : bool, optional
+            If `True`, construct and update models for inactive objectives as well, by default False.
+        tolerate_acquisition_errors : bool, optional
+            Whether to allow errors during acquisition.
If `True`, errors will be caught as warnings, by default False + sample_center_on_init : bool, optional + Whether to sample the center of the DOF limits when the agent has no data yet, by default False """ - - # DOFs are parametrized by whether they are active and whether they are read-only - # - # below are the behaviors of DOFs of each kind and mode: - # - # 'read': the agent will read the input on every acquisition (all dofs are always read) - # 'move': the agent will try to set and optimize over these (there must be at least one of these) - # 'input' means that the agent will use the value to make its posterior - # - # - # not read-only read-only - # +---------------------+---------------+ - # active | read, input, move | read, input | - # +---------------------+---------------+ - # inactive | read | read | - # +---------------------+---------------+ - # - # - self.dofs = DOFList(list(np.atleast_1d(dofs))) self.objectives = ObjectiveList(list(np.atleast_1d(objectives))) - self.detectors = list(np.atleast_1d(detectors or [])) _validate_dofs_and_objs(self.dofs, self.objectives) - self.db = db self.acquisition_plan = acquistion_plan self.digestion = digestion self.digestion_kwargs = digestion_kwargs @@ -145,8 +143,6 @@ def __init__( self.enforce_all_objectives_valid = enforce_all_objectives_valid self.exclude_pruned = exclude_pruned - self.train_every = train_every - self.trigger_delay = trigger_delay self.sample_center_on_init = sample_center_on_init self._table = pd.DataFrame() @@ -156,67 +152,224 @@ def __init__( self.n_last_trained = 0 - @property - def table(self): - return self._table + def raw_inputs(self, index=None, **subset_kwargs): + """ + Get the raw, untransformed inputs for a DOF (or for a subset). + """ + if index is None: + return torch.cat([self.raw_inputs(dof.name) for dof in self.dofs(**subset_kwargs)], dim=-1) + return torch.tensor(self._table.loc[:, self.dofs[index].name].values, dtype=torch.double).unsqueeze(-1) - def unpack_run(self): - return + def train_inputs(self, index=None, **subset_kwargs): + """A two-dimensional tensor of all DOF values.""" - def measurement_plan(self): - return + if index is None: + return torch.cat([self.train_inputs(index=dof.name) for dof in self.dofs(**subset_kwargs)], dim=-1) - def __iter__(self): - for index in range(len(self)): - yield self.dofs[index] + dof = self.dofs[index] + raw_inputs = self.raw_inputs(index=index, **subset_kwargs) - def __getattr__(self, attr): - acqf_config = acquisition.parse_acqf_identifier(attr, strict=False) - if acqf_config is not None: - acqf, _ = _construct_acqf(agent=self, acqf_name=acqf_config["name"]) - return acqf - raise AttributeError(f"No attribute named '{attr}'.") + return dof._transform(raw_inputs) - def refresh(self): - self._construct_all_models() - self._train_all_models() + def raw_targets(self, index=None, **subset_kwargs): + """ + Get the raw, untransformed inputs for an objective (or for a subset). 
+ """ + values = {} - def redigest(self): - self._table = self.digestion(self._table, **self.digestion_kwargs) + for obj in self.objectives(**subset_kwargs): + # return torch.cat([self.raw_targets(index=obj.name) for obj in self.objectives(**subset_kwargs)], dim=-1) + values[obj.name] = torch.tensor(self._table.loc[:, obj.name].values, dtype=torch.double) - def sample(self, n: int = DEFAULT_MAX_SAMPLES, normalize: bool = False, method: str = "quasi-random") -> torch.Tensor: + return values + + def train_targets(self, concatenate=False, **subset_kwargs): + """Returns the values associated with an objective name.""" + + targets_dict = {} + raw_targets_dict = self.raw_targets(**subset_kwargs) + + for obj in self.objectives(**subset_kwargs): + y = raw_targets_dict[obj.name] + + targets_dict[obj.name] = obj._transform(y) + + if self.enforce_all_objectives_valid: + all_valid_mask = True + + for name, values in targets_dict.items(): + all_valid_mask &= ~values.isnan() + + for name in targets_dict.keys(): + targets_dict[name] = targets_dict[name].where(all_valid_mask, np.nan) + + if concatenate: + return torch.cat([values.unsqueeze(-1) for values in targets_dict.values()], axis=-1) + + return targets_dict + + def _latent_dim_tuples(self, obj_index=None): + """ + For the objective indexed by 'obj_index', return a list of tuples, where each tuple represents + a group of DOFs to fit a latent representation to. """ - Returns a (..., 1, n_active_dofs) tensor of points sampled within the parameter space. - Parameters - ---------- - n : int - How many points to sample. - method : str - How to sample the points. Must be one of 'quasi-random', 'random', or 'grid'. - normalize: bool - If True, sample the unit hypercube. If False, sample the parameter space of the agent. + if obj_index is None: + return {obj.name: self._latent_dim_tuples(obj_index=obj.name) for obj in self.objectives} + + obj = self.objectives[obj_index] + + latent_group_index = {} + for dof in self.dofs(active=True): + latent_group_index[dof.name] = dof.name + for group_index, latent_group in enumerate(obj.latent_groups): + if dof.name in latent_group: + latent_group_index[dof.name] = group_index + + u, uinv = np.unique(list(latent_group_index.values()), return_inverse=True) + return [tuple(np.where(uinv == i)[0]) for i in range(len(u))] + + # @property + def pruned_mask(self): + if self.exclude_pruned and "prune" in self._table.columns: + return torch.tensor(self._table.prune.values.astype(bool)) + return torch.zeros(len(self._table)).bool() + + @property + def input_normalization(self): """ + Suitably transforms model inputs to the unit hypercube. - active_dofs = self.dofs(active=True) + For modeling: - if method == "quasi-random": - X = utils.normalized_sobol_sampler(n, d=len(active_dofs)) + Always normalize between min and max values. This is always inside the trust domain, sometimes smaller. 
- elif method == "random": - X = torch.rand(size=(n, 1, len(active_dofs))) + For sampling: - elif method == "grid": - n_side_if_settable = int(np.power(n, 1 / np.sum(~active_dofs.read_only))) - sides = [ - torch.linspace(0, 1, n_side_if_settable) if not dof.read_only else torch.zeros(1) for dof in active_dofs - ] - X = torch.cat([x.unsqueeze(-1) for x in torch.meshgrid(sides, indexing="ij")], dim=-1).unsqueeze(-2).double() + Settable: normalize between search bounds + Read-only: constrain to the readback value + """ + + return Normalize(d=self.dofs.active.sum()) + + def _construct_model(self, obj, skew_dims=None): + """ + Construct an untrained model for an objective. + """ + + skew_dims = skew_dims if skew_dims is not None else self._latent_dim_tuples(obj.name) + + train_inputs = self.train_inputs(active=True) + train_targets = self.train_targets()[obj.name].unsqueeze(-1) + + inputs_are_trusted = ~torch.isnan(train_inputs).any(axis=1) + targets_are_trusted = ~torch.isnan(train_targets).any(axis=1) + + trusted = inputs_are_trusted & targets_are_trusted & ~self.pruned_mask() + + obj._model = construct_single_task_model( + X=train_inputs[trusted], + y=train_targets[trusted], + min_noise=obj.min_noise, + max_noise=obj.max_noise, + skew_dims=self._latent_dim_tuples()[obj.name], + ) + + obj.model_dofs = set(self.dofs(active=True).names) # if these change, retrain the model on self.ask() + + if trusted.all(): + obj.validity_conjugate_model = None + obj.validity_constraint = GenericDeterministicModel(f=lambda x: torch.ones(size=x.size())[..., -1]) else: - raise ValueError("'method' argument must be one of ['quasi-random', 'random', 'grid'].") + dirichlet_likelihood = gpytorch.likelihoods.DirichletClassificationLikelihood( + trusted.long(), learn_additional_noise=True + ) - return X.double() if normalize else self.dofs(active=True).untransform(X).double() + obj.validity_conjugate_model = models.LatentDirichletClassifier( + train_inputs=train_inputs[inputs_are_trusted], + train_targets=dirichlet_likelihood.transformed_targets.transpose(-1, -2)[inputs_are_trusted].double(), + skew_dims=skew_dims, + likelihood=dirichlet_likelihood, + input_transform=self.input_normalization, + ) + + obj.validity_constraint = GenericDeterministicModel( + f=lambda x: obj.validity_conjugate_model.probabilities(x)[..., -1] + ) + + def update_models( + self, + train: Optional[bool] = None, + ): + objectives_to_model = self.objectives if self.model_inactive_objectives else self.objectives(active=True) + for obj in objectives_to_model: + t0 = ttime.monotonic() + + cached_hypers = obj.model.state_dict() if hasattr(obj, "_model") else None + n_before_tell = obj.n_valid + self._construct_model(obj) + n_after_tell = obj.n_valid + + if train is None: + train = int(n_after_tell / self.train_every) > int(n_before_tell / self.train_every) + + if len(obj.model.train_targets) >= 4: + if train: + t0 = ttime.monotonic() + train_model(obj.model) + if self.verbose: + logger.debug(f"trained model '{obj.name}' in {1e3 * (ttime.monotonic() - t0):.00f} ms") + + else: + train_model(obj.model, hypers=cached_hypers) + + def tell( + self, + data: Optional[Mapping] = {}, + x: Optional[Mapping] = {}, + y: Optional[Mapping] = {}, + metadata: Optional[Mapping] = {}, + append: bool = True, + update_models: bool = True, + train: Optional[bool] = None, + ): + """ + Inform the agent about new inputs and targets for the model. + + If run with no arguments, it will just reconstruct all the models. 
+
+        Parameters
+        ----------
+        data : dict, optional
+            A dict of DOF names, objective names, and metadata keys, each mapped to a list of values.
+            Supersedes `x`, `y`, and `metadata` if given.
+        x : dict
+            A dict keyed by the name of each DOF, with a list of values for each DOF.
+        y : dict
+            A dict keyed by the name of each objective, with a list of values for each objective.
+        metadata : dict, optional
+            A dict of extra columns to record alongside the inputs and targets.
+        append : bool
+            If `True`, will append new data to old data. If `False`, will replace old data with new data.
+        update_models : bool
+            Whether to reconstruct the models after ingesting the new data.
+        train : bool, optional
+            Whether to train the models on construction.
+        """
+
+        if not data:
+            if bool(x) != bool(y):
+                raise ValueError("Must supply either x and y, or data.")
+            data = {**x, **y, **metadata}
+
+        data = {k: list(np.atleast_1d(v)) for k, v in data.items()}
+        unique_field_lengths = {len(v) for v in data.values()}
+
+        if len(unique_field_lengths) > 1:
+            raise ValueError("All supplied values must be the same length!")
+
+        # TODO: This is an inefficient approach to caching data. Keep a list, make the table at model-update time.
+        new_table = pd.DataFrame(data)
+        self._table = pd.concat([self._table, new_table]) if append else new_table
+        self._table.index = np.arange(len(self._table))
+        if update_models:
+            self.update_models(train=train)

     def ask(self, acqf="qei", n=1, route=True, sequential=True, upsample=1, **acqf_kwargs):
         """Ask the agent for the best point to sample, given an acquisition function.
@@ -288,106 +441,374 @@ def ask(self, acqf="qei", n=1, route=True, sequential=True, upsample=1, **acqf_k

         # p = self.posterior(candidates) if hasattr(self, "model") else None

-        active_dofs = self.dofs(active=True)
+        active_dofs = self.dofs(active=True)
+
+        points = candidates[..., ~active_dofs.read_only]
+        read_only_values = candidates[..., active_dofs.read_only]
+
+        duration = 1e3 * (ttime.monotonic() - start_time)
+
+        if route and n > 1:
+            current_points = np.array([dof.readback for dof in active_dofs if not dof.read_only])
+            travel_expenses = np.array([dof.travel_expense for dof in active_dofs if not dof.read_only])
+            routing_index = utils.route(current_points, points, dim_weights=travel_expenses)
+            points = points[routing_index]
+
+        if upsample > 1:
+            if n == 1:
+                raise ValueError("Cannot upsample points unless n > 1.")
+            idx = np.arange(len(points))
+            upsampled_idx = np.linspace(0, len(idx) - 1, upsample * len(idx) - 1)
+            points = sp.interpolate.interp1d(idx, points, axis=0)(upsampled_idx)
+
+        res = {
+            "points": {dof.name: list(points[..., i]) for i, dof in enumerate(active_dofs(read_only=False))},
+            "acqf_name": acqf_config["name"],
+            "acqf_obj": list(np.atleast_1d(acqf_obj.numpy())),
+            "acqf_kwargs": acqf_kwargs,
+            "duration_ms": duration,
+            "sequential": sequential,
+            "upsample": upsample,
+            "read_only_values": read_only_values,
+            # "posterior": p,
+        }
+
+        return res
+
+
+class BlueskyAdaptiveAgent(BaseAgent, BlueskyAdaptiveBaseAgent):
+    def __init__(
+        self, *, acqf_string, route, sequential, upsample, acqf_kwargs, detector_names: Sequence[str] = (), **kwargs
+    ):
+        super().__init__(**kwargs)
+        self._acqf_string = acqf_string
+        self._route = route
+        self._sequential = sequential
+        self._upsample = upsample
+        self._acqf_kwargs = acqf_kwargs
+        self._detector_names = list(detector_names)
+
+    @property
+    def detector_names(self):
+        return [str(name) for name in self._detector_names]
+
+    @detector_names.setter
+    def detector_names(self, names):
+        self._detector_names = list(names)
+
+    @property
+    def acquisition_function(self):
+        return str(self._acqf_string)
+
+    @acquisition_function.setter
+    def acquisition_function(self, acqf_string):
+        self._acqf_string = str(acqf_string)
+
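+    # Illustrative sketch (not part of the class): a bluesky-adaptive harness drives
+    # this agent through ask/tell; `dofs`, `objectives`, and `measured` are hypothetical.
+    #
+    #     agent = BlueskyAdaptiveAgent(
+    #         dofs=dofs, objectives=objectives, acqf_string="qei",
+    #         route=True, sequential=True, upsample=1, acqf_kwargs={},
+    #     )
+    #     docs, points = agent.ask(batch_size=4)  # one metadata dict per suggested point
+    #     agent.tell(x=points[0], y=measured)     # positional values, zipped with DOF/objective names
+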
+    @property
+    def route(self):
+        return bool(self._route)
+
+    @route.setter
+    def route(self, route):
+        self._route = route
+
+    @property
+    def sequential(self):
+        return bool(self._sequential)
+
+    @sequential.setter
+    def sequential(self, sequential):
+        self._sequential = sequential
+
+    @property
+    def upsample(self):
+        return int(self._upsample)
+
+    @upsample.setter
+    def upsample(self, upsample):
+        self._upsample = int(upsample)
+
+    @property
+    def acqf_kwargs(self):
+        return {str(k): str(v) for k, v in self._acqf_kwargs.items()}
+
+    def update_acqf_kwargs(self, **kwargs):
+        self._acqf_kwargs.update(kwargs)
+
+    def server_registrations(self):
+        """This is how we make these available to the REST API."""
+        self._register_method("Update Acquisition Function Kwargs", self.update_acqf_kwargs)
+        self._register_property("Acquisition Function", self.acquisition_function, self.acquisition_function)
+        self._register_property("Route Points", self.route, self.route)
+        self._register_property("Sequential Points", self.sequential, self.sequential)
+        self._register_property("Upsample Points", self.upsample, self.upsample)
+        return super().server_registrations()
+
+    def ask(self, batch_size) -> Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]:
+        default_result = super().ask(
+            n=batch_size,
+            acqf=self._acqf_string,
+            route=self._route,
+            sequential=self._sequential,
+            upsample=self._upsample,
+            **self._acqf_kwargs,
+        )
+
+        # The result from BaseAgent.ask has the form:
+        #
+        #     res = {
+        #         "points": {dof.name: list(points[..., i]) for i, dof in enumerate(active_dofs(read_only=False))},
+        #         "acqf_name": acqf_config["name"],
+        #         "acqf_obj": list(np.atleast_1d(acqf_obj.numpy())),
+        #         "acqf_kwargs": acqf_kwargs,
+        #         "duration_ms": duration,
+        #         "sequential": sequential,
+        #         "upsample": upsample,
+        #         "read_only_values": read_only_values,
+        #         # "posterior": p,
+        #     }
+
+        points: Dict[str, List[ArrayLike]] = default_result.pop("points")
+        acqf_obj: List[ArrayLike] = default_result.pop("acqf_obj")
+        # Turn a dict of lists of points into a list of consistently sized points
+        points: List[Tuple[ArrayLike]] = list(zip(*[value for _, value in points.items()]))
+        dicts = []
+        for point, obj in zip(points, acqf_obj):
+            d = default_result.copy()
+            d["point"] = point
+            d["acqf_obj"] = obj
+            dicts.append(d)
+        return dicts, points
+
+    def tell(self, x, y):
+        x = {key: x_i for x_i, key in zip(x, self.dofs.names)}
+        y = {key: y_i for y_i, key in zip(y, self.objectives.names)}
+        super().tell(data={**x, **y})
+        return {**x, **y}
+
+    def report(self):
+        raise NotImplementedError("Report is not implemented for BlueskyAdaptiveAgent")
+
+    def unpack_run(self, run):
+        """Use the agent's DOFs to convert the run into an independent array, and its objectives
+        to create the dependent array. In practice, for shape management, lists rather than
+        np.arrays are used at this stage.
+ Parameters + ---------- + run : BlueskyRun + + Returns + ------- + independent_var : + The independent variable of the measurement + dependent_var : + The measured data, processed for relevance + """ + if not self.digestion or self.digestion == default_digestion_function: + # Assume all raw data is available in primary stream as keys + return ( + [run.primary.data[key].read() for key in self.dofs.names], + [run.primary.data[key].read() for key in self.objectives.names], + ) + else: + # Hope and pray that the digestion function designed for DataFrame can handle the XArray + data: pd.DataFrame = self.digestion(run.primary.data.read(), **self.digestion_kwargs) + return [data.loc[:, key] for key in self.dofs.names], [data.loc[:, key] for key in self.objectives.names] + + def measurement_plan(self, point: ArrayLike) -> Tuple[str, List, dict]: + """Fetch the string name of a registered plan, as well as the positional and keyword + arguments to pass that plan. + + Args/Kwargs is a common place to transform relative into absolute motor coords, or + other device specific parameters. + + By default, this measurement plan attempts to use in the built in functionality in a QueueServer compatible way. + Signals and Devices are not passed as objects, but serialized as strings for the RE as a service to use. + + Parameters + ---------- + point : ArrayLike + Next point to measure using a given plan + + Returns + ------- + plan_name : str + plan_args : List + List of arguments to pass to plan from a point to measure. + plan_kwargs : dict + Dictionary of keyword arguments to pass the plan, from a point to measure. + """ + if isinstance(self.acquisition_plan, Callable): + plan_name = self.acquisition_plan.__name__ + else: + plan_name = self.acquisition_plan + if plan_name == "default_acquisition_plan": + # Convert point back to dict form for the sake of compatability with default plan + acquisition_dofs = self.dofs(active=True, read_only=False) + + return self.acquisition_plan.__name__, [ + acquisition_dofs, + {dof.name: point[i] for i, dof in enumerate(acquisition_dofs)}, + [*self.detector_names, *[dev.__name__ for dev in self.dofs.devices]], + ] + else: + raise NotImplementedError("Only default_acquisition_plan is implemented") + + +class Agent(BaseAgent): + def __init__( + self, + dofs: Sequence[DOF], + objectives: Sequence[Objective], + db: Broker = None, + detectors: Sequence[Signal] = None, + acquistion_plan=default_acquisition_plan, + digestion: Callable = default_digestion_function, + digestion_kwargs: dict = {}, + verbose: bool = False, + enforce_all_objectives_valid: bool = True, + exclude_pruned: bool = True, + model_inactive_objectives: bool = False, + tolerate_acquisition_errors: bool = False, + sample_center_on_init: bool = False, + trigger_delay: float = 0, + train_every: int = 1, + ): + """ + A Bayesian optimization agent. + + Parameters + ---------- + dofs : iterable of DOF objects + The degrees of freedom that the agent can control, which determine the output of the model. + objectives : iterable of Objective objects + The objectives which the agent will try to optimize. + detectors : iterable of ophyd objects + Detectors to trigger during acquisition. + acquisition_plan : optional + A plan that samples the beamline for some given inputs. + digestion : + A function to digest the output of the acquisition, taking a DataFrame as an argument. + digestion_kwargs : + Some kwargs for the digestion function. + db : optional + A databroker instance. + verbose : bool + To be verbose or not. 
+ tolerate_acquisition_errors : bool + Whether to allow errors during acquistion. If `True`, errors will be caught as warnings. + sample_center_on_init : bool + Whether to sample the center of the DOF limits when the agent has no data yet. + trigger_delay : float + How many seconds to wait between moving DOFs and triggering detectors. + """ + + # DOFs are parametrized by whether they are active and whether they are read-only + # + # below are the behaviors of DOFs of each kind and mode: + # + # 'read': the agent will read the input on every acquisition (all dofs are always read) + # 'move': the agent will try to set and optimize over these (there must be at least one of these) + # 'input' means that the agent will use the value to make its posterior + # + # + # not read-only read-only + # +---------------------+---------------+ + # active | read, input, move | read, input | + # +---------------------+---------------+ + # inactive | read | read | + # +---------------------+---------------+ + # + # + + super().__init__( + dofs=dofs, + objectives=objectives, + acquistion_plan=acquistion_plan, + digestion=digestion, + digestion_kwargs=digestion_kwargs, + verbose=verbose, + enforce_all_objectives_valid=enforce_all_objectives_valid, + exclude_pruned=exclude_pruned, + model_inactive_objectives=model_inactive_objectives, + tolerate_acquisition_errors=tolerate_acquisition_errors, + sample_center_on_init=sample_center_on_init, + ) + + self.detectors = list(np.atleast_1d(detectors or [])) + + self.db = db + + self.train_every = train_every + self.trigger_delay = trigger_delay + + self.initialized = False + self.a_priori_hypers = None - points = candidates[..., ~active_dofs.read_only] - read_only_values = candidates[..., active_dofs.read_only] + self.n_last_trained = 0 - duration = 1e3 * (ttime.monotonic() - start_time) + @property + def table(self): + return self._table - if route and n > 1: - current_points = np.array([dof.readback for dof in active_dofs if not dof.read_only]) - travel_expenses = np.array([dof.travel_expense for dof in active_dofs if not dof.read_only]) - routing_index = utils.route(current_points, points, dim_weights=travel_expenses) - points = points[routing_index] + def __iter__(self): + for index in range(len(self)): + yield self.dofs[index] - if upsample > 1: - if n == 1: - raise ValueError("Cannot upsample points unless n > 1.") - idx = np.arange(len(points)) - upsampled_idx = np.linspace(0, len(idx) - 1, upsample * len(idx) - 1) - points = sp.interpolate.interp1d(idx, points, axis=0)(upsampled_idx) + def __getattr__(self, attr): + acqf_config = acquisition.parse_acqf_identifier(attr, strict=False) + if acqf_config is not None: + acqf, _ = _construct_acqf(agent=self, acqf_name=acqf_config["name"]) + return acqf + raise AttributeError(f"No attribute named '{attr}'.") - res = { - "points": {dof.name: list(points[..., i]) for i, dof in enumerate(active_dofs(read_only=False))}, - "acqf_name": acqf_config["name"], - "acqf_obj": list(np.atleast_1d(acqf_obj.numpy())), - "acqf_kwargs": acqf_kwargs, - "duration_ms": duration, - "sequential": sequential, - "upsample": upsample, - "read_only_values": read_only_values, - # "posterior": p, - } + def refresh(self): + self._construct_all_models() + self._train_all_models() - return res + def redigest(self): + self._table = self.digestion(self._table, **self.digestion_kwargs) - def tell( - self, - data: Optional[Mapping] = {}, - x: Optional[Mapping] = {}, - y: Optional[Mapping] = {}, - metadata: Optional[Mapping] = {}, - append: bool = True, 
- update_models: bool = True, - train: bool = None, - ): + def sample(self, n: int = DEFAULT_MAX_SAMPLES, normalize: bool = False, method: str = "quasi-random") -> torch.Tensor: """ - Inform the agent about new inputs and targets for the model. - - If run with no arguments, it will just reconstruct all the models. + Returns a (..., 1, n_active_dofs) tensor of points sampled within the parameter space. Parameters ---------- - x : dict - A dict keyed by the name of each DOF, with a list of values for each DOF. - y : dict - A dict keyed by the name of each objective, with a list of values for each objective. - append: bool - If `True`, will append new data to old data. If `False`, will replace old data with new data. - train: bool - Whether to train the models on construction. - hypers: - A dict of hyperparameters for the model to assume a priori, instead of training. + n : int + How many points to sample. + method : str + How to sample the points. Must be one of 'quasi-random', 'random', or 'grid'. + normalize: bool + If True, sample the unit hypercube. If False, sample the parameter space of the agent. """ - if not data: - if not x and y: - raise ValueError("Must supply either x and y, or data.") - data = {**x, **y, **metadata} - - data = {k: list(np.atleast_1d(v)) for k, v in data.items()} - unique_field_lengths = {len(v) for v in data.values()} - - if len(unique_field_lengths) > 1: - raise ValueError("All supplies values must be the same length!") - - new_table = pd.DataFrame(data) - self._table = pd.concat([self._table, new_table]) if append else new_table - self._table.index = np.arange(len(self._table)) + active_dofs = self.dofs(active=True) - if update_models: - objectives_to_model = self.objectives if self.model_inactive_objectives else self.objectives(active=True) - for obj in objectives_to_model: - t0 = ttime.monotonic() + if method == "quasi-random": + X = utils.normalized_sobol_sampler(n, d=len(active_dofs)) - cached_hypers = obj.model.state_dict() if hasattr(obj, "_model") else None - n_before_tell = obj.n_valid - self._construct_model(obj) - n_after_tell = obj.n_valid + elif method == "random": + X = torch.rand(size=(n, 1, len(active_dofs))) - if train is None: - train = int(n_after_tell / self.train_every) > int(n_before_tell / self.train_every) + elif method == "grid": + n_side_if_settable = int(np.power(n, 1 / np.sum(~active_dofs.read_only))) + sides = [ + torch.linspace(0, 1, n_side_if_settable) if not dof.read_only else torch.zeros(1) for dof in active_dofs + ] + X = torch.cat([x.unsqueeze(-1) for x in torch.meshgrid(sides, indexing="ij")], dim=-1).unsqueeze(-2).double() - if len(obj.model.train_targets) >= 4: - if train: - t0 = ttime.monotonic() - train_model(obj.model) - if self.verbose: - logger.debug(f"trained model '{obj.name}' in {1e3 * (ttime.monotonic() - t0):.00f} ms") + else: + raise ValueError("'method' argument must be one of ['quasi-random', 'random', 'grid'].") - else: - train_model(obj.model, hypers=cached_hypers) + return X.double() if normalize else self.dofs(active=True).untransform(X).double() def learn( self, @@ -701,52 +1122,6 @@ def all_objectives_valid(self): """A mask of whether all objectives are valid for each data point.""" return ~torch.isnan(self.scalarized_fitnesses()) - def _construct_model(self, obj, skew_dims=None): - """ - Construct an untrained model for an objective. 
- """ - - skew_dims = skew_dims if skew_dims is not None else self._latent_dim_tuples(obj.name) - - train_inputs = self.train_inputs(active=True) - train_targets = self.train_targets()[obj.name].unsqueeze(-1) - - inputs_are_trusted = ~torch.isnan(train_inputs).any(axis=1) - targets_are_trusted = ~torch.isnan(train_targets).any(axis=1) - - trusted = inputs_are_trusted & targets_are_trusted & ~self.pruned_mask() - - obj._model = construct_single_task_model( - X=train_inputs[trusted], - y=train_targets[trusted], - min_noise=obj.min_noise, - max_noise=obj.max_noise, - skew_dims=self._latent_dim_tuples()[obj.name], - ) - - obj.model_dofs = set(self.dofs(active=True).names) # if these change, retrain the model on self.ask() - - if trusted.all(): - obj.validity_conjugate_model = None - obj.validity_constraint = GenericDeterministicModel(f=lambda x: torch.ones(size=x.size())[..., -1]) - - else: - dirichlet_likelihood = gpytorch.likelihoods.DirichletClassificationLikelihood( - trusted.long(), learn_additional_noise=True - ) - - obj.validity_conjugate_model = models.LatentDirichletClassifier( - train_inputs=train_inputs[inputs_are_trusted], - train_targets=dirichlet_likelihood.transformed_targets.transpose(-1, -2)[inputs_are_trusted].double(), - skew_dims=skew_dims, - likelihood=dirichlet_likelihood, - input_transform=self.input_normalization, - ) - - obj.validity_constraint = GenericDeterministicModel( - f=lambda x: obj.validity_conjugate_model.probabilities(x)[..., -1] - ) - def _construct_all_models(self): """Construct a model for each objective.""" objectives_to_construct = self.objectives if self.model_inactive_objectives else self.objectives(active=True) @@ -774,27 +1149,6 @@ def _get_acquisition_function(self, identifier, return_metadata=False): return acquisition._construct_acqf(self, identifier=identifier, return_metadata=return_metadata) - def _latent_dim_tuples(self, obj_index=None): - """ - For the objective indexed by 'obj_index', return a list of tuples, where each tuple represents - a group of DOFs to fit a latent representation to. - """ - - if obj_index is None: - return {obj.name: self._latent_dim_tuples(obj_index=obj.name) for obj in self.objectives} - - obj = self.objectives[obj_index] - - latent_group_index = {} - for dof in self.dofs(active=True): - latent_group_index[dof.name] = dof.name - for group_index, latent_group in enumerate(obj.latent_groups): - if dof.name in latent_group: - latent_group_index[dof.name] = group_index - - u, uinv = np.unique(list(latent_group_index.values()), return_inverse=True) - return [tuple(np.where(uinv == i)[0]) for i in range(len(u))] - @property def sample_domain(self): """ @@ -804,23 +1158,6 @@ def sample_domain(self): """ return self.dofs(active=True).transform(self.dofs(active=True).search_domain.T).clone() - @property - def input_normalization(self): - """ - Suitably transforms model inputs to the unit hypercube. - - For modeling: - - Always normalize between min and max values. This is always inside the trust domain, sometimes smaller. - - For sampling: - - Settable: normalize between search bounds - Read-only: constrain to the readback value - """ - - return Normalize(d=self.dofs.active.sum()) - def save_data(self, path="./data.h5"): """ Save the sampled inputs and targets of the agent to a file, which can be used @@ -923,70 +1260,6 @@ def all_acqfs(self): """ return acquisition.all_acqfs() - def raw_inputs(self, index=None, **subset_kwargs): - """ - Get the raw, untransformed inputs for a DOF (or for a subset). 
- """ - if index is None: - return torch.cat([self.raw_inputs(dof.name) for dof in self.dofs(**subset_kwargs)], dim=-1) - return torch.tensor(self._table.loc[:, self.dofs[index].name].values, dtype=torch.double).unsqueeze(-1) - - def train_inputs(self, index=None, **subset_kwargs): - """A two-dimensional tensor of all DOF values.""" - - if index is None: - return torch.cat([self.train_inputs(index=dof.name) for dof in self.dofs(**subset_kwargs)], dim=-1) - - dof = self.dofs[index] - raw_inputs = self.raw_inputs(index=index, **subset_kwargs) - - # check that inputs values are inside acceptable values - valid = (raw_inputs >= dof._trust_domain[0]) & (raw_inputs <= dof._trust_domain[1]) - raw_inputs = torch.where(valid, raw_inputs, np.nan) - - return dof._transform(raw_inputs) - - def raw_targets(self, index=None, **subset_kwargs): - """ - Get the raw, untransformed inputs for an objective (or for a subset). - """ - values = {} - - for obj in self.objectives(**subset_kwargs): - # return torch.cat([self.raw_targets(index=obj.name) for obj in self.objectives(**subset_kwargs)], dim=-1) - values[obj.name] = torch.tensor(self._table.loc[:, obj.name].values, dtype=torch.double) - - return values - - def train_targets(self, concatenate=False, **subset_kwargs): - """Returns the values associated with an objective name.""" - - targets_dict = {} - raw_targets_dict = self.raw_targets(**subset_kwargs) - - for obj in self.objectives(**subset_kwargs): - y = raw_targets_dict[obj.name] - - # check that targets values are inside acceptable values - valid = (y >= obj._trust_domain[0]) & (y <= obj._trust_domain[1]) - y = torch.where(valid, y, np.nan) - - targets_dict[obj.name] = obj._transform(y) - - if self.enforce_all_objectives_valid: - all_valid_mask = True - - for name, values in targets_dict.items(): - all_valid_mask &= ~values.isnan() - - for name in targets_dict.keys(): - targets_dict[name] = targets_dict[name].where(all_valid_mask, np.nan) - - if concatenate: - return torch.cat([values.unsqueeze(-1) for values in targets_dict.values()], axis=-1) - - return targets_dict - @property def best(self): """Returns all data for the best point.""" @@ -1100,9 +1373,3 @@ def prune(self, pruning_objs=[], thresholds=[]): ) self.refresh() # return self._table["prune"] - - # @property - def pruned_mask(self): - if self.exclude_pruned and "prune" in self._table.columns: - return torch.tensor(self._table.prune.values.astype(bool)) - return torch.zeros(len(self._table)).bool()
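
For reference, a minimal sketch of the refactored tell/ask cycle on the synchronous Agent. The DOF and Objective constructors follow blop's documented API; the names "x1", "x2", "loss", and the sampled values are illustrative only:

    from blop import DOF, Agent, Objective

    dofs = [
        DOF(name="x1", search_domain=(-5.0, 5.0)),
        DOF(name="x2", search_domain=(-5.0, 5.0)),
    ]
    objectives = [Objective(name="loss", target="min")]

    agent = Agent(dofs=dofs, objectives=objectives)

    # tell() takes dicts keyed by DOF and objective names, with equal-length lists of values.
    agent.tell(
        x={"x1": [-1.0, 0.0, 1.0, 2.0], "x2": [0.5, -0.5, 1.5, -1.5]},
        y={"loss": [1.9, 0.2, 1.7, 6.1]},
        train=True,
    )

    # ask() returns a dict; "points" maps each settable DOF name to a list of candidate values.
    res = agent.ask(acqf="qei", n=2)
    print(res["points"], res["acqf_obj"])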