Skip to content

Commit

Permalink
feat: don't store duplicate events in the notice queue (#1372)
Browse files Browse the repository at this point in the history
When a new event is emitted, check if there is already an exact
duplicate (the same observer, same event type, and same event snapshot)
in the storage from a deferral in a previous run. If a duplicate does
exist, then don't store a new notice and snapshot.

There are performance implications:
* For every event emitted, there's a full iteration through the notices
queue. If there are no deferred events, then the notices queue will be
empty, so in the majority of cases this should only be one additional
storage query (to get the empty result). If the deferral queue is very
large, then this may be noticeable, although the point of the change is
to reduce the length of the queue, because there are already issues when
it's large.
* For each notice in the queue (again: normally none) the snapshot is
loaded (a storage query; currently all snapshots are quite small in
size) for any events that have matching observers and kinds. If the
queue had a lot of events of the same type, it's likely the observer
would match, and if the snapshots were different (for example, many
`secret-changed` events but for different secrets) then there is a cost
to doing the comparison.
* In cases where the queue currently builds up with a lot of duplicates,
there will be a significant performance improvement, because only one of
the (notice+snapshot) events will be processed each run.

There is also a (deliberate) behaviour change that does impact event
ordering. For example, consider this sequence of events:
 1. `config-changed` deferred
 2. `secret-changed` deferred
 3. `config-changed`

Currently, this would result in:
1. `ConfigChangedEvent`
2. `ConfigChangedEvent`, `SecretChangedEvent`
3. `ConfigChangedEvent`, `SecretChangedEvent`, `ConfigChangedEvent`

With this change, this would result in:
1. `ConfigChangedEvent`
2. `ConfigChangedEvent`, `SecretChangedEvent`
3. `ConfigChangedEvent`, `SecretChangedEvent`

More generally, there could currently be duplicate (notice+snapshot)
events intermixed throughout the queue, and each run they will be
re-emitted in the order in which they originally occurred. With this
change, they will be re-emitted in the order in which they originally
occurred, *except if they have already emitted this run*. The
particularly noticeable change is that the Juju event that triggered the
run may not be the last event (if it was a duplicate of one in the
queue).

We could potentially do this differently - for example, updating the
sequence so that when a duplicate occurs it moves the event to the end
of the queue (by dropping and adding the notice+snapshot, or explicitly
setting the `sequence` field for SQL and by just reordering the list for
Juju). This would add complexity and have a performance penalty,
however, and it seems more correct to have the original order.

For unit tests:

* Harness: this change is incompatible with some Harness use -
specifically, if the test code emits the same event more than once,
where it's deferred at least once, there is no `reemit()` call like
there would be in production, and the test expects the handler to be
called more than once. For this reason, the skipping is disabled for
Harness.
* Scenario: Scenario is more explicit with deferred events, so if you
want to have had the 'skipping' behaviour occur before the event you are
`ctx.run`ing then you need to manage that in the list of deferred events
passed into the State. We could add a consistency check to alert if
there are duplicates in that list (this would be easier to do when the
Scenario code is in this repo). However, the Scenario behaviour does
still change: if the event is deferred in the `ctx.run` and there's
already a match in the state's deferred list, then the new (deferred)
event does not get added, and the output state doesn't change (which is
what we want). We get this behaviour automatically because Scenario
mimics the runtime behaviour more closely, actually running the
framework emitting/reemitting. So: Scenario tests are both safer, and
can be used to match the new behaviour.

This can be tested manually with a charm that optionally defers events,
e.g. with the code below.

<details>

<summary>charm.py and charmcraft.yaml content</summary>

```python
class NoticeQueueCharm(ops.CharmBase):
    def __init__(self, framework: ops.Framework):
        super().__init__(framework)
        framework.observe(self.on.config_changed, self._on_config_changed)
        framework.observe(self.on.secret_changed, self._on_secret_changed)

    def _on_config_changed(self, event):
        logger.info("Running config-changed")
        if self.config.get("secretopt"):
            secret = self.model.get_secret(id=self.config["secretopt"])
            # Get the content so we are 'subscribed' to the updates.
            secret.get_content()
        if self.config.get("secretopt2"):
            secret = self.model.get_secret(id=self.config["secretopt2"])
            # Get the content so we are 'subscribed' to the updates.
            secret.get_content()
        if self.config.get("opt", "").startswith("defer"):
            event.defer()

    def _on_secret_changed(self, event):
        logger.info("Running secret-changed")
        if self.config.get("opt", "").startswith("defer"):
            event.defer()
```

```yaml
config:
  options:
    opt:
      description: dummy option to trigger config-changed
    secretopt:
      type: secret
      description: a user secret
    secretopt2:
      type: secret
      description: a user secret
```

If you want to see the queue while you're doing this, you can use code
like this:

```python
        store = self.framework._storage
        for event_path, observer_path, method_name in store.notices(None):
            handle = ops.Handle.from_path(event_path)
            snapshot_data = store.load_snapshot(handle.path)
            logger.info(
                "event_path: %s, observer_path: %s, method_name: %s, snapshot data: %r",
                event_path, observer_path, method_name, snapshot_data,
            )
```

</details>

If you set `opt` to anything not starting with "defer" then you should
get a `config-changed` event every time. If you set it to something
starting with "defer", then it will run exactly once each time you set
the config (remember to change the value, or Juju will skip the event) -
with ops@main you'll instead get a `config-changed` event for every time
that you change the config, every time you change it (ie. the queue will
build up).

You can also check it with an event that has a snapshot, such as
`secret-changed`. If the config is set to defer, every time you change
the content of the first secret, you'll get one `secret-changed` event
(but with ops@main, each time you'll get multiple, depending on how many
times you've done it). If you also change the second secret, you'll get
two `secret-changed` events, one for each secret (because the snapshots
differ).

You can intermix the different events and should always have exactly
zero or one (event type+snapshot) in the queue. If you change the `opt`
value back to something not starting with "defer", then you should see
all the events complete and have an empty queue.

<details>

<summary>Scenario test that shows the behaviour</summary>

```python
import ops
from ops import testing

class MyCharm(ops.CharmBase):
    def __init__(self, framework):
        super().__init__(framework)
        framework.observe(self.on.secret_changed, self._on_sc)
        framework.observe(self.on.update_status, self._on_us)

    def _on_us(self, event):
        print("update-status", event)

    def _on_sc(self, event):
        print("secret-changed", event)
        event.defer()


ctx = testing.Context(MyCharm, meta={"name": "foo"})
secret = testing.Secret({"foo": "bar"})
devent1 = ctx.on.update_status().deferred(MyCharm._on_us)
devent2 = ctx.on.secret_changed(secret).deferred(MyCharm._on_sc)
state_in = testing.State(secrets={secret}, deferred=[devent1, devent2])
state_out = ctx.run(ctx.on.secret_changed(secret), state_in)

assert state_out.unit_status == testing.UnknownStatus()
print(state_out.deferred)
```

Note that this requires fixing a small bug in Secret snapshoting
([PR](tonyandrewmeyer/ops-scenario#7)). That's
unrelated to this change - it also occurs if you use deferred secrets in
main.

</details>

Fixes #935

---------

Co-authored-by: Dima Tisnek <dima.tisnek@canonical.com>
Co-authored-by: Ben Hoyt <benhoyt@gmail.com>
Co-authored-by: Dima Tisnek <dimaqq@gmail.com>
  • Loading branch information
4 people authored Jan 22, 2025
1 parent 97a8f6d commit f12df01
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 12 deletions.
6 changes: 6 additions & 0 deletions ops/_private/harness.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,12 @@ def __init__(
self._meta,
self._model,
juju_debug_at=self._juju_context.debug_at,
# Harness tests will often have defer() usage without 'purging' the
# deferred handler with reemit(), but still expect the next emit()
# to result in a call, so we can't safely skip duplicate events.
# When behaviour matching production is required, Scenario tests
# should be used instead.
skip_duplicate_events=False,
)

warnings.warn(
Expand Down
63 changes: 57 additions & 6 deletions ops/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ def __init__(
model: 'Model',
event_name: Optional[str] = None,
juju_debug_at: Optional[Set[str]] = None,
skip_duplicate_events: bool = True,
):
super().__init__(self, None)

Expand All @@ -624,6 +625,7 @@ def __init__(
self._event_name = event_name
self.meta = meta
self.model = model
self.skip_duplicate_events = skip_duplicate_events
# [(observer_path, method_name, parent_path, event_key)]
self._observers: _ObserverPath = []
# {observer_path: observing Object}
Expand Down Expand Up @@ -719,15 +721,15 @@ def register_type(
self._type_registry[parent_path, kind_] = cls
self._type_known.add(cls)

def save_snapshot(self, value: Union['StoredStateData', 'EventBase']):
"""Save a persistent snapshot of the provided value."""
def _validate_snapshot_data(
self, value: Union['StoredStateData', 'EventBase'], data: Dict[str, Any]
):
if type(value) not in self._type_known:
raise RuntimeError(
f'cannot save {type(value).__name__} values before registering that type'
)
data = value.snapshot()

# Use marshal as a validator, enforcing the use of simple types, as we later the
# Use marshal as a validator, enforcing the use of simple types, as later the
# information is really pickled, which is too error-prone for future evolution of the
# stored data (e.g. if the developer stores a custom object and later changes its
# class name; when unpickling the original class will not be there and event
Expand All @@ -738,6 +740,10 @@ def save_snapshot(self, value: Union['StoredStateData', 'EventBase']):
msg = 'unable to save the data for {}, it must contain only simple types: {!r}'
raise ValueError(msg.format(value.__class__.__name__, data)) from None

def save_snapshot(self, value: Union['StoredStateData', 'EventBase']):
"""Save a persistent snapshot of the provided value."""
data = value.snapshot()
self._validate_snapshot_data(value, data)
self._storage.save_snapshot(value.handle.path, data)

def load_snapshot(self, handle: Handle) -> Serializable:
Expand Down Expand Up @@ -831,6 +837,35 @@ def _next_event_key(self) -> str:
self._stored['event_count'] += 1
return str(self._stored['event_count'])

def _event_is_in_storage(
self, observer_path: str, method_name: str, event_path: str, event_data: Dict[str, Any]
) -> bool:
"""Check if there is already a notice with the same snapshot in the storage."""
# Check all the notices to see if there is one that is the same other
# than the event ID.
for (
existing_event_path,
existing_observer_path,
existing_method_name,
) in self._storage.notices():
if (
existing_observer_path != observer_path
or existing_method_name != method_name
# The notices all have paths that include [id] at the end. If one
# was somehow missing, then the split would be the empty string and
# match anyway.
or existing_event_path.split('[')[0] != event_path.split('[')[0]
):
continue
# Check if the snapshot for this notice is the same.
try:
existing_event_data = self._storage.load_snapshot(existing_event_path)
except NoSnapshotError:
existing_event_data = {}
if event_data == existing_event_data:
return True
return False

def _emit(self, event: EventBase):
"""See BoundEvent.emit for the public way to call this."""
saved = False
Expand All @@ -839,17 +874,33 @@ def _emit(self, event: EventBase):
parent = event.handle.parent
assert isinstance(parent, Handle), 'event handle must have a parent'
parent_path = parent.path
this_event_data = event.snapshot()
self._validate_snapshot_data(event, this_event_data)
# TODO Track observers by (parent_path, event_kind) rather than as a list of
# all observers. Avoiding linear search through all observers for every event
# all observers. Avoiding linear search through all observers for every event
for observer_path, method_name, _parent_path, _event_kind in self._observers:
if _parent_path != parent_path:
continue
if _event_kind and _event_kind != event_kind:
continue
if self.skip_duplicate_events and self._event_is_in_storage(
observer_path, method_name, event_path, this_event_data
):
logger.info(
'Skipping notice (%s/%s/%s) - already in the queue.',
event_path,
observer_path,
method_name,
)
# We don't need to save a new notice and snapshot, but we do
# want the event to run, because it has been saved previously
# and not completed.
saved = True
continue
if not saved:
# Save the event for all known observers before the first notification
# takes place, so that either everyone interested sees it, or nobody does.
self.save_snapshot(event)
self._storage.save_snapshot(event.handle.path, this_event_data)
saved = True
# Again, only commit this after all notices are saved.
self._storage.save_notice(event_path, observer_path, method_name)
Expand Down
127 changes: 121 additions & 6 deletions test/test_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,15 @@ def test_defer_and_reemit(self, request: pytest.FixtureRequest):
framework = create_framework(request)

class MyEvent(ops.EventBase):
pass
def __init__(self, handle: ops.Handle, data: str):
super().__init__(handle)
self.data: str = data

def restore(self, snapshot: typing.Dict[str, typing.Any]):
self.data = typing.cast(str, snapshot['data'])

def snapshot(self) -> typing.Dict[str, typing.Any]:
return {'data': self.data}

class MyNotifier1(ops.Object):
a = ops.EventSource(MyEvent)
Expand Down Expand Up @@ -404,18 +412,18 @@ def on_any(self, event: ops.EventBase):
framework.observe(pub1.b, obs2.on_any)
framework.observe(pub2.c, obs2.on_any)

pub1.a.emit()
pub1.b.emit()
pub2.c.emit()
pub1.a.emit('a')
pub1.b.emit('b')
pub2.c.emit('c')

# Events remain stored because they were deferred.
# Events remain stored because they were deferred (and distinct).
ev_a_handle = ops.Handle(pub1, 'a', '1')
framework.load_snapshot(ev_a_handle)
ev_b_handle = ops.Handle(pub1, 'b', '2')
framework.load_snapshot(ev_b_handle)
ev_c_handle = ops.Handle(pub2, 'c', '3')
framework.load_snapshot(ev_c_handle)
# make sure the objects are gone before we reemit them
# Make sure the objects are gone before we reemit them.
gc.collect()

framework.reemit()
Expand All @@ -439,6 +447,113 @@ def on_any(self, event: ops.EventBase):
pytest.raises(NoSnapshotError, framework.load_snapshot, ev_b_handle)
pytest.raises(NoSnapshotError, framework.load_snapshot, ev_c_handle)

def test_repeated_defer(self, request: pytest.FixtureRequest):
framework = create_framework(request)

class MyEvent(ops.EventBase):
data: typing.Optional[str] = None

class MyDataEvent(MyEvent):
def __init__(self, handle: ops.Handle, data: str):
super().__init__(handle)
self.data: typing.Optional[str] = data

def restore(self, snapshot: typing.Dict[str, typing.Any]):
self.data = typing.cast(typing.Optional[str], snapshot['data'])

def snapshot(self) -> typing.Dict[str, typing.Any]:
return {'data': self.data}

class ReleaseEvent(ops.EventBase):
pass

class MyNotifier(ops.Object):
n = ops.EventSource(MyEvent)
d = ops.EventSource(MyDataEvent)
r = ops.EventSource(ReleaseEvent)

class MyObserver(ops.Object):
def __init__(self, parent: ops.Object, key: str):
super().__init__(parent, key)
self.defer_all = True

def stop_deferring(self, _: MyEvent):
self.defer_all = False

def on_any(self, event: MyEvent):
if self.defer_all:
event.defer()

pub = MyNotifier(framework, 'n')
obs1 = MyObserver(framework, '1')
obs2 = MyObserver(framework, '2')

framework.observe(pub.n, obs1.on_any)
framework.observe(pub.n, obs2.on_any)
framework.observe(pub.d, obs1.on_any)
framework.observe(pub.d, obs2.on_any)
framework.observe(pub.r, obs1.stop_deferring)

# Emit an event, which will be deferred.
pub.d.emit('foo')
notices = tuple(framework._storage.notices())
assert len(notices) == 2 # One per observer.
assert framework._storage.load_snapshot(notices[0][0]) == {'data': 'foo'}

# Emit the same event, and we'll still just have the single notice.
pub.d.emit('foo')
assert len(tuple(framework._storage.notices())) == 2

# Emit the same event kind but with a different snapshot, and we'll get a new notice.
pub.d.emit('bar')
notices = tuple(framework._storage.notices())
assert len(notices) == 4
assert framework._storage.load_snapshot(notices[2][0]) == {'data': 'bar'}

# Emit a totally different event, and we'll get a new notice.
pub.n.emit()
notices = tuple(framework._storage.notices())
assert len(notices) == 6
assert framework._storage.load_snapshot(notices[2][0]) == {'data': 'bar'}
assert framework._storage.load_snapshot(notices[4][0]) == {}

# Even though these events are far back in the queue, since they're
# duplicates, they will get skipped.
pub.d.emit('foo')
pub.d.emit('bar')
pub.n.emit()
assert len(tuple(framework._storage.notices())) == 6

def notices_for_observer(n: int):
return [
notice for notice in framework._storage.notices() if notice[1].endswith(f'[{n}]')
]

# Stop deferring on the first observer, and all those events will be
# completed and the notices removed, while the second observer will
# still have them queued.
pub.r.emit()
assert len(tuple(framework._storage.notices())) == 6
pub.n.emit()
framework.reemit()
assert len(notices_for_observer(1)) == 0
assert len(notices_for_observer(2)) == 3

# Without the defer active, the first observer always ends up with an
# empty queue, while the second observer's queue continues to skip
# duplicates and add new events.
pub.d.emit('foo')
pub.d.emit('foo')
pub.d.emit('bar')
pub.n.emit()
pub.d.emit('foo')
pub.d.emit('bar')
pub.n.emit()
pub.d.emit('baz')
framework.reemit()
assert len(notices_for_observer(1)) == 0
assert len(notices_for_observer(2)) == 4

def test_custom_event_data(self, request: pytest.FixtureRequest):
framework = create_framework(request)

Expand Down

0 comments on commit f12df01

Please sign in to comment.