Skip to content

Commit

Permalink
feat(core): adding channels to eventcatalog (#935)
Browse files Browse the repository at this point in the history
* feat(core): adding channels to eventcatalog

* feat(core): adding channels to eventcatalog

* refactor

* channel update

* channel update

* adding more channel support

* adding more channel support

* adding more channel support

* adding more channel support

* adding more channel support

* Create ninety-donuts-fold.md
  • Loading branch information
boyney123 authored Nov 12, 2024
1 parent f42de19 commit 003c1f2
Show file tree
Hide file tree
Showing 104 changed files with 2,504 additions and 784 deletions.
5 changes: 5 additions & 0 deletions .changeset/ninety-donuts-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@eventcatalog/core": minor
---

feat(core): adding channels to eventcatalog
157 changes: 157 additions & 0 deletions examples/default/channels/inventory.{env}.events/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
---
id: inventory.{env}.events
name: Inventory Events Channel
version: 1.0.0
summary: |
Central event stream for all inventory-related events including stock updates, allocations, and adjustments
owners:
- dboyne
address: inventory.{env}.events
protocols:
- kafka

parameters:
env:
enum:
- dev
- sit
- prod
description: 'Environment to use'
---

### Overview
The Inventory Events channel is the central stream for all inventory-related events across the system. This includes stock level changes, inventory allocations, adjustments, and stocktake events. Events for a specific SKU are guaranteed to be processed in sequence when using productId as the partition key.

<ChannelInformation />

### Publishing and Subscribing to Events

#### Publishing Example
```python
from kafka import KafkaProducer
import json
from datetime import datetime

# Kafka configuration
bootstrap_servers = ['localhost:9092']
topic = f'inventory.{env}.events'

# Create a Kafka producer
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Example inventory update event
inventory_event = {
"eventType": "STOCK_LEVEL_CHANGED",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0",
"payload": {
"productId": "PROD-456",
"locationId": "WH-123",
"previousQuantity": 100,
"newQuantity": 95,
"changeReason": "ORDER_FULFILLED",
"unitOfMeasure": "EACH",
"batchInfo": {
"batchId": "BATCH-789",
"expiryDate": "2025-12-31"
}
},
"metadata": {
"source": "warehouse_system",
"correlationId": "inv-xyz-123",
"userId": "john.doe"
}
}

# Send the message - using productId as key for partitioning
producer.send(
topic,
key=inventory_event['payload']['productId'].encode('utf-8'),
value=inventory_event
)
producer.flush()

print(f"Inventory event sent to topic {topic}")

```

### Subscription example

```python
from kafka import KafkaConsumer
import json
from datetime import datetime

class InventoryEventConsumer:
def __init__(self):
# Kafka configuration
self.topic = f'inventory.{env}.events'
self.consumer = KafkaConsumer(
self.topic,
bootstrap_servers=['localhost:9092'],
group_id='inventory-processor-group',
auto_offset_reset='earliest',
enable_auto_commit=False,
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
key_deserializer=lambda x: x.decode('utf-8') if x else None
)

def process_event(self, event):
"""Process individual inventory events based on type"""
event_type = event.get('eventType')

if event_type == 'STOCK_LEVEL_CHANGED':
self.handle_stock_level_change(event)
elif event_type == 'LOW_STOCK_ALERT':
self.handle_low_stock_alert(event)
# Add more event type handlers as needed

def handle_stock_level_change(self, event):
"""Handle stock level change events"""
payload = event['payload']
print(f"Stock level change detected for product {payload['productId']}")
print(f"New quantity: {payload['newQuantity']}")
# Add your business logic here

def handle_low_stock_alert(self, event):
"""Handle low stock alert events"""
payload = event['payload']
print(f"Low stock alert for product {payload['productId']}")
print(f"Current quantity: {payload['currentQuantity']}")
# Add your business logic here

def start_consuming(self):
"""Start consuming messages from the topic"""
try:
print(f"Starting consumption from topic: {self.topic}")
for message in self.consumer:
try:
# Process the message
event = message.value
print(f"Received event: {event['eventType']} for product: {event['payload']['productId']}")

# Process the event
self.process_event(event)

# Commit the offset after successful processing
self.consumer.commit()

except Exception as e:
print(f"Error processing message: {str(e)}")
# Implement your error handling logic here
# You might want to send to a DLQ (Dead Letter Queue)

except Exception as e:
print(f"Consumer error: {str(e)}")
finally:
# Clean up
self.consumer.close()

if __name__ == "__main__":
# Create and start the consumer
consumer = InventoryEventConsumer()
consumer.start_consuming()
```
82 changes: 82 additions & 0 deletions examples/default/channels/orders.{env}.events/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
---
id: orders.{env}.events
name: Order Events Channel
version: 1.0.1
summary: |
Central event stream for all order-related events in the order processing lifecycle
owners:
- dboyne
address: orders.{env}.events
protocols:
- kafka

parameters:
env:
enum:
- dev
- sit
- prod
description: 'Environment to use'
---

### Overview
The Orders Events channel is the central stream for all order-related events across the order processing lifecycle. This includes order creation, updates, payment status, fulfillment status, and customer communications. All events related to a specific order are guaranteed to be processed in sequence when using orderId as the partition key.

<ChannelInformation />

### Publishing a message using Kafka

Here is an example of how to publish an order event using Kafka:

```python
from kafka import KafkaProducer
import json
from datetime import datetime

# Kafka configuration
bootstrap_servers = ['localhost:9092']
topic = f'orders.{env}.events'

# Create a Kafka producer
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Example order created event
order_event = {
"eventType": "ORDER_CREATED",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0",
"payload": {
"orderId": "12345",
"customerId": "CUST-789",
"items": [
{
"productId": "PROD-456",
"quantity": 2,
"price": 29.99
}
],
"totalAmount": 59.98,
"shippingAddress": {
"street": "123 Main St",
"city": "Springfield",
"country": "US"
}
},
"metadata": {
"source": "web_checkout",
"correlationId": "abc-xyz-123"
}
}

# Send the message - using orderId as key for partitioning
producer.send(
topic,
key=order_event['payload']['orderId'].encode('utf-8'),
value=order_event
)
producer.flush()

print(f"Order event sent to topic {topic}")
88 changes: 88 additions & 0 deletions examples/default/channels/payment.{env}.events/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
---
id: payments.{env}.events
name: Payment Events Channel
version: 1.0.0
summary: |
All events contain payment ID for traceability and ordered processing.
owners:
- dboyne
address: payments.{env}.events
protocols:
- kafka

parameters:
env:
enum:
- dev
- sit
- prod
description: 'Environment to use for payment events'
---

### Overview
The Payments Events channel is the central stream for all payment lifecycle events. This includes payment initiation, authorization, capture, completion and failure scenarios. Events for a specific payment are guaranteed to be processed in sequence when using paymentId as the partition key.

<ChannelInformation />

### Publishing Events Using Kafka

Here's an example of publishing a payment event:

```python
from kafka import KafkaProducer
import json
from datetime import datetime

# Kafka configuration
bootstrap_servers = ['localhost:9092']
topic = f'payments.{env}.events'

# Create Kafka producer
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Example payment processed event
payment_event = {
"eventType": "PAYMENT_PROCESSED",
"timestamp": datetime.utcnow().isoformat(),
"version": "1.0",
"payload": {
"paymentId": "PAY-123-456",
"orderId": "ORD-789",
"amount": {
"value": 99.99,
"currency": "USD"
},
"status": "SUCCESS",
"paymentMethod": {
"type": "CREDIT_CARD",
"last4": "4242",
"expiryMonth": "12",
"expiryYear": "2025",
"network": "VISA"
},
"transactionDetails": {
"processorId": "stripe_123xyz",
"authorizationCode": "AUTH123",
"captureId": "CAP456"
}
},
"metadata": {
"correlationId": "corr-123-abc",
"merchantId": "MERCH-456",
"source": "payment_service",
"environment": "prod",
"idempotencyKey": "PAY-123-456-2024-11-11-99.99"
}
}

# Send message - using paymentId as key for partitioning
producer.send(
topic,
key=payment_event['payload']['paymentId'].encode('utf-8'),
value=payment_event
)
producer.flush()
```
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ badges:
- content: Recently updated!
backgroundColor: green
textColor: green
channels:
- id: inventory.{env}.events
parameters:
env: staging
schemaPath: 'schema.json'
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ badges:
- content: Recently updated!
backgroundColor: green
textColor: green
channels:
- id: inventory.{env}.events
parameters:
env: staging
schemaPath: "schema.json"
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ owners:
- asmith
- full-stack
- mobile-devs
channels:
- id: inventory.{env}.events
badges:
- content: Recently updated!
backgroundColor: green
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ owners:
- asmith
- full-stack
- mobile-devs
channels:
- id: inventory.{env}.events
badges:
- content: Recently updated!
backgroundColor: green
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ receives:
- id: UpdateInventory
version: 0.0.3
- id: AddInventory
version: x
- id: GetInventoryStatus
sends:
- id: OrderConfirmed
version: 0.0.1
- id: OrderAmended
- id: InventoryAdjusted
version: 0.0.4
- id: OutOfStock
Expand Down
Loading

0 comments on commit 003c1f2

Please sign in to comment.