
docs: add event-bus and integration docs
Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
tiagolobocastro committed Feb 7, 2025
1 parent 9f6db63 commit eb89d30
Showing 11 changed files with 773 additions and 25 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -196,6 +196,7 @@ $ io-engine-client pool destroy tpool
- [Testing](./doc/contributor.md#testing)
- [Building](./doc/build-all.md)
- [CSI Workflow](./doc/csi.md)
- [Design Docs](./doc/design/)

## Features

2 changes: 1 addition & 1 deletion doc/design/control-plane-behaviour.md
@@ -80,7 +80,7 @@ decision accordingly. An in-memory registry is used to store such information.

Because the registry is stored in memory, it is volatile - meaning all information is lost if the service is restarted.
As a consequence critical information must be backed up to a highly available persistent store (for more detailed
information see [persistent-store.md](./control-plane.md#persistent-store-kvstore-for-configuration-data)).

The types of data that need persisting broadly fall into 3 categories:

149 changes: 125 additions & 24 deletions doc/design/control-plane.md
@@ -214,6 +214,56 @@ sequenceDiagram
<br>

### Internal Communication

<br>

```mermaid
graph LR;
subgraph Agents[" "]
HACluster["HA Cluster Agent"]
Core["Core Agent"]
REST
end
subgraph StorageNode["Storage Node"]
subgraph DataPlane["I/O Engine"]
Nexus
Replicas
Pools
end
end
subgraph AppNode["Application Node"]
HANode["HA Node Agent"]
CSINode["CSI Node Plugin"]
end
REST["Rest Server"] -->|gRPC| Core
Core <-->|gRPC| DataPlane
HACluster <-->|gRPC| HANode
HACluster -->|gRPC| Core
HANode -.->|UNIX Socket| CSINode
CSIController -->|HTTP/REST| REST
```

<br>

As shown, there are many p2p connections between the different services.

The control-plane agents talk to each other via [gRPC] using well-defined APIs consisting of different service definitions.
As we're using [gRPC], the APIs are described in [protobuf], which is the default definition language for `gRPC`.

Here's a table containing all protobuf definitions for mayastor:

| API | Protobuf Definitions |
|------------|--------------------------------------------------------------------------------------------------|
| I/O Engine | <https://github.com/openebs/mayastor-dependencies/tree/HEAD/apis/io-engine/protobuf/v1> |
| Agents | <https://github.com/openebs/mayastor-control-plane/tree/HEAD/control-plane/grpc/proto/v1> |
| Events | <https://github.com/openebs/mayastor-dependencies/blob/HEAD/apis/events/protobuf/v1/event.proto> |
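
For illustration, here's a minimal sketch of how one service might open a [gRPC] channel to another using tonic (the endpoint name and port are hypothetical; the real clients are generated from the protobuf files listed above):

```rust
use tonic::transport::Channel;

// Hypothetical endpoint; the actual in-cluster DNS name and port may differ.
async fn connect_core_agent() -> Result<Channel, tonic::transport::Error> {
    Channel::from_static("http://core-agent:50051")
        .connect()
        .await
}
```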

<br>

## Reconcilers

Reconcilers implement the logic that drives the actual state towards the desired state. In principle it's the same model as the operator framework provided by K8s; however, as mentioned, it's tailored towards storage rather than stateless containers.
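
As a hedged sketch (the types and fields below are illustrative, not mayastor's actual ones), a reconciler boils down to comparing the desired state against the observed state and deriving a corrective action:

```rust
/// Desired state, as recorded in the persistent store (illustrative).
struct VolumeSpec {
    replicas: u8,
}

/// Actual state, as observed from the data-plane (illustrative).
struct VolumeState {
    healthy_replicas: u8,
}

/// A corrective action the reconciler may schedule.
enum Action {
    CreateReplica,
    RemoveReplica,
    Nothing,
}

/// One reconcile pass: derive the action that drives actual towards desired.
fn reconcile(spec: &VolumeSpec, state: &VolumeState) -> Action {
    if state.healthy_replicas < spec.replicas {
        Action::CreateReplica
    } else if state.healthy_replicas > spec.replicas {
        Action::RemoveReplica
    } else {
        Action::Nothing
    }
}
```
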
@@ -334,9 +384,10 @@ The value add is not the ANA feature itself, rather what you do with it.

## NATS & Fault management

We used to use [NATS] as a message bus within mayastor as a whole, but later switched to [gRPC] for p2p communications. \
As of today, we use [NATS] for [events](./events.md) as async notifications via the [Publish/Subscribe Model](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern). Our current mechanism does not do any form of "consensus," retries, and the like. Information transported over NATS will typically be error telemetry that is used to diagnose problems.

> NOTE: We don't have any subscribers as things stand.

At a high level, error detectors are placed in code parts where it makes sense; for example, consider the following:

@@ -391,7 +442,7 @@ err.io.nvme.transport.* = {}
err.io.nexus.* = {}
```

Subscribers to these events will keep track of payloads and apply corrective actions. In its most simplistic form, it results in a model where one can
define, per error class, an action that needs to be taken. This error handling can be applied to IO, but also to agents.
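
A hedged sketch of such a subscriber, assuming the `async-nats` and `futures` crates and an illustrative NATS URL (the corrective actions are placeholders):

```rust
use futures::StreamExt;

async fn watch_io_errors() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("nats://mayastor-nats:4222").await?;
    // '>' is the NATS multi-token wildcard: matches err.io.nvme.*, err.io.nexus.*, etc.
    let mut errors = client.subscribe("err.io.>").await?;
    while let Some(msg) = errors.next().await {
        if msg.subject.starts_with("err.io.nvme.transport") {
            // e.g. retire the failing transport path
        } else if msg.subject.starts_with("err.io.nexus") {
            // e.g. fault the affected nexus child
        } else {
            // record the payload for diagnostics
        }
    }
    Ok(())
}
```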

The content of the event can vary, containing some general metadata fields, as well as event specific information.
@@ -411,21 +462,22 @@ message EventMessage {
}
```

Read more about eventing [here](./events.md).
An up-to-date API of the event format can be fetched
[here](https://github.com/openebs/mayastor-dependencies/blob/HEAD/apis/events/protobuf/v1/event.proto).

## Tracing and Telemetry

Tracing means different things at different levels. In this case, we are referring to tracing across component boundaries.

Tracing is implemented using OpenTelemetry and, by default, we provide a subscriber for [Jaeger]. From [Jaeger], the information can be
forwarded to Elasticsearch, Cassandra, Kafka, etc. In order to achieve full tracing support, all the [gRPC] requests and replies should add
`HTTP` headers such that we can easily tie them together in whatever tooling is used. This is standard practice but requires a significant amount of work.
The key reason is to ensure that all requests and responses pass along the headers, from `REST` to the scheduling pipeline.

We also need to support several types of transport and serialization mechanisms. For example, an `HTTP/1.1 REST` request becomes an `HTTP/2 gRPC` request, which in turn becomes
a KV store operation on [etcd]. For this, we will use [Tower]. \
[Tower] provides a not-so-easy-to-use abstraction of request to response mapping.

```rust
pub trait Service<Request> {
    /// Responses given by the service.
    type Response;
    /// Errors produced by the service.
    type Error;
    /// The future response value.
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    /// Returns `Poll::Ready(Ok(()))` when the service is able to process requests.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

    /// Process the request and return the response asynchronously.
    fn call(&mut self, req: Request) -> Self::Future;
}
```
@@ -447,9 +499,9 @@ The provided services can then be layered with additional functions that add the

```rust
pub trait Layer<S> {
    /// The service for which we want to insert a new layer.
    type Service;
    /// The implementation of the layer itself.
    fn layer(&self, inner: S) -> Self::Service;
}
```
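
As a rough illustration (not mayastor's actual code), a layer that wraps any service with request logging could look like this:

```rust
use std::task::{Context, Poll};
use tower::{Layer, Service};

/// Illustrative only: a layer that adds request logging around any service.
struct LogLayer;

impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        LogService { inner }
    }
}

struct LogService<S> {
    inner: S,
}

impl<S, Request> Service<Request> for LogService<S>
where
    S: Service<Request>,
    Request: std::fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Log before delegating to the wrapped service.
        println!("handling request: {request:?}");
        self.inner.call(request)
    }
}
```
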
@@ -458,23 +510,72 @@ An example where a `REST` client sets the open tracing key/values on the request

```rust
let layer = TraceLayer::new_for_http().make_span_with(|request: &Request<Body>| {
tracing::debug_span!(
"HTTP",
http.method = %request.method(),
http.url = %request.uri(),
http.status_code = tracing::field::Empty,
// otel is a mandatory key/value
otel.name = %format!("HTTP {}", request.method()),
otel.kind = %SpanKind::Client,
otel.status_code = tracing::field::Empty,
)
})
```

On the server side we extract the trace id from the `HTTP` headers and inject it into the next call stack, which means it eventually also gets
injected at the next transport hop. Specifically for `REST`, this means we inject it on the inter-service `gRPC` call. Again, we use the same `Service`
trait!

```rust
impl tower::Service<TonicClientRequest> for OpenTelClientService<Channel> {
...

fn call(&mut self, mut request: TonicClientRequest) -> Self::Future {
let tracer = global::tracer("grpc-client");
let context = tracing::Span::current().context();
...

let context = context.with_span(span);
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut HeaderInjector(request.headers_mut()))
});
trace_http_service_call(&mut self.service, request, context)
}
}
```

How do these traces get sent to [Jaeger] (or any other telemetry sink)? We set up an OpenTelemetry exporter on **every** service that receives and/or sends the tracing ids:

```rust
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint)
)
.with_trace_config(
sdktrace::Config::default().with_resource(Resource::new(tracing_tags))
)
.install_batch(opentelemetry_sdk::runtime::TokioCurrentThread)
.expect("Should be able to initialise the exporter");
```

<br>

Here's how a 2-replica volume creation can be traced via [Jaeger]:

![Jaeger trace of a 2-replica volume creation](../img/jaeger.png)

> _NOTE_: In this example the client did not use tracing/telemetry, which is why the trace only starts from the rest-server onwards.

[MOAC]: https://github.com/openebs/moac
[K8s]: https://kubernetes.io/
[CSI]: https://github.com/container-storage-interface/spec
[Mayastor]: ./mayastor.md
[CAS]: https://openebs.io/docs/2.12.x/concepts/cas
[Tower]: https://docs.rs/tower/latest/tower/
[etcd]: https://etcd.io/
[Jaeger]: https://www.jaegertracing.io/
[NATS]: https://nats.io/
[gRPC]: https://grpc.io/
[protobuf]: https://protobuf.dev/
155 changes: 155 additions & 0 deletions doc/design/events.md
@@ -0,0 +1,155 @@
# Event Bus

As part of mayastor we wanted to have event-driven capabilities which allow us to respond to certain events and perform specific actions. \
A message bus ([NATS]) was initially used in early versions of mayastor, but due to several reasons, such as some bugs in the client libraries and how we used them (to be fair!), we ended up temporarily moving away from it in favour of p2p [gRPC] (yes, we were probably using the wrong stick in many cases). \
As a result, we ended up with high coupling between components, such as the io-engine and the core-agent.

With that out of the way, we still believe a message bus is a good solution for many use cases within mayastor:

1. Event driven reconcilers
2. Event accruing for metrics
3. Fault diagnostics system
4. etc

> **NOTE**: What's a message bus after all? It's a messaging system that allows applications to communicate with each other by sending and receiving messages. It acts as a broker that routes messages between senders and receivers which are loosely coupled.

## Enter NATS Jetstream

We've compared several options and ended up selecting [NATS] (again!) as the message bus for our eventing system.

"NATS has a built-in persistence engine called [Jetstream] which enables messages to be stored and replayed at a later time. Unlike NATS Core which requires you to have an active subscription to process messages as they happen, JetStream allows the NATS server to capture messages and replay them to consumers as needed. This functionality enables a different quality of service for your NATS messages, and enables fault-tolerant and high-availability configurations."

### Pros of NATS

- Always on and available (Highly Available)
- Low CPU consumption
- Fast: A high-velocity communication bus
- High scalability
- Light-weight
- Supports wildcard-based subject subscriptions

### Cons of NATS

- Fire and forget in the case of Core NATS, though JetStream provides ‘at least once’ and ‘exactly once’ delivery guarantees
- No persistence in Core NATS, but it is possible with JetStream

---

We don't currently have a requirement for a messaging queue where order is important, nor do we rely on this information being persistent. \
However, for optimum functionality we prefer a highly available deployment ensuring smooth operation of the event consumers.

We deploy a highly available NATS with JetStream enabled, but with an in-memory storage configuration.
Here's how we configure it via its helm chart:

```yaml
nats:
jetstream:
enabled: true
memStorage:
enabled: true
size: "5Mi"
fileStorage:
enabled: false
cluster:
enabled: true
replicas: 3
```
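
With JetStream enabled, a publish is acknowledged by the server once stored. A minimal sketch, assuming the `async-nats` crate and an illustrative subject and URL (the real subjects used by mayastor may differ):

```rust
use async_nats::jetstream;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("nats://mayastor-nats:4222").await?;
    let js = jetstream::new(client);

    // Publish an event; JetStream replies with an ack once it is stored.
    let ack = js
        .publish(
            "events.volume",
            r#"{"category":"Volume","action":"Create"}"#.into(),
        )
        .await?;
    ack.await?; // 'at least once' delivery: wait for the server's ack
    Ok(())
}
```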

## Events

Here we list the events which we're currently publishing on the event bus.

### Volume Events

| Category | Action | Source | Description |
|----------|--------|---------------|--------------------------------------------------|
| Volume | Create | Control plane | Generated when a volume is successfully created |
| Volume | Delete | Control plane | Generated when a volume is successfully deleted |

### Replica Events

| Category | Action | Source | Description |
|----------|--------------|------------|--------------------------------------------------|
| Replica | Create | Data plane | Generated when a replica is successfully created |
| Replica | Delete | Data plane | Generated when a replica is successfully deleted |
| Replica | StateChange | Data plane | Created upon a change in replica state |

### Pool Events

| Category | Action | Source | Description |
|----------|--------|------------|------------------------------------------------|
| Pool | Create | Data plane | Generated when a pool is successfully created |
| Pool | Delete | Data plane | Generated when a pool is successfully deleted |

### Nexus Events

| Category | Action | Source | Description |
|-------------------|-------------------|------------|------------------------------------------------------|
| Nexus | Create | Data plane | Created when a nexus is successfully created |
| Nexus | Delete | Data plane | Created when a nexus is successfully deleted |
| Nexus | StateChange | Data plane | Created upon a change in nexus state |
| Nexus | RebuildBegun | Data plane | Created when a nexus child rebuild operation begins |
| Nexus | RebuildEnd | Data plane | Created when a nexus child rebuild operation ends |
| Nexus | AddChild | Data plane | Created when a child is added to nexus |
| Nexus | RemoveChild | Data plane | Created when a child is removed from nexus |
| Nexus | OnlineChild | Data plane | Created when a nexus child becomes online |
| Nexus | SubsystemPause | Data plane | Created when an I/O subsystem is paused |
| Nexus | SubsystemResume | Data plane | Created when an I/O subsystem is resumed |
| Nexus | Init | Data plane | Created when nexus enters into init state |
| Nexus | Reconfiguring | Data plane | Created when nexus enters into reconfiguring state |
| Nexus | Shutdown | Data plane | Created when a nexus is shutdown |

### Node Events

| Category | Action | Source | Description |
|-----------|-------------|---------------|----------------------------------------------|
| Node | StateChange | Control plane | Created upon a change in node state |

### High Availability Events

| Category | Action | Source | Description |
|--------------------|-------------|---------------|------------------------------------------------------------------------|
| HighAvailability | SwitchOver | Control plane | Created when a volume switch over operation starts, fails or completes |

### Nvme Path Events

| Category | Action | Source | Description |
|------------|-----------------|---------------|---------------------------------------------------------|
| NvmePath | NvmePathSuspect | Control plane | Created when an NVMe path enters into suspect state |
| NvmePath | NvmePathFail | Control plane | Created when an NVMe path transitions into failed state |
| NvmePath | NvmePathFix | Control plane | Created when an NVMe controller reconnects to a nexus |

### Host Initiator Events

| Category | Action | Source | Description |
|----------------|-----------------------|------------|----------------------------------------------------------|
| HostInitiator | NvmeConnect | Data plane | Created upon a host connection to a nexus |
| HostInitiator | NvmeDisconnect | Data plane | Created upon a host disconnection to a nexus |
| HostInitiator | NvmeKeepAliveTimeout | Data plane | Created upon a host keep alive timeout (KATO) on a nexus |

### IO-Engine Events

| Category | Action | Source | Description |
|-------------------|-----------------|------------|----------------------------------------------------|
| IoEngineCategory | Start | Data plane | Created when io-engine initializes |
| IoEngineCategory | Shutdown | Data plane | Created when io-engine shutdown starts |
| IoEngineCategory | Stop | Data plane | Created when an io-engine is stopped |
| IoEngineCategory | ReactorUnfreeze | Data plane | Created when an io-engine reactor is healthy again |
| IoEngineCategory | ReactorFreeze | Data plane | Created when an io-engine reactor is frozen |

### Snapshot and Clone Events

| Category | Action | Source | Description |
|-----------|--------|------------|-------------------------------------------------|
| Snapshot | Create | Data plane | Created when a snapshot is successfully created |
| Clone | Create | Data plane | Created when a clone is successfully created |

## Consumers

- [x] call-home
- [x] e2e testing
- [ ] support dump (kubectl-plugin)
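
A hedged sketch of what such a consumer (e.g. call-home) might look like, assuming the `async-nats` and `futures` crates; the stream and durable names are illustrative:

```rust
use futures::StreamExt;

async fn consume_events(js: async_nats::jetstream::Context) -> Result<(), async_nats::Error> {
    let stream = js.get_stream("events-stream").await?;
    let consumer: async_nats::jetstream::consumer::PullConsumer = stream
        .create_consumer(async_nats::jetstream::consumer::pull::Config {
            durable_name: Some("call-home".to_string()),
            ..Default::default()
        })
        .await?;

    let mut messages = consumer.messages().await?;
    while let Some(msg) = messages.next().await {
        let msg = msg?;
        // Process the event payload, then ack so it is not redelivered.
        println!("event: {:?}", std::str::from_utf8(&msg.payload));
        msg.ack().await?;
    }
    Ok(())
}
```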

[NATS]: https://nats.io/
[Jetstream]: https://docs.nats.io/nats-concepts/jetstream
[gRPC]: https://grpc.io/
