Check out the example module to get a better understanding of how all this works.
This is an attempt to create a complex event processing implementation in Akka. The idea is that processors subscribe to channels to receive the events published on them. A processor can then process those events in some way and publish new events on other channels.
The project also houses a storage solution, based on Apache Cassandra, for auditing and statistics. An API exists for easy access to historical data.
TODO
A collector collects events from external event producers.
A processor is a component that performs operations on events. It could extract data, create new events based on aggregated events, or examine a collection of events to find a particular pattern.
To receive events, a processor subscribes to specific channels or to all events that flow through the system. When the processor creates a new event, it can publish it to another channel.
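The subscribe/publish flow described above can be sketched in plain Scala without Akka. This is only an illustration under stated assumptions: `Event`, `EventBus`, and `ChannelSketch` are hypothetical stand-ins and not the project's actual types.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; not the project's actual types.
final case class Event(channel: String, value: Double)

final class EventBus {
  private val subscribers = mutable.Map.empty[String, List[Event => Unit]]

  // Register a handler for every event published on `channel`.
  def subscribe(channel: String)(handler: Event => Unit): Unit =
    subscribers(channel) = handler :: subscribers.getOrElse(channel, Nil)

  // Deliver the event to all handlers subscribed to its channel.
  def publish(event: Event): Unit =
    subscribers.getOrElse(event.channel, Nil).foreach(handler => handler(event))
}

object ChannelSketch {
  def run(): List[Event] = {
    val bus = new EventBus
    val received = mutable.ListBuffer.empty[Event]

    // A "processor": subscribes to "foo", doubles the value, publishes on "bar".
    bus.subscribe("foo")(e => bus.publish(Event("bar", e.value * 2)))
    // A consumer on "bar" that just records what arrives.
    bus.subscribe("bar") { e => received += e; () }

    bus.publish(Event("foo", 21.0))
    received.toList
  }
}
```

Calling `ChannelSketch.run()` publishes one event on "foo" and returns the derived event that arrived on "bar".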
TODO
Everything is configured in the application.conf file.
TODO
Handles routing in the system.
Forwards an event to a different channel (and category).
forwarderName {
type = forwarder
subscriptions = [ {channel = "foo"} ]
publication = { channel = "bar" }
}
Filters out events that don't comply with a specified rule (expression).
filterName {
type = filter
subscriptions = [ {channel = "foo"} ]
publication = { channel = "bar" }
expression = { mvel = "value > 0" }
accept = false
}
Analyses sequences of events and produces new events with the result.
Counts events within a specified time frame
countAnalyserName {
type = countAnalyser
subscriptions = [ {channel = "foo"} ]
publication = { channel = "bar" }
categorize = true
timeframe = 2 hours
}
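As a rough illustration of counting within a time frame, the sketch below keeps the timestamps of recent events and drops those that have fallen out of the window. `SlidingCounter` is a hypothetical helper, not the project's implementation. It assumes timestamps arrive in non-decreasing order and does not model `categorize`.

```scala
import scala.collection.immutable.Queue

// Hypothetical helper: counts events whose timestamps fall inside a sliding time frame.
final case class SlidingCounter(timeframeMillis: Long, timestamps: Queue[Long] = Queue.empty) {

  // Record an event at `now` and drop every timestamp at or before `now - timeframeMillis`.
  def add(now: Long): SlidingCounter =
    copy(timestamps = timestamps.enqueue(now).dropWhile(_ <= now - timeframeMillis))

  // Number of events currently inside the time frame.
  def count: Int = timestamps.size
}
```

For example, with a 1000 ms time frame and events at 0, 500 and 1200 ms, the event at 0 ms has expired by the time the third event arrives, so `count` is 2.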
Calculates the average within a specified window (time or length).
averageAnalyserName {
type = averageAnalyser
subscriptions = [ {channel = "foo"} ]
publication = { channel = "bar" }
categorize = false
expression = { static = "foo" }
window = { time = 1 minute }
}
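The configuration above uses a time window. The length variant can be sketched as an average over the last N values. `LengthWindowAverage` is a hypothetical helper introduced only for illustration, and how the project extracts the numeric value from an event is not modelled here.

```scala
// Hypothetical helper: average of the most recent `length` values.
final case class LengthWindowAverage(length: Int, values: Vector[Double] = Vector.empty) {

  // Append a value and keep only the last `length` entries.
  def add(value: Double): LengthWindowAverage =
    copy(values = (values :+ value).takeRight(length))

  // None until at least one value has been seen.
  def average: Option[Double] =
    if (values.isEmpty) None else Some(values.sum / values.size)
}
```

With a window length of 2, adding 1.0, 2.0 and 4.0 leaves 2.0 and 4.0 in the window.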
Calculates the regression coefficient within a specified time frame
regressionName {
type = regressionAnalyser
subscriptions = [ {channel = "foo"} ]
publication = { channel = "bar" }
categorize = true
minSize = 25
timeframe = 15 minutes
}
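One plausible reading of "regression coefficient" is the slope of a simple least-squares line fitted to (time, value) pairs. The sketch below works under that assumption. It also assumes that `minSize` is the minimum number of events required before a result is produced. `RegressionSketch` is a hypothetical name, not part of the project.

```scala
object RegressionSketch {

  // Least-squares slope of y against x. Returns None when fewer than `minSize`
  // points are available or when all x values are identical.
  def slope(points: Seq[(Double, Double)], minSize: Int): Option[Double] = {
    if (points.size < minSize) None
    else {
      val n = points.size.toDouble
      val meanX = points.map(_._1).sum / n
      val meanY = points.map(_._2).sum / n
      val sxx = points.map { case (x, _) => (x - meanX) * (x - meanX) }.sum
      val sxy = points.map { case (x, y) => (x - meanX) * (y - meanY) }.sum
      if (sxx == 0.0) None else Some(sxy / sxx)
    }
  }
}
```

For the points (0, 1), (1, 3), (2, 5) the fitted slope is 2.0. With `minSize = 25` the same three points would yield no result.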
Alerts when events don't comply with a specified rule (expression) and informs when the state is back to normal.
alerterName {
type = alerter
subscriptions = [ {channel = "foo"} ]
publication = { channel = "bar" }
categorize = false
expression = { mvel = "true" }
}
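The alerting behaviour can be thought of as a small two-state machine that only emits something when the state changes. The sketch below is an assumption about that behaviour, not the project's code. `Signal`, `Alert`, `BackToNormal` and `AlertState` are hypothetical names, and `complies` stands for whatever the configured expression yields for an event.

```scala
sealed trait Signal
case object Alert extends Signal
case object BackToNormal extends Signal

// Hypothetical helper: remembers whether an alert is currently active.
final case class AlertState(triggered: Boolean = false) {

  // `complies` is the outcome of evaluating the rule for the incoming event.
  // A signal is emitted only on a transition between the two states.
  def next(complies: Boolean): (AlertState, Option[Signal]) =
    (triggered, complies) match {
      case (false, false) => (AlertState(triggered = true), Some(Alert))
      case (true, true)   => (AlertState(triggered = false), Some(BackToNormal))
      case _              => (this, None)
    }
}
```

Feeding the sequence non-compliant, non-compliant, compliant produces an `Alert`, then nothing, then `BackToNormal`.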
TODO
Produces events to external consumers.
Writes events to log.
logger {
type = logProducer
subscriptions = [ {channel = "foo"} ]
loglevel = INFO
}
It's possible to create a custom processor and use it in the configuration by specifying the class instead of type.
customProcessor {
class = org.example.CustomProcessor
subscriptions = [ {channel = "foo"} ]
publication = { channel = "bar" }
arguments = ["foo", "bar"]
}
class SimpleProcessor (
    override val subscriptions: List[Subscription],
    val publication: Publication)
  extends Processor(subscriptions) with Publisher {

  // Handles an incoming event; explicit Unit replaces the deprecated procedure syntax.
  def process(event: Event): Unit = {
    // do stuff
    publish(event)
  }
}
TODO
TODO
TODO
Copyright 2012 Albert Örwall
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0