Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 101 additions & 4 deletions actor-model/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,61 @@ public abstract class Actor implements Runnable {
@Setter @Getter private String actorId;
private final BlockingQueue<Message> mailbox = new LinkedBlockingQueue<>();
private volatile boolean active = true;
@Setter @Getter private ActorSystem actorSystem;


/**
* Sends a message to this actor.
*/
public void send(Message message) {
mailbox.add(message);
}

/**
* Stops this actor.
*/
public void stop() {
active = false;
}

/**
* Creates a new child actor supervised by this actor.
* This implements the capability for actors to create new actors.
*/
protected String createChildActor(Actor childActor) {
if (actorSystem == null) {
throw new IllegalStateException("Actor system not set");
}
childActor.setActorSystem(actorSystem);
return actorSystem.startChildActor(childActor, actorId);
}

/**
* Handles errors that occur during message processing.
* By default, it restarts the actor, but child classes can override this.
*/
protected void handleError(Throwable error) {
if (actorSystem != null) {
actorSystem.restartActor(actorId, error);
}
}

@Override
public void run() {

while (active) {
try {
Message message = mailbox.take(); // Wait for a message
try {
onReceive(message); // Process it
} catch (Exception e) {
handleError(e); // Handle any errors
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

// Child classes must define what to do with a message
protected abstract void onReceive(Message message);
}

Expand All @@ -102,16 +142,74 @@ public class Message {

```java
public class ActorSystem {
public void startActor(Actor actor) {
private final ExecutorService executor = Executors.newCachedThreadPool();
private final ConcurrentHashMap<String, Actor> actorRegister = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, String> actorHierarchy = new ConcurrentHashMap<>();
private final AtomicInteger idCounter = new AtomicInteger(0);

/**
* Starts an actor without a parent (top-level actor).
*/
public String startActor(Actor actor) {
String actorId = "actor-" + idCounter.incrementAndGet(); // Generate a new and unique ID
actor.setActorId(actorId); // assign the actor it's ID
actorRegister.put(actorId, actor); // Register and save the actor with it's ID
executor.submit(actor); // Run the actor in a thread
return actorId;
}

/**
* Starts an actor with a parent actor (child actor).
* The parent actor will supervise this child actor.
*/
public String startChildActor(Actor actor, String parentId) {
String actorId = startActor(actor);
actorHierarchy.put(actorId, parentId); // Record parent-child relationship
return actorId;
}

/**
* Gets an actor by its ID.
*/
public Actor getActorById(String actorId) {
return actorRegister.get(actorId); // Find by Id
}

/**
* Gets the parent actor of an actor.
*/
public Actor getParentActor(String childId) {
String parentId = actorHierarchy.get(childId);
return parentId != null ? actorRegister.get(parentId) : null;
}

/**
* Restarts an actor that has failed.
* If the actor has a parent, the parent will be notified.
*/
public void restartActor(String actorId, Throwable error) {
Actor actor = actorRegister.get(actorId);
if (actor != null) {
// Stop the current actor
actor.stop();

// Notify parent if exists
String parentId = actorHierarchy.get(actorId);
if (parentId != null) {
Actor parent = actorRegister.get(parentId);
if (parent != null) {
parent.send(new Message("Child actor " + actorId + " failed: " + error.getMessage(), "system"));
}
}

// Restart the actor
executor.submit(actor);
}
}

/**
* Shuts down the actor system, stopping all actors.
*/
public void shutdown() {
executor.shutdownNow(); // Stop all threads
}
Expand Down Expand Up @@ -198,4 +296,3 @@ public class App {
- *Reactive Design Patterns*, Roland Kuhn
- *The Actor Model in 10 Minutes*, [InfoQ Article](https://www.infoq.com/articles/actor-model/)
- [Akka Documentation](https://doc.akka.io/docs/akka/current/index.html)

43 changes: 42 additions & 1 deletion actor-model/src/main/java/com/iluwatar/actormodel/Actor.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,61 @@ public abstract class Actor implements Runnable {

// rather than being cached in a thread's local memory. To make it consistent to all Actors

@Setter @Getter private ActorSystem actorSystem;

/**
* Sends a message to this actor.
*
* @param message The message to send
*/
public void send(Message message) {
mailbox.add(message); // Add message to queue
}

/**
* Stops this actor.
*/
public void stop() {
active = false; // Stop the actor loop
}

/**
* Creates a new child actor supervised by this actor.
* This implements the capability for actors to create new actors.
*
* @param childActor The actor to create as a child
* @return The ID of the created child actor
*/
protected String createChildActor(Actor childActor) {
if (actorSystem == null) {
throw new IllegalStateException("Actor system not set");
}
childActor.setActorSystem(actorSystem);
return actorSystem.startChildActor(childActor, actorId);
}

/**
* Handles errors that occur during message processing.
* By default, it restarts the actor, but child classes can override this.
*
* @param error The error that occurred
*/
protected void handleError(Throwable error) {
if (actorSystem != null) {
actorSystem.restartActor(actorId, error);
}
}

@Override
public void run() {
while (active) {
try {
Message message = mailbox.take(); // Wait for a message
onReceive(message); // Process it
try {
onReceive(message); // Process it
} catch (Exception e) {
handleError(e); // Handle any errors
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,88 @@
public class ActorSystem {
private final ExecutorService executor = Executors.newCachedThreadPool();
private final ConcurrentHashMap<String, Actor> actorRegister = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, String> actorHierarchy = new ConcurrentHashMap<>();
private final AtomicInteger idCounter = new AtomicInteger(0);

public void startActor(Actor actor) {
/**
* Starts an actor without a parent (top-level actor).
*
* @param actor The actor to start
* @return The ID of the started actor
*/
public String startActor(Actor actor) {
String actorId = "actor-" + idCounter.incrementAndGet(); // Generate a new and unique ID
actor.setActorId(actorId); // assign the actor it's ID
actorRegister.put(actorId, actor); // Register and save the actor with it's ID
executor.submit(actor); // Run the actor in a thread
return actorId;
}

/**
* Starts an actor with a parent actor (child actor).
* The parent actor will supervise this child actor.
*
* @param actor The actor to start
* @param parentId The ID of the parent actor
* @return The ID of the started actor
*/
public String startChildActor(Actor actor, String parentId) {
String actorId = startActor(actor);
actorHierarchy.put(actorId, parentId); // Record parent-child relationship
return actorId;
}

/**
* Gets an actor by its ID.
*
* @param actorId The ID of the actor to get
* @return The actor with the given ID, or null if not found
*/
public Actor getActorById(String actorId) {
return actorRegister.get(actorId); // Find by Id
}

/**
* Gets the parent actor of an actor.
*
* @param childId The ID of the child actor
* @return The parent actor, or null if the actor has no parent
*/
public Actor getParentActor(String childId) {
String parentId = actorHierarchy.get(childId);
return parentId != null ? actorRegister.get(parentId) : null;
}

/**
* Restarts an actor that has failed.
* If the actor has a parent, the parent will be notified.
*
* @param actorId The ID of the actor to restart
* @param error The error that caused the actor to fail
*/
public void restartActor(String actorId, Throwable error) {
Actor actor = actorRegister.get(actorId);
if (actor != null) {
// Stop the current actor
actor.stop();

// Notify parent if exists
String parentId = actorHierarchy.get(actorId);
if (parentId != null) {
Actor parent = actorRegister.get(parentId);
if (parent != null) {
parent.send(new Message("Child actor " + actorId + " failed: " + error.getMessage(), "system"));
}
}

// Restart the actor
executor.submit(actor);
}
}

/**
* Shuts down the actor system, stopping all actors.
*/
public void shutdown() {
executor.shutdownNow(); // Stop all threads
}
Expand Down
55 changes: 45 additions & 10 deletions actor-model/src/main/java/com/iluwatar/actormodel/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,56 @@
*/
package com.iluwatar.actormodel;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class App {
/**
* Main method to demonstrate the Actor Model pattern.
*
* @param args command line arguments (not used)
* @throws InterruptedException if thread is interrupted
*/
public static void main(String[] args) throws InterruptedException {
// Create the actor system
ActorSystem system = new ActorSystem();
Actor srijan = new ExampleActor(system);
Actor ansh = new ExampleActor2(system);

system.startActor(srijan);
system.startActor(ansh);
ansh.send(new Message("Hello ansh", srijan.getActorId()));
srijan.send(new Message("Hello srijan!", ansh.getActorId()));
// Create and start parent actors
ExampleActor parent = new ExampleActor(system);
String parentId = system.startActor(parent);
LOGGER.info("Started parent actor with ID: {}", parentId);

// Parent creates a child actor
String childId = parent.createChildExampleActor();
LOGGER.info("Parent created child actor with ID: {}", childId);

// Get the child actor
ExampleActor2 child = (ExampleActor2) system.getActorById(childId);

// Basic message passing
child.send(new Message("Hello from parent", parent.getActorId()));
parent.send(new Message("Hello from child", child.getActorId()));

Thread.sleep(500); // Give time for messages to process

// Demonstrate supervision - send a message that will cause an error
LOGGER.info("Sending message that will cause an error to demonstrate supervision");
child.send(new Message("This will cause an error", parent.getActorId()));

Thread.sleep(1000); // Give time for error handling and supervision

// Create another actor directly through the system
Actor anotherActor = new ExampleActor2(system);
system.startActor(anotherActor);
anotherActor.send(new Message("Hello from the system", "system"));

Thread.sleep(1000); // Give time for messages to process
Thread.sleep(500); // Give time for messages to process

srijan.stop(); // Stop the actor gracefully
ansh.stop();
system.shutdown(); // Stop the actor system
// Graceful shutdown
LOGGER.info("Shutting down actors and system");
parent.stop();
child.stop();
anotherActor.stop();
system.shutdown();
}
}
Loading
Loading