Akka

The actor model was introduced by Carl Hewitt in the 1970s and later popularized by Erlang. Its guiding principle, in the words of Erlang co-creator Joe Armstrong, is that processes which work in parallel and exchange data to accomplish a higher-level goal should be programmed in a language designed for concurrency:

"The world is parallel. If we want to write programs that behave as other objects behave in the real world, then these programs will have a concurrent structure. Use a language that was designed for writing concurrent applications, and development becomes a lot easier. Erlang (actor-oriented language) programs model how we think and interact."

Akka's concurrent programming model is based on actors. Actors are lightweight, independent, thread-like objects that respond to messages, maintain their own private state, and communicate with other actors only through message passing, with no mediation by external entities and no explicit channels.

Actors

To define a new type of actor, the most straightforward approach is to extend the AbstractActor class. In doing so, you need to implement the createReceive() method to specify how each message should be processed. You can also override other methods to customize the behavior at different points in the lifecycle of the actor: preStart(), preRestart(), postRestart() or postStop().

The receiving behavior of the actor is defined by createReceive(). For each incoming message:

  • The match clauses are checked in the order in which they are defined
  • The handler associated with the first matching clause is invoked
  • If no clause matches, the message is treated as unhandled: by default it is published to the event stream as an UnhandledMessage and is not processed further
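The effect of clause ordering can be illustrated with a small plain-Java model. This is only a sketch of the first-match-wins rule, not Akka's ReceiveBuilder: the class MatchOrderSketch, its match() and receive() methods, and the "unhandled" return value are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a receive behavior: an ordered list of match clauses.
class MatchOrderSketch {

    // One clause: a message type plus the handler to run for it.
    private static final class Clause<T> {
        final Class<T> type;
        final Function<T, String> handler;

        Clause(Class<T> type, Function<T, String> handler) {
            this.type = type;
            this.handler = handler;
        }

        boolean matches(Object msg) {
            return type.isInstance(msg);
        }

        String apply(Object msg) {
            return handler.apply(type.cast(msg));
        }
    }

    private final List<Clause<?>> clauses = new ArrayList<>();

    // Clauses are stored in the order in which they are added.
    <T> MatchOrderSketch match(Class<T> type, Function<T, String> handler) {
        clauses.add(new Clause<>(type, handler));
        return this;
    }

    // Runs the handler of the first clause whose type matches the message.
    String receive(Object msg) {
        for (Clause<?> clause : clauses) {
            if (clause.matches(msg)) {
                return clause.apply(msg);
            }
        }
        return "unhandled"; // no clause matched
    }

    public static void main(String[] args) {
        MatchOrderSketch behavior = new MatchOrderSketch()
                .match(Integer.class, i -> "integer clause")
                .match(Number.class, n -> "number clause");
        System.out.println(behavior.receive(1));       // Integer also is a Number, but the first clause wins
        System.out.println(behavior.receive(2.5));     // only the Number clause matches
        System.out.println(behavior.receive("hello")); // no clause matches
    }
}
```

Because an Integer is also a Number, swapping the two clauses would make the Integer clause unreachable, which is why more specific clauses are usually listed first.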

The counter actor below processes messages one at a time, in the order in which they arrive in its mailbox; the processing of different messages is never interleaved.

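import akka.actor.AbstractActor;
import akka.actor.Props;

// SimpleMessage is assumed to be a plain message class defined elsewhere in the project
public class CounterActor extends AbstractActor {

	// Actor-private state: only modified while processing a message
	private int counter;

	public CounterActor() {
		this.counter = 0;
	}

	@Override
	public Receive createReceive() {
		// Route every SimpleMessage to onMessage
		return receiveBuilder().match(SimpleMessage.class, this::onMessage).build();
	}

	void onMessage(SimpleMessage msg) {
		++counter;
		System.out.println("Counter increased to " + counter);
	}

	// Factory for the Props used to create this actor with actorOf
	static Props props() {
		return Props.create(CounterActor.class);
	}

}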
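Why the counter needs no locks can be seen in a plain-Java model of a mailbox. The sketch below does not use Akka: it assumes a single-threaded executor as a stand-in for the mailbox, and MailboxCounterSketch, tell(), drainAndStop() and demo() are hypothetical names. Several threads send concurrently, yet the state is only ever touched by the one thread that drains the queue.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java model of an actor mailbox (not Akka): one worker thread drains the queue.
class MailboxCounterSketch {
    // A single worker thread means one message is processed at a time, in submission order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int counter = 0; // only touched by the mailbox thread, so no lock is needed

    // Fire-and-forget: enqueue the message and return immediately.
    // The message content is ignored here; every message increments the counter.
    void tell(String message) {
        mailbox.execute(() -> counter++);
    }

    // Enqueues a final task that reads the state, waits for it, then stops the mailbox.
    int drainAndStop() {
        try {
            return mailbox.submit(() -> counter).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("mailbox did not drain", e);
        } finally {
            mailbox.shutdown();
        }
    }

    // Several threads send concurrently; returns the final counter value.
    static int demo(int senders, int messagesPerSender) {
        MailboxCounterSketch actor = new MailboxCounterSketch();
        Thread[] threads = new Thread[senders];
        for (int i = 0; i < senders; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < messagesPerSender; j++) {
                    actor.tell("increment");
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return actor.drainAndStop();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 1000)); // no increment is lost
    }
}
```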

An ActorRef is a handle to an actor: messages are sent to the actor by calling tell() on it, and the ActorRef itself can be included in a message so that other actors learn whom to contact. To obtain an ActorRef for an actor class, it is convenient to give the class a static factory method like this one:

static Props props() {
	return Props.create(CounterActor.class);
}

The props() method is a static factory that creates and returns a Props instance, a configuration object that specifies the options for creating an actor. This instance is passed to actorOf when creating a new CounterActor:

ActorRef counterActor = system.actorOf(CounterActor.props(), "counterActor");

tell()

  • Method Signature: void tell(Object message, ActorRef sender)
  • Asynchronous and Fire-and-Forget: When an actor uses tell, it sends a message to another actor asynchronously and does not wait for a response. It's a "fire-and-forget" method, meaning the sender continues its processing without pausing for the receiver's reply.
  • No Direct Way to Receive a Reply: If the sender needs a response, it must be handled through another message sent back to the sender's ActorRef.
  • Usage Example: actorRef.tell(message, getSelf())
  • Common Use Case: Use tell for notifications and other one-way interactions where the sender does not need a response from the receiver.

ask()

ask is used when the sender needs a response and its next steps depend on it. ask sends a message and returns a future (a scala.concurrent.Future in the examples of this document) that is completed with the receiver's reply. The interaction is still asynchronous, but the sender can handle the response at a later point in time.

  • The sender calls Patterns.ask(receiver, msg, timeout)
  • The receiver replies as usual with getSender().tell(reply, getSelf())
  • The sender can block on the future to obtain blocking/synchronous behavior with Await.result(future, timeout)

If no response arrives within the timeout, the future fails with an AskTimeoutException (a subclass of TimeoutException).
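The request/reply-with-timeout shape of ask can be modeled with the JDK's CompletableFuture. This is a sketch of the idea only, not Akka's Patterns.ask: the class AskSketch, the simulated receiver (a scheduled task that replies after a delay) and the helper askAndWait() are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java model of the ask pattern (not Akka): a future that is completed
// either by the receiver's reply or by a timeout, whichever comes first.
class AskSketch {

    // The "receiver" is simulated by a task that replies after replyDelayMillis.
    static CompletableFuture<String> ask(String msg, long replyDelayMillis, long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Simulated receiver: completes the future with the reply.
        scheduler.schedule(() -> {
            reply.complete("reply to " + msg);
        }, replyDelayMillis, TimeUnit.MILLISECONDS);
        // Timeout: fails the future if no reply has completed it yet.
        scheduler.schedule(() -> {
            reply.completeExceptionally(new TimeoutException("no reply within " + timeoutMillis + " ms"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Whichever happens first wins; the other scheduled task is cancelled.
        reply.whenComplete((result, error) -> scheduler.shutdownNow());
        return reply;
    }

    // Blocking variant: waits for the future and maps the outcome to a string.
    static String askAndWait(String msg, long replyDelayMillis, long timeoutMillis) {
        try {
            return ask(msg, replyDelayMillis, timeoutMillis).get();
        } catch (ExecutionException e) {
            return e.getCause() instanceof TimeoutException ? "timed out" : "failed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(askAndWait("ping", 10, 2000)); // reply arrives well before the timeout
        System.out.println(askAndWait("ping", 2000, 50)); // timeout fires first
    }
}
```

Blocking on the future, as askAndWait() does, is convenient in a main method or a test; inside an actor it is usually preferable to react to the completed future instead of blocking the actor's thread.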

Stashing

Stashing allows an actor to temporarily set aside messages it cannot handle in its current state and to process them later, which gives finer control over message handling. To use it, the actor must extend AbstractActorWithStash.

  • The stash() method saves the current message for later processing, typically after the actor has switched to a different state
  • The unstashAll() method moves all stashed messages back to the front of the mailbox, in the same order in which they were stashed
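The ordering guarantee of stash and unstash can be checked with a small plain-Java model. The sketch below is not AbstractActorWithStash: StashSketch, its two deques, and the "init" message that switches the actor to its ready state are hypothetical and only mimic the behavior described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of stashing (not Akka): messages that arrive before "init"
// are set aside and replayed, in their original order, once the actor is ready.
class StashSketch {
    private final Deque<String> mailbox = new ArrayDeque<>();
    private final Deque<String> stash = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    private boolean ready = false;

    void tell(String msg) {
        mailbox.addLast(msg);
    }

    private void stash(String msg) {
        stash.addLast(msg);
    }

    // Moves the stashed messages to the front of the mailbox, keeping stash order:
    // taking from the back of the stash and prepending restores the original sequence.
    private void unstashAll() {
        while (!stash.isEmpty()) {
            mailbox.addFirst(stash.removeLast());
        }
    }

    private void onMessage(String msg) {
        if (msg.equals("init")) {
            ready = true;
            unstashAll();
        } else if (!ready) {
            stash(msg); // cannot be handled yet
        } else {
            processed.add(msg);
        }
    }

    // Processes the mailbox one message at a time and returns what was handled.
    List<String> run() {
        while (!mailbox.isEmpty()) {
            onMessage(mailbox.removeFirst());
        }
        return processed;
    }

    public static void main(String[] args) {
        StashSketch actor = new StashSketch();
        actor.tell("a");
        actor.tell("b");
        actor.tell("init");
        actor.tell("c");
        System.out.println(actor.run()); // stashed messages are handled before "c"
    }
}
```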

Supervisor

Faults and exceptional behaviors in Akka are managed through supervision. Supervision is organized as a tree in which each supervisor is responsible for handling the failures of the actors it directly supervises. If a supervisor cannot handle a problem locally, it escalates the fault to its own supervisor. This works because a supervisor controls the lifecycle of its children, including the decision to stop or restart a faulty actor. A supervisor is instantiated like any other actor, starting from the usual convenience method for creating Props:

static Props props() { return Props.create(AddressBookSupervisorActor.class); }

With it, we can create an instance of AddressBookSupervisorActor:

final ActorRef supervisor = sys.actorOf(AddressBookSupervisorActor.props(), "supervisor");

After instantiating the supervisor, we can ask it to create a child actor and return a reference to it (if needed):

scala.concurrent.Future<Object> waitingForCounter = ask(supervisor, Props.create(CounterActor.class), 5000);
counter = (ActorRef) waitingForCounter.result(timeout, null);

If the reference is not needed outside the supervisor, no ask is required: the supervisor can simply create the children itself and keep their references:

private ActorRef workerEven;
private ActorRef workerOdd;

public BrokerActor() {
    workerEven = getContext().actorOf(WorkerActor.props());
    workerOdd = getContext().actorOf(WorkerActor.props());
}

For example, we can use the supervisor to supervise the server (for fault tolerance purposes):

// Ask the supervisor to create the server as its child (5000 ms ask timeout)
scala.concurrent.Future<Object> waitingForAddressBookServer =
    ask(supervisor, Props.create(AddressBookServerActor.class), 5000);

scala.concurrent.duration.Duration timeout =
    scala.concurrent.duration.Duration.create(5, SECONDS);

ActorRef server = null;
try {
    // Block until the supervisor replies with the child's ActorRef
    server = (ActorRef) waitingForAddressBookServer.result(timeout, null);
} catch (TimeoutException | InterruptedException e) {
    e.printStackTrace();
}

The ask yields the server's ActorRef because the supervisor handles Props messages with the following behavior:

@Override
public Receive createReceive() {
    return receiveBuilder()
              .match(
                  Props.class,
                  props -> {
                    getSender().tell(getContext().actorOf(props), getSelf());
                  })
              .build();
}

From this moment on, the supervisor supervises the server, because the server was created as its child. The supervision strategy can be customized by overriding supervisorStrategy():

// Define the strategy
private static SupervisorStrategy strategy =
    new OneForOneStrategy(
        1, // Maximum number of retries
        Duration.ofMinutes(1), // Time window in which the retries are counted
        DeciderBuilder.match(Exception.class, e -> SupervisorStrategy.resume()) // or .restart()
            .build());

// Override supervisorStrategy() so that the actor actually uses the strategy
@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}

In this case, it is a OneForOneStrategy, meaning that the decision applies only to the child actor that failed, not to its siblings:

  • Max Number of Retries: the number of times a child actor is allowed to be restarted; a negative value means no limit
  • Time Frame: the duration of the time window for maxNrOfRetries; Duration.Inf means no window
  • Decider Logic: when an Exception is thrown, the strategy resumes the actor, meaning the actor keeps its state and continues processing new messages without being restarted or stopped
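How a retry limit and a time window interact can be sketched in plain Java. This is a simplified model under stated assumptions, not Akka's internal bookkeeping: RestartWindowSketch and permitRestart() are hypothetical names, time is passed in explicitly as milliseconds to keep the example deterministic, and the window is assumed to start at the first failure and to reset once it has expired.

```java
// Simplified model of "at most maxRetries restarts within a time window".
// Not Akka's implementation; the window handling here is an assumption.
class RestartWindowSketch {
    private final int maxRetries;
    private final long windowMillis;
    private int restartsInWindow = 0;
    private long windowStart = -1; // -1 means no window has been opened yet

    RestartWindowSketch(int maxRetries, long windowMillis) {
        this.maxRetries = maxRetries;
        this.windowMillis = windowMillis;
    }

    // Returns true if a restart requested at time nowMillis is still within the limit.
    boolean permitRestart(long nowMillis) {
        boolean windowExpired = windowStart < 0 || nowMillis - windowStart > windowMillis;
        if (windowExpired) {
            // Open a fresh window starting at this failure.
            windowStart = nowMillis;
            restartsInWindow = 0;
        }
        restartsInWindow++;
        return restartsInWindow <= maxRetries;
    }

    public static void main(String[] args) {
        // Same numbers as the strategy above: 1 retry within 1 minute.
        RestartWindowSketch limiter = new RestartWindowSketch(1, 60_000);
        System.out.println(limiter.permitRestart(0));      // first failure: allowed
        System.out.println(limiter.permitRestart(1_000));  // second failure in the same minute: refused
    }
}
```

In a real supervisor, a refused restart would typically lead to the child being stopped rather than restarted again.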

Clustering

An Akka application can be distributed over a cluster, where each node hosts some part of the application.

  1. Clustering Terminology:
    • Node: A logical member of a cluster, possibly multiple on each physical machine, identified by a tuple (hostname:port:uid).
    • Cluster: A set of nodes joined through the membership service.
    • Leader: A single node in the cluster that acts as the leader, managing cluster convergence and membership state transitions.
  2. Decentralized Membership Service:
    • Akka clustering offers a decentralized membership service without a single point of failure or bottleneck.
    • The implementation is peer-to-peer, using a gossip protocol and automatic failure detection.
  3. Clustering Basics:
    • An Akka application can be distributed over a cluster, with each node hosting parts of the application.
    • Cluster membership and the actors running on a member node are decoupled.
    • Nodes join a cluster by sending a join command to one of the cluster nodes.
  4. Clustering Protocol:
    • Nodes organize themselves into an overlay network, distributing information about cluster members.
    • Gossip protocol: Nodes propagate messages containing their view of the membership, updating their view based on received messages.
    • The state of nodes eventually converges, using vector clocks to record a partial order of events (nodes joining, leaving, etc.).
  5. Cluster Tools:
    • Cluster Singleton: Ensures that a single actor of a certain type exists in the cluster.
    • Cluster Sharding: Distributes actors across nodes of the cluster, facilitating communication without knowing their physical location.
    • Distributed Data: Creates a distributed key-value store across the nodes of the cluster.
    • Cluster Metrics: Publishes and collects health metrics about cluster members.
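The partial order that vector clocks record, mentioned under Clustering Protocol, can be illustrated with a minimal plain-Java sketch. It is not Akka's VectorClock class: VectorClockSketch, tick(), compare() and merge() are hypothetical names, and a clock is modeled simply as a map from node name to counter.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal vector clock model (not Akka's implementation): node name -> event counter.
class VectorClockSketch {
    enum Order { BEFORE, AFTER, SAME, CONCURRENT }

    // Returns a copy of the clock with the given node's counter incremented.
    static Map<String, Integer> tick(Map<String, Integer> clock, String node) {
        Map<String, Integer> next = new HashMap<>(clock);
        next.merge(node, 1, Integer::sum);
        return next;
    }

    // Compares two clocks entry by entry; missing entries count as 0.
    static Order compare(Map<String, Integer> a, Map<String, Integer> b) {
        boolean aSmaller = false;
        boolean bSmaller = false;
        Set<String> nodes = new HashSet<>(a.keySet());
        nodes.addAll(b.keySet());
        for (String node : nodes) {
            int x = a.getOrDefault(node, 0);
            int y = b.getOrDefault(node, 0);
            if (x < y) aSmaller = true;
            if (x > y) bSmaller = true;
        }
        if (aSmaller && bSmaller) return Order.CONCURRENT; // neither view contains the other
        if (aSmaller) return Order.BEFORE;
        if (bSmaller) return Order.AFTER;
        return Order.SAME;
    }

    // Merging two views takes the entry-wise maximum.
    static Map<String, Integer> merge(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> merged = new HashMap<>(a);
        for (Map.Entry<String, Integer> e : b.entrySet()) {
            merged.merge(e.getKey(), e.getValue(), Math::max);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Integer> base = tick(new HashMap<>(), "A");
        Map<String, Integer> onA = tick(base, "A"); // A records another event
        Map<String, Integer> onB = tick(base, "B"); // B records an event independently
        System.out.println(compare(base, onA));            // BEFORE
        System.out.println(compare(onA, onB));             // CONCURRENT
        System.out.println(compare(onA, merge(onA, onB))); // BEFORE
    }
}
```

Two concurrent views are exactly the case in which gossiping nodes need to merge their membership state; after the merge, both earlier views are ordered before the merged one.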