Skip to content

Commit

Permalink
Fix noise, disturbance and filters
Browse files Browse the repository at this point in the history
  • Loading branch information
joseph-hellerstein committed May 19, 2024
1 parent dac3e3b commit e255e3c
Show file tree
Hide file tree
Showing 9 changed files with 1,260 additions and 360 deletions.
1,259 changes: 974 additions & 285 deletions examples/Analysis-of-Immune-Response-Pneumocci.ipynb

Large diffs are not rendered by default.

174 changes: 164 additions & 10 deletions notebooks/differential_control.ipynb

Large diffs are not rendered by default.

57 changes: 40 additions & 17 deletions src/controlSBML/antimony_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ def _makeInputOutputName(self, prefix, suffix):
self.closed_loop_symbols.append(name_ot)
return name_in, name_ot

def makeFilterElement(self, kF:float, prefix:str="filter", suffix:str=""):
def makeFilterElement(self, kF:float, prefix:str="filter", suffix:str="", kF_nofilter=100):
"""
Makes a filter. prefix + suffix + IN is the input and prefix + suffix + OT is the output.
Prefix is used to scope within a control loop. Suffix is used to scope between control loops.
Expand All @@ -348,6 +348,7 @@ def makeFilterElement(self, kF:float, prefix:str="filter", suffix:str=""):
kF: float
prefix: str (beginning of the name)
suffix: str (ending of the name)
kF_nofilter: float (value of kF when no filter is used)
Returns:
str: name of the filter input
str: name of the filter output
Expand All @@ -358,17 +359,18 @@ def makeFilterElement(self, kF:float, prefix:str="filter", suffix:str=""):
self.makeAddition(name_in, "S3") # S3 is the output of the system
self.makeAddition("control_error", setpoint, "-"+name_ot)
"""
#
self.addStatement("")
self.makeComment("Make filter: kF=%s" % (str(kF)))
kF_name = self._makeScopedName("kF", suffix)
name_in, name_ot = self._makeInputOutputName(prefix, suffix)
if (kF is None) or np.isclose(kF, 0):
kF = 10e6 # Use a large value to approximate a filter with no effect
comment = "Filter with no effect"
new_kF = kF_nofilter
else:
comment = ""
self.makeAdditionStatement(kF_name, kF, is_assignment=False, comment=comment)
calculation = "-%s*%s + %s*%s" % (kF_name, name_ot, kF_name, name_in)
new_kF = kF
self.makeAdditionStatement(kF_name, new_kF, is_assignment=False)
# Construct the filter calculation
calculation = f"-{kF_name}*{name_ot} + {kF_name}*{name_in}"
# Use a dummy reaction to integrate instead of "'" to avoid antimony limitations with
# combining rate rules and assignment rules
statement = " -> %s; %s " % (name_ot, calculation)
Expand Down Expand Up @@ -433,29 +435,42 @@ def makePIDControllerElement(self,
str: name of the controller output
"""
base_name = prefix + "_" + "%s" + suffix # type: ignore
def makeControllerScopedName(name):
scoped_name = base_name % name
self.closed_loop_symbols.append(scoped_name)
return scoped_name
#
self.addStatement("")
self.makeComment("Make the PID controller")
name_in, name_ot = self._makeInputOutputName(prefix, suffix)
# Constants for parameters
kP_name = base_name % "kP"
kI_name = base_name % "kI"
kD_name = base_name % "kD"
if kP is not None:
kP_name = makeControllerScopedName("kP")
self.makeAdditionStatement(kP_name, str(kP), is_assignment=False)
self.closed_loop_symbols.append(kP_name)
if kI is not None:
kI_name = makeControllerScopedName("kI")
self.makeAdditionStatement(kI_name, str(kI), is_assignment=False)
self.closed_loop_symbols.append(kI_name)
if kD is not None:
kD_name = makeControllerScopedName("kD")
self.makeAdditionStatement(kD_name, str(kD), is_assignment=False)
self.closed_loop_symbols.append(kD_name)
# Make the derivative of the control error
if kD is not None:
derivative_error_name = base_name % "derivative_error"
self.closed_loop_symbols.append(derivative_error_name)
if filter_calculation is None:
filter_calculation = "rateOf(%s)" % (output_name) # type: ignore
sign_filter_calculation = "%d*(%s)" % (sign, filter_calculation) # type: ignore
# Step 1: Make a dummy reaction to get the rate of the output
# J: S->S; output_name
# S = 1
# filter_calculation = rateOf(J)
reaction_name = makeControllerScopedName("reaction")
species_name = makeControllerScopedName("species")
reaction_stmt = f"{reaction_name}: {species_name} -> {species_name}"
reaction_stmt += f"; {output_name}"
self.addStatement(reaction_stmt)
self.makeAdditionStatement(species_name, 1, is_assignment=False) # rate_law = 0
# Step 2: Make the rate law
filter_calculation = f"rateOf({reaction_name})"
sign_filter_calculation = f"{sign}*{filter_calculation}"
statement = "%s := %s" % (derivative_error_name, sign_filter_calculation) # type: ignore
self.addStatement(statement)
# Make the integral of the control error
Expand All @@ -481,7 +496,8 @@ def makeSISOClosedLoopSystem(self, input_name, output_name, kP=None, kI=None, kD
noise_spec=cn.NoiseSpec(), disturbance_spec=cn.DisturbanceSpec(),
initial_output_value=None, sign=-1):
"""
Creates a closed loop system with a single input and a single output.
Creates a closed loop system with a single input and a single output. Does not create a filter
if kF is in [0, None]
Args:
input_name: str (input to system)
Expand Down Expand Up @@ -512,14 +528,21 @@ def makeSISOClosedLoopSystem(self, input_name, output_name, kP=None, kI=None, kD
controller_in, controller_ot = self.makePIDControllerElement(output_name,
filter_calculation=filter_calculation,
kP=kP, kI=kI, kD=kD, prefix="controller", suffix=suffix, sign=sign)
control_error_name = self.makeControlErrorSignal(setpoint_name, filter_ot, sign, prefix="control_error",
if filter_ot is None:
comparison_signal_str = "(" + output_name + " + " + noise_ot + ")"
else:
comparison_signal_str = filter_ot
control_error_name = self.makeControlErrorSignal(setpoint_name,
comparison_signal_str,
sign, prefix="control_error",
suffix=suffix)
# Connect the pieces by specifying assignment statements
self.addStatement("")
self.makeComment("Connect the elements of the closed loop")
self.makeAdditionStatement(controller_in, control_error_name)
self.makeAdditionStatement(input_name, controller_ot, disturbance_ot)
self.makeAdditionStatement(filter_in, output_name, noise_ot)
if filter_in is not None:
self.makeAdditionStatement(filter_in, output_name, noise_ot)

def getInputManipulationName(self, input_name):
"""
Expand Down
2 changes: 2 additions & 0 deletions src/controlSBML/control_sbml.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,8 @@ def plotGridDesign(self,

@staticmethod
def setSpec(val):
if isinstance(val, bool):
return val
if isinstance(val, int):
return float(val)
else:
Expand Down
12 changes: 7 additions & 5 deletions src/controlSBML/point_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ def _calculateMse(self, **parameter_dct:dict)->Tuple[str, object]:
return cn.DESIGN_RESULT_CANNOT_SIMULATE, None
outputs = response_ts[self.output_name].values
max_value = np.max([np.max(outputs), np.abs(np.min(outputs))])
if max_value > max_output:
return cn.DESIGN_RESULT_OUTPUT_TOO_LARGE, None
min_value = np.min([np.max(outputs), np.abs(np.min(outputs))])
if min_value < min_output:
return cn.DESIGN_RESULT_OUTPUT_TOO_SMALL, None
if False:
# Disable these checks
if max_value > max_output:
return cn.DESIGN_RESULT_OUTPUT_TOO_LARGE, None
min_value = np.min([np.max(outputs), np.abs(np.min(outputs))])
if min_value < min_output:
return cn.DESIGN_RESULT_OUTPUT_TOO_SMALL, None
#
residuals = self.setpoint - response_ts[self.output_name].values
mse = np.mean(residuals**2)
Expand Down
10 changes: 8 additions & 2 deletions src/controlSBML/siso_closed_loop_designer.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,13 @@ def getValue(val):
parallel_search.search()
search_result_df = parallel_search.getSearchResults()
if len(search_result_df) == 0:
df = pd.DataFrame([[None, None, None, None, cn.DESIGN_RESULT_CANNOT_SIMULATE]],
df = pd.DataFrame([[None, None, None, None, None, cn.DESIGN_RESULT_CANNOT_SIMULATE]],
columns=[CP_kP, CP_kI, CP_kD, CP_kF, cn.SCORE, cn.REASON])
return DesignResult(dataframe=df, max_count=0)
# Merge the results and sort by score
search_result_df = search_result_df.sort_values(cn.SCORE, ascending=True) # type: ignore
search_result_df = search_result_df.reset_index(drop=True)
result_df = search_result_df.copy()
# Handle replications of the same design parameters and select successful designs
search_result_df = search_result_df[search_result_df[cn.SCORE].notna()]
del search_result_df[cn.REASON]
Expand All @@ -342,6 +343,11 @@ def getValue(val):
sorted_mean_df = threshold_mean_df.reset_index()
sorted_mean_df = sorted_mean_df.sort_values(cn.SCORE, ascending=True)
sorted_mean_df = sorted_mean_df.reset_index()
# Check for no result
if len(sorted_mean_df) == 0:
df = pd.DataFrame([[None, None, None, None, None, cn.DESIGN_RESULT_CANNOT_SIMULATE]],
columns=[CP_kP, CP_kI, CP_kD, CP_kF, cn.SCORE, cn.REASON])
return DesignResult(dataframe=df, max_count=0)
# Record the result
self.residual_mse = sorted_mean_df.loc[0, cn.SCORE]
if CP_kP in sorted_mean_df.columns:
Expand All @@ -356,7 +362,7 @@ def getValue(val):
if self.save_path is not None:
sorted_mean_df.to_csv(self.save_path, index=False)
self._design_result_df = sorted_mean_df
return DesignResult(dataframe=sorted_mean_df, max_count=max_count)
return DesignResult(dataframe=result_df, max_count=max_count)

@property
def design_result_df(self):
Expand Down
30 changes: 26 additions & 4 deletions tests/test_antimony_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,18 @@ def setUp(self):
def init(self):
self.builder = ab.AntimonyBuilder(LINEAR_MDL, symbol_dct=SYMBOL_DCT)

def check(self, builder=None):
def check(self, builder=None, times=None):
if builder is None:
builder = self.builder
if times is None:
times = np.linspace(0, 20, 2000)
rr = te.loada(str(builder))
selections = ["time", "S1", "S2", "S3"]
if "setpoint_S1_S3" in rr.keys():
selections.append("setpoint_S1_S3")
if "noise_S1_S3_ot" in rr.keys():
selections.append("noise_S1_S3_ot")
data = rr.simulate(0,20, 2000, selections=selections)
data = rr.simulate(times[0], times[-1], len(times), selections=selections)
self.assertTrue(len(data) > 0)
if IS_PLOT:
rr.plot()
Expand Down Expand Up @@ -141,6 +143,16 @@ def testMakeFilterElement(self):
self.builder.makeAdditionStatement(filter_in, sin_ot)
self.check()

def testMakeFilterElementNofilter(self):
if IGNORE_TEST:
return
self.init()
filter_in, filter_ot, calculation = self.builder.makeFilterElement(0, suffix="_S1_S3")
noise_spec = cn.NoiseSpec(sine_amp=1, sine_freq=2)
sin_ot = self.builder.makeNoiseElement(noise_spec, suffix="_S1_S2")
self.builder.makeAdditionStatement(filter_in, sin_ot)
self.check()

def testMakeControlErrorSignal(self):
if IGNORE_TEST:
return
Expand All @@ -166,6 +178,16 @@ def testMakePIDController(self):
self.builder.makeAdditionStatement("S1", name_ot)
self.builder.makeAdditionStatement(name_in, 3, "-"+"S3")
self.check()

def testMakePIDController4(self):
if IGNORE_TEST:
return
self.init()
name_in, name_ot = self.builder.makePIDControllerElement("S3", kP=7, suffix="_S1_S3")
self.builder.makeBoundarySpecies("S1")
self.builder.makeAdditionStatement("S1", name_ot)
self.builder.makeAdditionStatement(name_in, 3, "-"+"S3")
self.check()

def testMakePIDController3(self):
# Filter without differential control
Expand Down Expand Up @@ -275,8 +297,8 @@ def testMakeSISClosedLoop(self):
return
self.init()
self.builder.makeBoundarySpecies("S1")
self.builder.makeSISOClosedLoopSystem("S1", "S3", kP=10000, kI=1, setpoint=4)
data = self.check()
self.builder.makeSISOClosedLoopSystem("S1", "S3", kP=10, kI=1, setpoint=4)
data = self.check(times=np.linspace(0, 100, 1000))
self.assertTrue(np.isclose(data["S3"][-1], 4, atol=0.01))

def testCopyAndEqual(self):
Expand Down
Loading

0 comments on commit e255e3c

Please sign in to comment.