package actor

v4.0.0-...-be71470
Published: Mar 1, 2026 License: MIT Imports: 74 Imported by: 0

Documentation


Constants

const (
	// DefaultPassivationTimeout defines the default passivation timeout
	DefaultPassivationTimeout = 2 * time.Minute
	// DefaultInitMaxRetries defines the default value for retrying actor initialization
	DefaultInitMaxRetries = 5
	// DefaultShutdownTimeout defines the default shutdown timeout
	DefaultShutdownTimeout = 5 * time.Minute
	// DefaultInitTimeout defines the default init timeout
	DefaultInitTimeout = time.Second
	// DefaultPublishStateTimeout defines the default state publication timeout
	// This is the maximum time to wait for a state to be published to the cluster
	// before timing out when the actor system is shutting down
	DefaultPublishStateTimeout = time.Minute
	// DefaultAskTimeout defines the default ask timeout
	DefaultAskTimeout = 5 * time.Second
	// DefaultMaxReadFrameSize defines the default HTTP maximum read frame size
	DefaultMaxReadFrameSize = 16 * size.MB
	// DefaultClusterBootstrapTimeout defines the default cluster bootstrap timeout
	DefaultClusterBootstrapTimeout = 10 * time.Second
	// DefaultClusterStateSyncInterval defines the default cluster state synchronization interval
	DefaultClusterStateSyncInterval = time.Minute
	// DefaultGrainRequestTimeout defines the default grain request timeout
	DefaultGrainRequestTimeout = 5 * time.Second

	// DefaultClusterBalancerInterval defines the default cluster balancer interval
	DefaultClusterBalancerInterval = time.Second
)
const (
	// DefaultShutdownRecoveryMaxRetries defines the default number of retries for coordinated shutdown hooks
	DefaultShutdownRecoveryMaxRetries = 3

	// DefaultShutdownHookRecoveryRetryInterval defines the default delay between retries for coordinated shutdown hooks when a retry policy is applied
	DefaultShutdownHookRecoveryRetryInterval = time.Second
)

Variables

var (
	// DefaultSupervisorDirective defines the default supervisory strategy directive
	DefaultSupervisorDirective = supervisor.StopDirective
)

Functions

func Ask

func Ask(ctx context.Context, to *PID, message any, timeout time.Duration) (response any, err error)

Ask sends a synchronous message to another actor and expects a response. It blocks until a response is received or the timeout expires.
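For instance, a request-reply round trip might look like the sketch below; the GetBalance and Balance message types and the pid handle are hypothetical, only Ask and DefaultAskTimeout come from this package:

```go
// Hypothetical request-reply round trip. GetBalance/Balance are
// illustrative message types, not part of this package.
response, err := Ask(ctx, pid, &GetBalance{AccountID: "acc-1"}, DefaultAskTimeout)
if err != nil {
	// err is set when the send fails or no response arrives in time
	log.Printf("ask failed: %v", err)
	return
}
if balance, ok := response.(*Balance); ok {
	log.Printf("balance: %d", balance.Amount)
}
```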

func BatchAsk

func BatchAsk(ctx context.Context, to *PID, timeout time.Duration, messages ...any) (responses chan any, err error)

BatchAsk sends a batch of messages synchronously to the given PID and expects responses in the same order as the messages. The messages are processed one after the other, in the order they were sent. This is a design choice that follows the simple principle of one-message-at-a-time processing by actors.

func BatchTell

func BatchTell(ctx context.Context, to *PID, messages ...any) error

BatchTell sends bulk asynchronous messages to an actor. The messages are processed one after the other, in the order they were sent. This is a design choice that follows the simple principle of one-message-at-a-time processing by actors.
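A sketch combining both batch calls (all message types shown are hypothetical). Note that the responses channel returned by BatchAsk yields replies in the same order the messages were passed:

```go
// Fire-and-forget batch; messages are enqueued and processed in order.
if err := BatchTell(ctx, pid, &CreditAccount{Amount: 10}, &DebitAccount{Amount: 5}); err != nil {
	log.Printf("batch tell failed: %v", err)
}

// Request-reply batch; replies arrive on the channel in message order.
responses, err := BatchAsk(ctx, pid, 5*time.Second, &GetBalance{}, &GetStatement{})
if err != nil {
	log.Printf("batch ask failed: %v", err)
	return
}
for response := range responses {
	log.Printf("response: %v", response)
}
```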

func Tell

func Tell(ctx context.Context, to *PID, message any) error

Tell sends an asynchronous message to an actor.
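A one-line fire-and-forget send (the OrderShipped message type is hypothetical):

```go
// Tell returns once the message is enqueued; processing failures inside
// the receiver are handled by its supervisor, not surfaced here.
if err := Tell(ctx, pid, &OrderShipped{OrderID: "ord-42"}); err != nil {
	log.Printf("tell failed: %v", err)
}
```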

Types

type ActivationStrategy

type ActivationStrategy int

ActivationStrategy defines the algorithm used by the actor system to determine where a grain should be activated in a clustered environment.

This strategy is only relevant when cluster mode is enabled. It affects how grains are distributed across the nodes in the cluster.

const (
	// RoundRobinActivation distributes grains evenly across nodes
	// by cycling through the available nodes in a round-robin manner.
	// This strategy provides balanced load distribution over time.
	// ⚠️ Note: This strategy is subject to the cluster topology at the time of activation. For a stable cluster topology,
	// it ensures an even distribution of grains across all nodes.
	// ⚠️ Note: This strategy will only be applied if the given Grain does not exist yet when cluster mode is enabled.
	// ⚠️ Note: If the Grain already exists on another node, it will be activated there instead.
	RoundRobinActivation ActivationStrategy = iota

	// Random selects a node at random from the available pool of nodes.
	// This strategy is stateless and can help quickly spread grains across the cluster,
	// but may result in uneven load distribution.
	// ⚠️ Note: This strategy will only be applied if the given Grain does not exist yet when cluster mode is enabled.
	// ⚠️ Note: If the Grain already exists on another node, it will be activated there instead.
	RandomActivation

	// LocalActivation forces the grain to be activated on the local node.
	// Useful when locality is important (e.g., accessing local resources).
	// ⚠️ Note: This strategy will only be applied if the given Grain does not exist yet when cluster mode is enabled.
	// ⚠️ Note: If the Grain already exists on another node, it will be activated there instead.
	LocalActivation

	// LeastLoadActivation selects the node with the least current load to activate the grain.
	// This strategy aims to optimize resource utilization by placing grains
	// on nodes that are less busy, potentially improving performance and responsiveness.
	// ⚠️ Note: This strategy may incur additional overhead when placing grains,
	// as it must gather load metrics from the nodes, a cost that grows with the cluster size.
	// ⚠️ Note: This strategy will only be applied if the given Grain does not exist yet when cluster mode is enabled.
	// ⚠️ Note: If the Grain already exists on another node, it will be activated there instead.
	LeastLoadActivation
)
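As an illustration only, a grain might be activated with an explicit strategy along the following lines. The GrainIdentity call and the WithActivationStrategy option name are assumptions about the grain API (check the grain documentation for the actual names), and, as noted above, the strategy is honored only when the grain is not already active elsewhere in the cluster:

```go
// Hypothetical grain activation with an explicit placement strategy.
// LeastLoadActivation places the grain on the least busy node, at the
// cost of collecting load metrics at placement time.
identity, err := system.GrainIdentity(ctx, "cart-1", NewCartGrain,
	WithActivationStrategy(LeastLoadActivation))
if err != nil {
	log.Printf("activation failed: %v", err)
}
_ = identity
```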

type Actor

type Actor interface {
	// PreStart is called once before the actor starts processing messages.
	//
	// Use this method to initialize dependencies such as database clients,
	// caches, or external service connections and persistent state recovery. If PreStart returns an error,
	// the actor will not be started, and the failure will be handled by its supervisor.
	PreStart(ctx *Context) error

	// Receive handles all messages sent to the actor's mailbox.
	//
	// This is the heart of the actor's behavior. Messages can include user-defined
	// commands/events as well as internal system messages such as PostStart or lifecycle signals.
	//
	// Actors can reply to messages using async messaging patterns or configure replies inline
	// where supported. Avoid heavy synchronous workflows as they may degrade throughput in high-load scenarios.
	//
	// Tip: Use pattern matching or typed message handlers to organize complex message workflows.
	Receive(ctx *ReceiveContext)

	// PostStop is called when the actor is about to shut down.
	//
	// This lifecycle hook is invoked after the actor has finished processing all messages
	// in its mailbox and is guaranteed to run before the actor is fully terminated.
	//
	// Use this method to perform final cleanup actions such as:
	//   - Releasing resources (e.g., database connections, goroutines, open files)
	//   - Flushing logs or metrics
	//   - Notifying other systems of termination (e.g., via events or pub/sub)
	//
	// This method is especially important for passivated actors, as it is also
	// called during passivation when an idle actor is stopped to free up resources.
	//
	// Note: If PostStop returns an error, the error is logged but does not prevent the actor
	// from being stopped. Keep PostStop logic fast and resilient to avoid delaying system shutdowns.
	PostStop(ctx *Context) error
}

Actor defines the interface that all actors in the system must implement.

Actors are lightweight, concurrent, and isolated units of computation that encapsulate both state and behavior. In this system, actors can be supervised, restarted on failure, or run in a distributed (remote) cluster-aware environment.

Any struct implementing this interface must be immutable—i.e., all fields should be private (unexported) to ensure thread safety. Initialization should occur in the PreStart hook, not via exported fields or constructors.

Actors follow a lifecycle composed of:

  • PreStart: initialization before message processing begins.
  • Receive: core behavior and message handling.
  • PostStop: cleanup when the actor shuts down.

**Supervision and Fault Tolerance** If an actor fails during message processing, its supervisor can decide how to handle the failure (e.g., restart, stop, escalate). If state recovery is needed (e.g., for persistent actors), it must be explicitly handled inside the PreStart hook.

**Clustering and Remoting** In a distributed deployment, actors can be remotely spawned and communicated with across nodes. Ensure that any resources initialized in PreStart are safe for clustered environments (e.g., stateless, replicated, or retryable).
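A minimal implementation sketch of the interface above. The CountUp, GetCount, and Count message types are hypothetical, and the Message, Response, and Unhandled helpers are assumed accessors on ReceiveContext:

```go
// counter is a minimal Actor: unexported state, mutated only inside
// Receive, which the runtime invokes one message at a time.
type counter struct {
	count int
}

func (c *counter) PreStart(ctx *Context) error {
	c.count = 0 // initialize dependencies and state here
	return nil
}

func (c *counter) Receive(ctx *ReceiveContext) {
	switch msg := ctx.Message().(type) {
	case *CountUp:
		c.count += msg.By
	case *GetCount:
		ctx.Response(&Count{Value: c.count})
	default:
		ctx.Unhandled()
	}
}

func (c *counter) PostStop(ctx *Context) error {
	// release resources, flush logs/metrics
	return nil
}
```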

type ActorChildCreated

type ActorChildCreated struct {
	// contains filtered or unexported fields
}

ActorChildCreated defines the child actor created event

func NewActorChildCreated

func NewActorChildCreated(address, parent string) *ActorChildCreated

NewActorChildCreated creates a new ActorChildCreated event stamped with the current UTC time.

func (*ActorChildCreated) Address

func (a *ActorChildCreated) Address() string

Address returns the child actor's address.

func (*ActorChildCreated) CreatedAt

func (a *ActorChildCreated) CreatedAt() time.Time

CreatedAt returns the time the child actor was created.

func (*ActorChildCreated) Parent

func (a *ActorChildCreated) Parent() string

Parent returns the parent actor's address.

type ActorMetric

type ActorMetric struct {
	// contains filtered or unexported fields
}

ActorMetric is a point-in-time view of a single actor’s counters and timings.

It is returned by PID.Metric(ctx) and reflects only the local actor state; no cluster aggregation is performed. Metrics are captured at the moment Metric is called and will not update afterwards. If the actor is not running when requested, PID.Metric returns nil.

func (ActorMetric) ChidrenCount

func (x ActorMetric) ChidrenCount() uint64

ChidrenCount returns the current number of child actors owned by this actor. Represents the actor’s immediate descendants (not a transitive count).

func (ActorMetric) DeadlettersCount

func (x ActorMetric) DeadlettersCount() uint64

DeadlettersCount returns the total number of messages sent to deadletters by this actor. Indicates local delivery failures or unhandled messages.

func (ActorMetric) FailureCount

func (x ActorMetric) FailureCount() uint64

FailureCount returns the cumulative number of failures observed by this actor. Typically increments on panics or errors that trigger supervision actions.

func (ActorMetric) LatestProcessedDuration

func (x ActorMetric) LatestProcessedDuration() time.Duration

LatestProcessedDuration returns the duration of the latest message processing. Unit: time.Duration (nanoseconds). Useful for latency observations on the last handled message.

func (ActorMetric) ProcessedCount

func (x ActorMetric) ProcessedCount() uint64

ProcessedCount returns the cumulative number of messages this actor has processed. Increments after successful handling; excludes stashed or dropped messages.

func (ActorMetric) ReinstateCount

func (x ActorMetric) ReinstateCount() uint64

ReinstateCount returns the cumulative number of reinstatements for this actor. A reinstatement occurs when an actor transitions from suspended to resumed state.

func (ActorMetric) RestartCount

func (x ActorMetric) RestartCount() uint64

RestartCount returns the cumulative number of restarts for this actor (PID). Increments when supervision restarts the actor. Does not include normal stops.

func (ActorMetric) StashSize

func (x ActorMetric) StashSize() uint64

StashSize returns the current number of messages stashed by this actor. Fluctuates as messages are stashed/unstashed under behaviors like become/unbecome.

func (ActorMetric) Uptime

func (x ActorMetric) Uptime() int64

Uptime returns the elapsed time this actor has been alive, in seconds. Resets to zero on actor restart.
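Reading a snapshot might look like this (assuming an existing pid handle; as noted above, Metric returns nil when the actor is not running):

```go
metric := pid.Metric(ctx)
if metric == nil {
	log.Println("actor is not running")
	return
}
// All values are local, point-in-time counters; they do not update
// after capture.
log.Printf("processed=%d failed=%d restarted=%d stashed=%d uptime=%ds",
	metric.ProcessedCount(),
	metric.FailureCount(),
	metric.RestartCount(),
	metric.StashSize(),
	metric.Uptime())
```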

type ActorPassivated

type ActorPassivated struct {
	// contains filtered or unexported fields
}

ActorPassivated defines the actor passivated event

func NewActorPassivated

func NewActorPassivated(address string) *ActorPassivated

NewActorPassivated creates a new ActorPassivated event stamped with the current UTC time.

func (*ActorPassivated) Address

func (a *ActorPassivated) Address() string

Address returns the actor's address.

func (*ActorPassivated) PassivatedAt

func (a *ActorPassivated) PassivatedAt() time.Time

PassivatedAt returns the time the actor was passivated.

type ActorReinstated

type ActorReinstated struct {
	// contains filtered or unexported fields
}

ActorReinstated is triggered when an actor is reinstated

func NewActorReinstated

func NewActorReinstated(address string) *ActorReinstated

NewActorReinstated creates a new ActorReinstated event stamped with the current UTC time.

func (*ActorReinstated) Address

func (a *ActorReinstated) Address() string

Address returns the actor's address.

func (*ActorReinstated) ReinstatedAt

func (a *ActorReinstated) ReinstatedAt() time.Time

ReinstatedAt returns the time the actor was reinstated.

type ActorRestarted

type ActorRestarted struct {
	// contains filtered or unexported fields
}

ActorRestarted defines the actor restarted event

func NewActorRestarted

func NewActorRestarted(address string) *ActorRestarted

NewActorRestarted creates a new ActorRestarted event stamped with the current UTC time.

func (*ActorRestarted) Address

func (a *ActorRestarted) Address() string

Address returns the actor's address.

func (*ActorRestarted) RestartedAt

func (a *ActorRestarted) RestartedAt() time.Time

RestartedAt returns the time the actor was restarted.

type ActorStarted

type ActorStarted struct {
	// contains filtered or unexported fields
}

ActorStarted defines the actor started event

func NewActorStarted

func NewActorStarted(address string) *ActorStarted

NewActorStarted creates a new ActorStarted event stamped with the current UTC time.

func (*ActorStarted) Address

func (a *ActorStarted) Address() string

Address returns the actor's address.

func (*ActorStarted) StartedAt

func (a *ActorStarted) StartedAt() time.Time

StartedAt returns the time the actor started.

type ActorStopped

type ActorStopped struct {
	// contains filtered or unexported fields
}

ActorStopped defines the actor stopped event

func NewActorStopped

func NewActorStopped(address string) *ActorStopped

NewActorStopped creates a new ActorStopped event stamped with the current UTC time.

func (*ActorStopped) Address

func (a *ActorStopped) Address() string

Address returns the actor's address.

func (*ActorStopped) StoppedAt

func (a *ActorStopped) StoppedAt() time.Time

StoppedAt returns the time the actor stopped.

type ActorSuspended

type ActorSuspended struct {
	// contains filtered or unexported fields
}

ActorSuspended defines the actor suspended event

func NewActorSuspended

func NewActorSuspended(address, reason string) *ActorSuspended

NewActorSuspended creates a new ActorSuspended event stamped with the current UTC time.

func (*ActorSuspended) Address

func (a *ActorSuspended) Address() string

Address returns the actor's address.

func (*ActorSuspended) Reason

func (a *ActorSuspended) Reason() string

Reason returns the suspension reason.

func (*ActorSuspended) SuspendedAt

func (a *ActorSuspended) SuspendedAt() time.Time

SuspendedAt returns the time the actor was suspended.
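The event types above are delivered through the actor system's event stream (see Subscribe below). A consumption sketch; the Iterator and Payload accessors are assumptions about the eventstream.Subscriber API:

```go
subscriber, err := system.Subscribe()
if err != nil {
	log.Fatalf("subscribe failed: %v", err)
}
defer system.Unsubscribe(subscriber)

for message := range subscriber.Iterator() {
	switch event := message.Payload().(type) {
	case *ActorStarted:
		log.Printf("started %s at %s", event.Address(), event.StartedAt())
	case *ActorPassivated:
		log.Printf("passivated %s", event.Address())
	case *ActorSuspended:
		log.Printf("suspended %s: %s", event.Address(), event.Reason())
	}
}
```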

type ActorSystem

type ActorSystem interface {
	// Metric retrieves the current set of runtime metrics for the actor system.
	//
	// This includes local actor system metrics such as the number of actors,
	// mailbox sizes, and message throughput. It does not include metrics
	// from other nodes in a distributed or clustered environment.
	//
	// Use this method for monitoring and debugging purposes within a single node.
	//
	// The provided context can be used to control timeouts or cancellations
	// of any background operations involved in collecting the metrics.
	Metric(ctx context.Context) *Metric
	// Name returns the actor system name
	Name() string
	// Actors retrieves all active actors visible to this node.
	// Local actors are returned as live PIDs. When cluster mode is enabled,
	// actors on peer nodes are returned as lightweight remote PIDs that carry
	// only the address and a remoting handle, routing all messaging through
	// the remoting layer. Use pid.IsLocal() / pid.IsRemote() to distinguish them.
	// The timeout bounds the cluster scan; it is ignored when not in cluster mode.
	// Use this method cautiously as the cluster scan may impact system performance.
	Actors(ctx context.Context, timeout time.Duration) ([]*PID, error)
	// Start initializes the actor system.
	// To guarantee a clean shutdown during unexpected system terminations,
	// developers must handle SIGTERM and SIGINT signals appropriately and invoke Stop.
	Start(ctx context.Context) error
	// Stop stops the actor system; it does not terminate the program.
	// Call os.Exit explicitly to terminate the program.
	Stop(ctx context.Context) error
	// Spawn creates and starts a new actor in the local actor system.
	//
	// The actor will be registered under the given `name`, allowing other actors
	// or components to send messages to it using the returned *PID. If an actor
	// with the same name already exists in the local system, an error will be returned.
	//
	// This method is location-transparent: with options such as WithHostAndPort, the actor
	// may be spawned on a remote node when remoting is enabled; otherwise it is created locally.
	//
	// Parameters:
	//   - ctx: A context used to control cancellation and timeouts during the spawn process.
	//   - name: A unique identifier for the actor within the local actor system.
	//   - actor: An instance implementing the Actor interface, representing the behavior and lifecycle of the actor.
	//   - opts: Optional SpawnOptions to customize the actor's behavior (e.g., dependency, mailbox, supervisor strategy).
	//
	// Returns:
	//   - *PID: A pointer to the Process ID of the spawned actor, used for message passing.
	//   - error: An error if the actor could not be spawned (e.g., name conflict, invalid configuration).
	//
	// Example:
	//
	//   pid, err := system.Spawn(ctx, "user-service", NewUserActor())
	//   if err != nil {
	//       log.Fatalf("Failed to spawn actor: %v", err)
	//   }
	//
	// Note: For cluster placement strategies (e.g., Random, LeastLoad), use SpawnOn instead.
	Spawn(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
	// SpawnOn creates and starts an actor locally, on another node in the current cluster,
	// or on a node in a different data center, depending on options and actor system configuration.
	//
	// # Cross–data center placement
	//
	// When opts include WithDataCenter, the actor is spawned on a node in that data center.
	// Placement is a random node among the target DC's advertised remoting endpoints: SpawnOn
	// calls spawnOnDatacenter, which requires DataCenterReady(), looks up the target DC by the
	// given datacenter.DataCenter's ID() in the controller's active records, then sends a
	// RemoteSpawn to one of that record's Endpoints chosen at random. There is no leader
	// selection—which node runs the actor depends entirely on which addresses the target DC
	// registered (via datacenter.Config.Endpoints). If that DC advertises only its leader, every
	// cross-DC spawn goes to the leader; if it advertises all nodes, the actor is placed on a
	// random node in that DC. The actor kind must be registered on the target data center's
	// actor systems. See WithDataCenter and spawnOnDatacenter for details and errors
	// (e.g. ErrDataCenterNotReady, ErrDataCenterStaleRecords, ErrDataCenterRecordNotFound).
	//
	// # Same–data center placement
	//
	// When no target data center is specified, behavior depends on cluster mode:
	//
	//   - In cluster mode, the actor may be placed on any node in the local cluster according to
	//     the placement strategy and role filter. Supported strategies:
	//     RoundRobin, Random, Local, LeastLoad. Placement uses cluster.Members and thus stays
	//     within the current data center when the architecture is one-cluster-per-DC.
	//   - In non-cluster mode, the actor is created on the local actor system, like Spawn.
	//
	// Unlike Spawn, SpawnOn does not return a PID. Use ActorOf to resolve the actor's PID or
	// Address after it has been successfully created.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeouts during the spawn process.
	//   - name: A globally unique name for the actor in the cluster or across data centers.
	//   - actor: An instance implementing the Actor interface.
	//   - opts: Optional SpawnOptions: placement strategy, WithDataCenter for cross-DC spawn,
	//     role, dependencies, mailbox, supervisor strategy, etc.
	//
	// Returns:
	//   - error: An error if the actor could not be spawned (e.g., name conflict, network or
	//     remoting failure, misconfiguration; or, when using WithDataCenter, data center
	//     not ready, stale records, or target DC not found).
	//
	// Example (same-DC, cluster placement):
	//
	//	err := system.SpawnOn(ctx, "actor-1", NewCartActor(), WithPlacement(Random))
	//	if err != nil {
	//	    log.Fatalf("Failed to spawn actor: %v", err)
	//	}
	//
	// Example (cross-DC):
	//
	//	dc := &datacenter.DataCenter{Name: "dc-west", Region: "us-west-2"}
	//	err := system.SpawnOn(ctx, "actor-1", NewCartActor(), WithDataCenter(dc))
	//	if err != nil {
	//	    log.Fatalf("Failed to spawn actor in dc-west: %v", err)
	//	}
	//
	// ⚠️ Note: The created actor uses the default mailbox from the actor system unless overridden in opts.
	SpawnOn(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)
	// SpawnFromFunc creates an actor from the given receive function. The PreStart and PostStop lifecycle hooks
	// can be set via the optional options.
	SpawnFromFunc(ctx context.Context, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
	// SpawnNamedFromFunc creates an actor from the given receive function under the provided name. The PreStart and PostStop
	// lifecycle hooks can be set via the optional options.
	SpawnNamedFromFunc(ctx context.Context, name string, receiveFunc ReceiveFunc, opts ...FuncOption) (*PID, error)
	// SpawnRouter creates and initializes a new router actor with the specified options.
	//
	// A router is a special type of actor designed to distribute messages of the same type
	// across a group of routee actors. This enables concurrent message processing and improves
	// throughput by leveraging multiple actors in parallel.
	//
	// Each individual actor in the group (a "routee") processes only one message at a time,
	// preserving the actor model’s single-threaded execution semantics.
	//
	// Note: Routers are **not** redeployable. If the host node of a router leaves the cluster
	// or crashes, the router and its routees will not be automatically re-spawned elsewhere.
	//
	// Use routers when you need to fan out work across multiple workers while preserving
	// the isolation and safety guarantees of the actor model.
	SpawnRouter(ctx context.Context, name string, poolSize int, routeesKind Actor, opts ...RouterOption) (*PID, error)
	// SpawnSingleton creates a singleton actor in the system.
	//
	// A singleton actor is instantiated when cluster mode is enabled.
	// Like any other actor, a singleton actor is created only once within the system and the cluster.
	// A singleton actor is created with the default supervisor strategy and directive.
	// Once created, a singleton actor lives throughout the lifetime of the given actor system.
	//
	// The cluster singleton is automatically started on the oldest node in the cluster.
	// If the oldest node leaves the cluster, the singleton is restarted on the new oldest node.
	// This is useful for managing shared resources or coordinating tasks that should be handled by a single actor.
	SpawnSingleton(ctx context.Context, name string, actor Actor, opts ...ClusterSingletonOption) (*PID, error)
	// Kill stops a given actor in the system, either locally or on a remote node (when clustering is enabled).
	Kill(ctx context.Context, name string) error
	// ReSpawn recreates a given actor in the system.
	//
	// During restart, all messages in the mailbox that have not yet been processed will be ignored.
	// Only the direct live children of the given actor will be shut down and respawned with their initial state.
	// Bear in mind that restarting an actor reinitializes it to its initial state.
	// If restarting any of the direct children fails, the given actor will not be started at all.
	//
	// This method is location-transparent: it works identically whether the actor is local or on a
	// remote node (when clustering/remoting is enabled).
	ReSpawn(ctx context.Context, name string) (*PID, error)
	// NumActors returns the total number of active actors on a given running node.
	// This does not account for the total number of actors in the cluster.
	NumActors() uint64
	// ActorOf retrieves an existing actor within the local system or across the cluster if clustering is enabled.
	//
	// If the actor is found locally, its live PID is returned. If the actor resides on a remote node (cluster
	// mode enabled), a lightweight remote PID is returned; it carries only the actor's address and a remoting
	// handle, and routes all messaging operations through the remoting layer. If the actor is not found, an
	// error of type "actor not found" is returned.
	//
	// Use pid.IsLocal() / pid.IsRemote() to distinguish the two cases when location matters.
	ActorOf(ctx context.Context, actorName string) (*PID, error)
	// ActorExists checks whether an actor with the given name exists in the system,
	// either locally, or on another node in the cluster if clustering is enabled.
	ActorExists(ctx context.Context, actorName string) (exists bool, err error)
	// InCluster states whether the actor system has started within a cluster of nodes
	InCluster() bool
	// Partition returns the partition where a given actor is located
	Partition(actorName string) uint64
	// Subscribe creates an event subscriber to consume events from the actor system.
	// Remember to use the Unsubscribe method to avoid resource leakage.
	Subscribe() (eventstream.Subscriber, error)
	// Unsubscribe unsubscribes a subscriber.
	Unsubscribe(subscriber eventstream.Subscriber) error
	// ScheduleOnce schedules a one-time delivery of a message to the specified actor (PID) after a given delay.
	//
	// The message will be sent exactly once to the target actor after the specified duration has elapsed.
	// This is a fire-and-forget scheduling mechanism — once delivered, the message will not be retried or repeated.
	//
	// This method is location-transparent: it works identically whether the target actor is local or on a
	// remote node (when clustering/remoting is enabled).
	//
	// Parameters:
	//	  - ctx: The context for managing cancellation and deadlines.
	//   - message: The proto.Message to be sent.
	//   - pid: The PID of the actor that will receive the message.
	//   - delay: The duration to wait before delivering the message.
	//   - opts: Optional ScheduleOption values such as WithReference to control scheduling behavior.
	//
	// Returns:
	//   - error: An error is returned if scheduling fails due to invalid input or internal errors.
	//
	// Note:
	//   - It's strongly recommended to set a unique reference ID using WithReference if you intend to cancel, pause, or resume the message later.
	//   - If no reference is set, an automatic one will be generated, which may not be easily retrievable.
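	//
	// Example (the ReminderMsg type is hypothetical):
	//
	//   err := system.ScheduleOnce(ctx, &ReminderMsg{}, pid, 30*time.Second,
	//       WithReference("reminder-1"))
	//   if err != nil {
	//       log.Printf("failed to schedule: %v", err)
	//   }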
	ScheduleOnce(ctx context.Context, message any, pid *PID, delay time.Duration, opts ...ScheduleOption) error
	// Schedule schedules a recurring message to be delivered to the specified actor (PID) at a fixed interval.
	//
	// This function sets up a message to be sent repeatedly to the target actor, with each delivery occurring
	// after the specified interval. The scheduling continues until explicitly canceled or if the actor is no longer available.
	//
	// This method is location-transparent: it works identically whether the target actor is local or on a
	// remote node (when clustering/remoting is enabled).
	//
	// Parameters:
	//	  - ctx: The context for managing cancellation and deadlines.
	//   - message: The proto.Message to be delivered at regular intervals.
	//   - pid: The PID of the actor that will receive the message.
	//   - interval: The time duration between each delivery of the message.
	//   - opts: Optional ScheduleOption values such as WithReference to control scheduling behavior.
	//
	// Returns:
	//   - error: An error is returned if the message could not be scheduled due to invalid input or internal issues.
	//
	// Note:
	//   - It's strongly recommended to set a unique reference ID using WithReference if you plan to cancel, pause, or resume the scheduled message.
	//   - If no reference is set, an automatic one will be generated internally, which may not be easily retrievable for later operations.
	//   - This function does not provide built-in delivery guarantees such as at-least-once or exactly-once semantics; ensure idempotency where needed.
	Schedule(ctx context.Context, message any, pid *PID, interval time.Duration, opts ...ScheduleOption) error
	// ScheduleWithCron schedules a message to be delivered to the specified actor (PID) using a cron expression.
	//
	// This method enables flexible time-based scheduling using standard cron syntax, allowing you to specify complex recurring schedules.
	// The message will be sent to the target actor according to the schedule defined by the cron expression.
	//
	// This method is location-transparent: it works identically whether the target actor is local or on a
	// remote node (when clustering/remoting is enabled).
	//
	// Parameters:
	//	  - ctx: The context for managing cancellation and deadlines.
	//   - message: The proto.Message to be delivered.
	//   - pid: The PID of the actor that will receive the message.
	//   - cronExpression: A standard cron-formatted string (e.g., "0 */5 * * * *") representing the schedule.
	//   - opts: Optional ScheduleOption values such as WithReference to control scheduling behavior.
	//
	// Returns:
	//   - error: An error is returned if the cron expression is invalid or if scheduling fails due to internal errors.
	//
	// Note:
	//   - It's strongly recommended to set a unique reference ID using WithReference if you plan to cancel, pause, or resume the scheduled message.
	//   - If no reference is set, an automatic one will be generated internally, which may not be easily retrievable for future operations.
	//   - The cron expression must follow the format supported by the scheduler (typically 6 or 5 fields depending on implementation).
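	//
	// Example (the ReportMsg type is hypothetical; the reference set at scheduling
	// time is what CancelSchedule, PauseSchedule, and ResumeSchedule operate on):
	//
	//   // deliver at the top of every hour (6-field expression with seconds)
	//   err := system.ScheduleWithCron(ctx, &ReportMsg{}, pid, "0 0 * * * *",
	//       WithReference("hourly-report"))
	//   // later, stop the recurring delivery
	//   err = system.CancelSchedule("hourly-report")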
	ScheduleWithCron(ctx context.Context, message any, pid *PID, cronExpression string, opts ...ScheduleOption) error
	// RemoteScheduleWithCron schedules a message to be sent to a remote actor according to a cron expression.
	//
	// This method allows scheduling messages to remote actors using flexible cron-based timing,
	// enabling complex recurring schedules for message delivery.
	// Remoting must be enabled in the actor system for this method to work.
	//
	// Parameters:
	//	  - ctx: The context for managing cancellation and deadlines.
	//   - message: The proto.Message to be delivered according to the cron schedule.
	//   - receiver: The address.Address of the remote actor that will receive the message.
	//   - cronExpression: A standard cron-formatted string defining the schedule (e.g., "0 0 * * *").
	//   - opts: Optional ScheduleOption values such as WithReference to control scheduling behavior.
	//
	// Returns:
	//   - error: An error is returned if the cron expression is invalid, remoting is disabled, or scheduling fails.
	//
	// Note:
	//   - Remoting must be enabled in the actor system for this functionality.
	//   - It's strongly recommended to set a unique reference ID using WithReference if you intend to cancel, pause, or resume the scheduled message.
	//   - If no reference is set, an automatic one will be generated internally and may not be easily retrievable.
	//   - The cron expression must conform to the scheduler's supported format (usually 5 or 6 fields).
	RemoteScheduleWithCron(ctx context.Context, message any, receiver *address.Address, cronExpression string, opts ...ScheduleOption) error
	// CancelSchedule cancels a previously scheduled message intended for delivery to a target actor (PID).
	//
	// It attempts to locate and cancel the scheduled task associated with the specified message reference.
	// If the scheduled message cannot be found, has already been delivered, or was previously canceled, an error is returned.
	//
	// Parameters:
	//   - reference: The message reference previously used when scheduling the message
	//
	// Returns:
	//   - error: An error is returned if the scheduled message could not be found or canceled.
	CancelSchedule(reference string) error
	// PauseSchedule pauses a previously scheduled message that was set to be delivered to a target actor (PID).
	//
	// This function temporarily halts the delivery of the scheduled message. It can be resumed later using a corresponding resume mechanism,
	// depending on the scheduler's capabilities. If the message has already been delivered or cannot be found, an error is returned.
	//
	// Parameters:
	//   - reference: The message reference previously used when scheduling the message
	//
	// Returns:
	//   - error: An error is returned if the scheduled message cannot be found, has already been delivered, or cannot be paused.
	PauseSchedule(reference string) error
	// ResumeSchedule resumes a previously paused scheduled message intended for delivery to a target actor (PID).
	//
	// This function reactivates a scheduled message that was previously paused, allowing it to continue toward delivery.
	// If the message has already been delivered, canceled, or cannot be found, an error is returned.
	//
	// Parameters:
	//   - reference: The message reference previously used when scheduling the message
	//
	// Returns:
	//   - error: An error is returned if the scheduled message cannot be found, was never paused, has already been delivered, or cannot be resumed.
	ResumeSchedule(reference string) error
	// PeersAddress returns the actor system address known in the cluster. That address is used by other nodes to communicate with the actor system.
	// This address is empty when cluster mode is not activated
	PeersAddress() string
	// Register registers an actor for future use. This is necessary when creating an actor remotely
	Register(ctx context.Context, actor Actor) error
	// Deregister removes a registered actor from the registry
	Deregister(ctx context.Context, actor Actor) error
	// Logger returns the logger set when creating the actor system
	Logger() log.Logger
	// Host returns the actor system node host address
	// This is the bind address for remote communication
	Host() string
	// Port returns the actor system node port.
	// This is the bind port for remote communication
	Port() int
	// Uptime returns the number of seconds since the actor system started
	Uptime() int64
	// Running returns true when the actor system is running
	Running() bool
	// Run starts the actor system, blocks on the signals channel, and then
	// gracefully shuts the application down.
	// It's designed to make typical applications simple to run.
	// All of Run's functionality is implemented in terms of the exported
	// Start and Stop methods. Applications with more specialized needs
	// can use those methods directly instead of relying on Run.
	Run(ctx context.Context, startHook func(ctx context.Context) error, stopHook func(ctx context.Context) error)
	// TopicActor returns the topic actor, a system-managed actor responsible for handling
	// publish-subscribe (pub-sub) functionality within the actor system.
	//
	// The topic actor maintains a registry of subscribers (actors) per topic and ensures
	// that messages published to a topic are delivered to all registered subscribers.
	//
	// Requirements:
	//   - PubSub mode must be enabled via the WithPubSub() option when initializing the actor system.
	//
	// Cluster Behavior:
	//   - In cluster mode, messages published to a topic on one node are forwarded to other nodes,
	//     but only once per topic actor with active local subscribers. This ensures efficient message
	//     propagation without redundant network traffic.
	//
	// Usage:
	//   system := NewActorSystem(WithPubSub())
	//
	// Returns the actor reference for the topic actor.
	TopicActor() *PID
	// Extensions returns a slice of all registered extensions in the ActorSystem.
	//
	// This allows system-level introspection or iteration over all available extensions.
	// It can be useful for diagnostics, monitoring, or applying configuration across all extensions.
	//
	// Returns:
	//   - []extension.Extension: A list of all registered extensions.
	Extensions() []extension.Extension
	// Extension retrieves a specific registered extension by its unique ID.
	//
	// This method allows actors or system components to access a specific Extension
	// instance that was previously registered using WithExtensions.
	//
	// Parameters:
	//   - extensionID: The unique identifier of the extension to retrieve.
	//
	// Returns:
	//   - extension.Extension: The registered extension with the given ID, or nil if not found.
	Extension(extensionID string) extension.Extension
	// Inject provides a way to register shared dependencies with the ActorSystem.
	//
	// These dependencies — such as clients, services, or repositories — will be injected
	// into actors that declare them as required when using SpawnOptions.
	//
	// This mechanism ensures actors are provisioned consistently with the resources they
	// depend on, even when they're relocated (e.g., during failover or rescheduling in a
	// distributed cluster environment).
	//
	// Actors can retrieve these dependencies via their context, enabling decoupled,
	// testable, and runtime-injected configurations.
	//
	// Returns an error if the actor system has not started.
	Inject(dependencies ...extension.Dependency) error
	// GrainIdentity retrieves or activates a Grain (virtual actor) identified by the given name.
	//
	// This method ensures that a Grain with the specified name exists in the system. If the Grain is not already active,
	// it will be created using the provided factory function. Grains are virtual actors that are automatically managed
	// and can be transparently activated or deactivated based on usage.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout control.
	//   - name: The unique name identifying the Grain.
	//   - factory: A function that creates a new Grain instance if activation is required.
	//   - opts: Optional GrainOptions to customize the Grain's activation behavior (e.g., activation timeout, retries).
	//
	// Returns:
	//   - *GrainIdentity: The identity object representing the located or newly activated Grain.
	//   - error: An error if the Grain could not be found, created, or activated.
	//
	// Note:
	//   - This method abstracts away the details of Grain lifecycle management.
	//   - Use this to obtain a reference to a Grain for message passing or further operations.
	GrainIdentity(ctx context.Context, name string, factory GrainFactory, opts ...GrainOption) (*GrainIdentity, error)
	// AskGrain sends a synchronous request message to a Grain (virtual actor) identified by the given identity.
	//
	// This method locates or activates the target Grain (locally or in the cluster), sends the provided
	// protobuf message, and waits for a response or error. The request will block until a response is received,
	// the context is canceled, or the timeout elapses.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout control.
	//   - identity: The unique identity of the Grain.
	//   - message: The protobuf message to send to the Grain.
	//   - timeout: The maximum duration to wait for a response.
	//
	// Returns:
	//   - response: The response message from the Grain, if successful.
	//   - error: An error if the request fails, times out, or the system is not started.
	AskGrain(ctx context.Context, identity *GrainIdentity, message any, timeout time.Duration) (response any, err error)
	// TellGrain sends an asynchronous message to a Grain (virtual actor) identified by the given identity.
	//
	// This method locates or activates the target Grain (locally or in the cluster) and delivers the provided
	// protobuf message without waiting for a response. Use this for fire-and-forget scenarios where no reply is expected.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout control.
	//   - identity: The unique identity of the Grain.
	//   - message: The protobuf message to send to the Grain.
	//
	// Returns:
	//   - error: An error if the message could not be delivered or the system is not started.
	TellGrain(ctx context.Context, identity *GrainIdentity, message any) error
	// Grains retrieves a list of all active Grains (virtual actors) in the system.
	//
	// Grains are virtual actors that are automatically managed by the actor system. This method returns a slice of
	// GrainIdentity objects representing the currently active Grains. In cluster mode, it attempts to aggregate Grains
	// across all nodes in the cluster; if the cluster request fails, only locally active Grains will be returned.
	//
	// Use this method with caution, as scanning for all Grains (especially in a large cluster) may impact system performance.
	// The timeout parameter defines the maximum duration for cluster-based requests before they are terminated.
	//
	// Parameters:
	//   - ctx: Context for cancellation and timeout control.
	//   - timeout: The maximum duration to wait for cluster-based queries.
	//
	// Returns:
	//   - []*GrainIdentity: A slice of GrainIdentity objects for all active Grains.
	//
	// Note:
	//   - This method abstracts away the details of Grain lifecycle management.
	//   - Use this to obtain references to all active Grains for monitoring, diagnostics, or administrative purposes.
	Grains(ctx context.Context, timeout time.Duration) []*GrainIdentity
	// NoSender returns a special PID that represents an anonymous / absent sender.
	//
	// Use this PID when sending or scheduling messages for which no sender is expected. The PID
	// is meaningful only for local messaging and is not routable across the network.
	//
	// In remote scenarios use address.NoSender(), which encodes the appropriate
	// network address semantics for a no-sender value.
	//
	// Notes:
	//  - The returned PID should be used as the Sender in envelopes, not as a target
	//    destination for Send operations.
	//  - The value is stable for local use and intended to explicitly indicate the
	//    absence of a sender (as opposed to nil).
	NoSender() *PID
	// PeersPort returns the port known in the cluster.
	// That port is used by other nodes to communicate with this actor system.
	// This port is zero when cluster mode is not activated
	PeersPort() int
	// DiscoveryPort returns the port used for service discovery.
	// This port is zero when cluster mode is not activated
	DiscoveryPort() int
	// Peers returns a best-effort snapshot of currently known peer nodes in the cluster.
	//
	// Behavior:
	//   - Requires cluster mode. If clustering is disabled, an empty slice is returned
	//     and ErrClusterDisabled may be reported depending on the implementation.
	//   - Returns an eventually consistent view of membership as observed by this node.
	//     The result can be stale or incomplete while membership information propagates.
	//   - If the lookup exceeds the provided timeout or fails, a partial snapshot may
	//     be returned (possibly empty) along with a non-nil error.
	//
	// Concurrency and Ordering:
	//   - The returned slice is a point-in-time snapshot. Cluster membership can change
	//     immediately after the call returns.
	//   - The order of peers is unspecified and should not be relied upon.
	//
	// Parameters:
	//   - ctx: Context for cancellation and deadline control.
	//   - timeout: Maximum duration allowed for cluster membership lookup.
	//
	// Returns:
	//   - []*remote.Peer: Zero or more peer descriptors known to this node at call time.
	//   - error: May be non-nil if the lookup timed out, was canceled, or another error occurred.
	//
	// Possible Errors:
	//   - ErrActorSystemNotStarted: The actor system has not been started.
	//   - ErrClusterDisabled: Clustering is not enabled for this node.
	//   - context.DeadlineExceeded: The operation exceeded the supplied timeout.
	//   - context.Canceled: The context was canceled.
	//   - Other transient network/storage errors from the underlying cluster engine.
	//
	// Example:
	//   ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	//   defer cancel()
	//
	//   peers, err := system.Peers(ctx, 2*time.Second)
	//   if err != nil {
	//       system.Logger().Warnf("peer lookup returned an error: %v", err)
	//   }
	//   for _, p := range peers {
	//       // Use p (e.g., p.Path(), p.Host(), p.Port(), etc.)
	//   }
	Peers(ctx context.Context, timeout time.Duration) ([]*remote.Peer, error)
	// IsLeader returns true if the current node is the cluster leader.
	//
	// Behavior:
	//   - Requires cluster mode. If clustering is disabled, false is returned
	//     along with ErrClusterDisabled.
	//   - The leader is determined by the underlying cluster engine and may change
	//     over time as nodes join/leave or failures occur.
	//
	// Parameters:
	//   - ctx: Context for cancellation and deadline control.
	//
	// Returns:
	//   - bool: True if this node is the current cluster leader; false otherwise.
	//   - error: May be non-nil if clustering is disabled or another error occurred.
	//
	// Possible Errors:
	//   - ErrActorSystemNotStarted: The actor system has not been started.
	//   - ErrClusterDisabled: Clustering is not enabled for this node.
	//   - context.Canceled: The context was canceled.
	//   - Other transient errors from the underlying cluster engine.
	//
	// Example:
	//   ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	//   defer cancel()
	//
	//   isLeader, err := system.IsLeader(ctx)
	//   if err != nil {
	//       system.Logger().Errorf("failed to determine cluster leader status: %v", err)
	//   } else if isLeader {
	//       system.Logger().Info("this node is the cluster leader")
	//   } else {
	//       system.Logger().Info("this node is not the cluster leader")
	//   }
	IsLeader(ctx context.Context) (bool, error)
	// Leader returns the current cluster leader peer information.
	//
	// Behavior:
	//   - Requires cluster mode. If clustering is disabled, nil is returned
	//     along with ErrClusterDisabled.
	//   - The leader is determined by the underlying cluster engine and may change
	//     over time as nodes join/leave or failures occur.
	//
	// Parameters:
	//   - ctx: Context for cancellation and deadline control.
	//
	// Returns:
	//   - *remote.Peer: The current cluster leader peer, or nil if not available.
	//   - error: May be non-nil if clustering is disabled or another error occurred.
	//
	// Possible Errors:
	//   - ErrActorSystemNotStarted: The actor system has not been started.
	//   - ErrClusterDisabled: Clustering is not enabled for this node.
	//   - context.Canceled: The context was canceled.
	//   - Other transient errors from the underlying cluster engine.
	// Example:
	//   ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	//   defer cancel()
	//
	//   leader, err := system.Leader(ctx)
	//   if err != nil {
	//       system.Logger().Errorf("failed to retrieve cluster leader: %v", err)
	//   } else if leader != nil {
	//       system.Logger().Infof("current cluster leader is at %s", leader.RemotingAddress())
	//   } else {
	//       system.Logger().Info("no cluster leader is currently elected")
	//   }
	Leader(ctx context.Context) (leader *remote.Peer, err error)
	// DataCenterReady reports whether the multi-datacenter controller is operational.
	//
	// The controller is considered ready when:
	//   - Multi-DC mode is enabled (cluster mode with a datacenter config)
	//   - The controller has started successfully
	//   - The cache has been refreshed at least once
	//
	// Returns true immediately if multi-DC mode is not enabled, as there is no
	// DC controller to wait for.
	//
	// This method is intended for use in readiness probes (e.g., Kubernetes readinessProbe)
	// to gate traffic until the controller has a usable view of active data centers.
	//
	// Note: Ready does not guarantee the cache is fresh; the controller uses
	// MaxCacheStaleness internally to determine routing behavior.
	DataCenterReady() bool
	// DataCenterLastRefresh returns the time of the last successful datacenter cache refresh.
	//
	// Returns the zero time if:
	//   - Multi-DC mode is not enabled
	//   - The cache has never been refreshed
	//
	// This can be used for debugging, monitoring, or custom readiness logic that
	// requires more granular control than DataCenterReady provides.
	DataCenterLastRefresh() time.Time
	// RegisterGrainKind registers a Grain kind in the local registry.
	//
	// Registration associates the Grain's kind (as returned by the Grain implementation) with the
	// factory/metadata used by the actor system to instantiate that kind on demand.
	//
	// This is required for:
	//   - Remote activation/recreation: when another node asks this node to activate a Grain of a given kind.
	//   - Lazy/local activation: when a GrainIdentity is resolved locally via kind lookup.
	RegisterGrainKind(ctx context.Context, kind Grain) error
	// DeregisterGrainKind removes a previously registered Grain kind from the local registry.
	//
	// Deregistration affects future activations that rely on kind lookup (e.g. remote activation/recreation
	// requests and lazy/local activation when a GrainIdentity is resolved by kind). It does not stop or
	// deactivate already-running Grain instances of that kind; it only prevents new activations via the
	// registry.
	DeregisterGrainKind(ctx context.Context, kind Grain) error
	// contains filtered or unexported methods
}

ActorSystem defines the contract of an actor system
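
The scheduling APIs above are easiest to manage when a cron schedule is paired with an explicit reference. A minimal, non-runnable sketch, assuming an already-started system, a spawned pid, and a hypothetical samplepb.Tick message (these names are illustrative, not part of the package):

```go
// Schedule a Tick every 5 minutes under a caller-chosen reference.
if err := system.ScheduleWithCron(ctx, new(samplepb.Tick), pid, "0 */5 * * * *",
	actor.WithReference("billing-tick"),
); err != nil {
	log.Fatal(err)
}

// The same reference later drives lifecycle operations.
_ = system.PauseSchedule("billing-tick")
_ = system.ResumeSchedule("billing-tick")
_ = system.CancelSchedule("billing-tick")
```

Without WithReference, an internally generated reference is used, which makes later cancel/pause/resume calls impractical.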

func NewActorSystem

func NewActorSystem(name string, opts ...Option) (ActorSystem, error)

NewActorSystem creates and configures a new ActorSystem instance.

The actor system is the root container for all actors on a node. Only one ActorSystem can exist per process. In cluster mode, the system name must be identical across all nodes.

Options allow customization of logging, clustering, remoting, pub/sub, TLS, extensions, and more. The returned ActorSystem is not started; use Start or Run to initialize it.

Parameters:

  • name: Unique name for the actor system (required; must match across cluster nodes).
  • opts: Optional configuration options (see WithLogger, WithCluster, WithPubSub, etc).

Returns:

  • ActorSystem: The configured actor system instance.
  • error: If the name is invalid, options are misconfigured, or required settings are missing.

Example:

system, err := NewActorSystem("my-system", WithLogger(myLogger), WithCluster(clusterConfig))
if err != nil {
    log.Fatalf("Failed to create actor system: %v", err)
}
if err := system.Start(ctx); err != nil {
    log.Fatalf("Failed to start actor system: %v", err)
}

Notes:

  • The actor system must be started before spawning actors.
  • In cluster mode, ensure all nodes use the same system name and compatible options.
  • Use Run for typical applications that should start, block on OS signals, and shut down gracefully.
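
As the notes suggest, Run wraps Start and Stop with signal handling. A minimal sketch (the hook bodies below are placeholders, not prescribed behavior):

```go
system, err := actor.NewActorSystem("my-system")
if err != nil {
	log.Fatal(err)
}

system.Run(ctx,
	func(ctx context.Context) error {
		// startHook: spawn root actors, open connections, etc.
		return nil
	},
	func(ctx context.Context) error {
		// stopHook: flush state, close connections, etc.
		return nil
	},
)
```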

type AdjustRouterPoolSize

type AdjustRouterPoolSize struct {
	// contains filtered or unexported fields
}

AdjustRouterPoolSize requests a runtime delta resize of a router's routee pool.

Semantics (delta-based):

  • PoolSize > 0: Scale up by PoolSize (spawn that many additional routees).
  • PoolSize < 0: Scale down by |PoolSize| (stop that many existing routees).
  • PoolSize == 0: No-op.

Notes:

  • The resulting pool size never goes below 0. If |PoolSize| exceeds the current size, the pool becomes 0.
  • This is not idempotent: sending +N twice grows by 2N. Prefer absolute-size semantics at the call site if needed.
  • Resizes are processed in mailbox order; intermediate sizes may be observed while scaling is in progress.
  • Routing continues during resize using currently available routees.

Constraints:

  • Negative values are allowed and indicate downsizing.
  • Routees are local to the actor system hosting the router.
  • Supervisory directives (restart/resume/stop) apply to failures, not to normal downsizing.

Verification:

  • Use GetRoutees after some delay to observe effective membership.

Examples (Go):

// Grow by 5
ctx.Tell(routerPID, actor.NewAdjustRouterPoolSize(5))
// Shrink by 3
ctx.Tell(routerPID, actor.NewAdjustRouterPoolSize(-3))
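
The delta-and-clamp semantics described above can be sketched as a pure function (illustrative only; applyPoolDelta is an assumption, not part of this package):

```go
package main

import "fmt"

// applyPoolDelta models the documented resize rule: apply the signed
// delta, then clamp at zero so the pool size never goes negative.
func applyPoolDelta(current, delta int32) int32 {
	next := current + delta
	if next < 0 {
		return 0
	}
	return next
}

func main() {
	fmt.Println(applyPoolDelta(4, 5))  // grow by 5 -> 9
	fmt.Println(applyPoolDelta(4, -3)) // shrink by 3 -> 1
	fmt.Println(applyPoolDelta(2, -5)) // |delta| exceeds current size -> clamped to 0
}
```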

func NewAdjustRouterPoolSize

func NewAdjustRouterPoolSize(poolSize int32) *AdjustRouterPoolSize

NewAdjustRouterPoolSize creates an AdjustRouterPoolSize message with the given signed delta.

  • poolSize > 0 increases the pool by that many routees.
  • poolSize < 0 decreases the pool by the absolute value.
  • poolSize == 0 is a no-op.

func (*AdjustRouterPoolSize) PoolSize

func (a *AdjustRouterPoolSize) PoolSize() int32

PoolSize returns the signed delta to apply to the router's pool.

type Behavior

type Behavior func(ctx *ReceiveContext)

Behavior defines an actor behavior

type BoundedMailbox

type BoundedMailbox struct {
	// contains filtered or unexported fields
}

BoundedMailbox is a bounded, blocking MPSC mailbox backed by a ring buffer.

Characteristics:

  • Bounded capacity: the queue has a fixed size.
  • Blocking semantics:
      • Enqueue blocks when the mailbox is full until space becomes available or the mailbox is disposed.
      • Dequeue blocks when the mailbox is empty until a message is available or the mailbox is disposed.
  • Concurrency: safe for multiple producers (MPSC) and a single consumer.
  • FIFO ordering: messages are dequeued in the order they were enqueued.

Use this mailbox when you want strict, blocking backpressure with bounded capacity.
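
The blocking behavior can be illustrated with a channel-backed stand-in (an assumption for demonstration; the real BoundedMailbox uses a ring buffer, not a channel):

```go
package main

import (
	"fmt"
	"time"
)

// boundedQueue mimics BoundedMailbox semantics: a fixed-capacity queue
// where Enqueue blocks when full and Dequeue blocks when empty.
type boundedQueue struct{ ch chan string }

func newBoundedQueue(capacity int) *boundedQueue {
	return &boundedQueue{ch: make(chan string, capacity)}
}

func (q *boundedQueue) Enqueue(msg string) { q.ch <- msg }   // blocks when full
func (q *boundedQueue) Dequeue() string    { return <-q.ch } // blocks when empty

func main() {
	q := newBoundedQueue(1)
	q.Enqueue("first")

	done := make(chan string)
	go func() {
		time.Sleep(10 * time.Millisecond)
		done <- q.Dequeue() // frees capacity, unblocking the producer
	}()

	q.Enqueue("second") // blocks until the consumer drains "first"
	fmt.Println(<-done)      // first
	fmt.Println(q.Dequeue()) // second
}
```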

func NewBoundedMailbox

func NewBoundedMailbox(capacity int) *BoundedMailbox

NewBoundedMailbox creates a new bounded, blocking mailbox with the given capacity. Capacity must be a positive integer.

Behavior

  • When the mailbox reaches capacity, Enqueue blocks until space becomes available (or the mailbox is disposed).
  • When the mailbox is empty, Dequeue blocks until a message arrives (or the mailbox is disposed).

func (*BoundedMailbox) Dequeue

func (mailbox *BoundedMailbox) Dequeue() (msg *ReceiveContext)

Dequeue removes and returns the next message from the mailbox.

Semantics

  • Blocks when the mailbox is empty until a message is available or the mailbox is disposed.
  • FIFO order is preserved.

Concurrency:

  • Intended for a single consumer; behavior follows the underlying ring buffer.

func (*BoundedMailbox) Dispose

func (mailbox *BoundedMailbox) Dispose()

Dispose releases resources held by the underlying ring buffer and unblocks any internal waiters maintained by it. Do not use the mailbox after calling Dispose.

func (*BoundedMailbox) Enqueue

func (mailbox *BoundedMailbox) Enqueue(msg *ReceiveContext) error

Enqueue inserts a message into the mailbox.

Semantics

  • Blocks when the mailbox is full until space is available or the mailbox is disposed.
  • Returns an error when the mailbox has been disposed or the underlying buffer reports a failure.

Concurrency:

  • Safe for concurrent producers.

func (*BoundedMailbox) IsEmpty

func (mailbox *BoundedMailbox) IsEmpty() bool

IsEmpty reports whether the mailbox currently has no messages. This check is a snapshot and may change immediately under concurrency.

func (*BoundedMailbox) Len

func (mailbox *BoundedMailbox) Len() int64

Len returns the current number of messages in the mailbox. The value is a snapshot and may change immediately after the call under concurrency.

type Broadcast

type Broadcast struct {
	// contains filtered or unexported fields
}

Broadcast is used to send a message to a router via Tell. The router will broadcast the message to all its routees, given the routing strategy or the type of router used.

func NewBroadcast

func NewBroadcast(message any) *Broadcast

NewBroadcast creates a new Broadcast message.

func (*Broadcast) Message

func (b *Broadcast) Message() any

Message returns the inner message to broadcast to all routees.

type CacheLinePadding

type CacheLinePadding [64]byte

CacheLinePadding prevents false sharing between CPU cache lines

type ClusterConfig

type ClusterConfig struct {
	// contains filtered or unexported fields
}

ClusterConfig defines the cluster mode settings

func NewClusterConfig

func NewClusterConfig() *ClusterConfig

NewClusterConfig creates an instance of ClusterConfig

func (*ClusterConfig) Validate

func (x *ClusterConfig) Validate() error

Validate validates the cluster config

func (*ClusterConfig) WithBootstrapTimeout

func (x *ClusterConfig) WithBootstrapTimeout(timeout time.Duration) *ClusterConfig

WithBootstrapTimeout sets the timeout for the cluster bootstrap process.

This timeout determines the maximum duration the cluster will wait for all required nodes to join and complete the bootstrap sequence before considering the operation as failed. If the cluster does not bootstrap within this period, an error will be returned and the cluster will not start.

Use this option to control startup responsiveness in environments where cluster formation speed is critical, such as automated deployments or orchestrated environments. The default value is 10 seconds.

Example usage:

cfg := NewClusterConfig().WithBootstrapTimeout(15 * time.Second)

Returns the updated ClusterConfig instance for chaining.

func (*ClusterConfig) WithClusterBalancerInterval

func (x *ClusterConfig) WithClusterBalancerInterval(interval time.Duration) *ClusterConfig

WithClusterBalancerInterval sets the cluster balancer interval.

This interval controls how frequently the cluster balancer runs to evaluate and adjust actor/grain placement. It also drives when a rebalance epoch can be acknowledged, which in turn gates stable cluster events.

Relationship to WithClusterStateSyncInterval:

  • Keep the balancer interval shorter than the state sync interval so each routing epoch can complete before the next one starts. If the balancer interval is too large relative to the sync interval, epochs may overlap and stable cluster events can be delayed.

Recommended starting points:

  • Small/medium clusters: 1s to 5s balancer interval with 30s to 1m state sync.
  • Large/busy clusters: increase both intervals together (for example, 5s balancer with 1m to 2m state sync) to reduce overhead while keeping epochs from stacking.

Example usage:

cfg := NewClusterConfig().
	WithClusterStateSyncInterval(1 * time.Minute).
	WithClusterBalancerInterval(2 * time.Second)

Returns the updated ClusterConfig instance for chaining.

func (*ClusterConfig) WithClusterStateSyncInterval

func (x *ClusterConfig) WithClusterStateSyncInterval(interval time.Duration) *ClusterConfig

WithClusterStateSyncInterval sets the interval for syncing nodes' routing tables.

This interval determines how frequently the cluster synchronizes its routing tables across all nodes. Regular synchronization ensures that each node has an up-to-date view of the cluster topology, which is essential for accurate message routing and partition management.

It is important to set this interval to a value greater than the write timeout to avoid updating the routing table while a write operation is in progress. Setting the interval too low may increase network and processing overhead, while setting it too high may delay the propagation of cluster topology changes.

The default value is 1 minute, which provides a balance between consistency and resource usage. Adjust this value based on your cluster's size, network characteristics, and desired responsiveness.

Example usage:

cfg := NewClusterConfig().WithClusterStateSyncInterval(2 * time.Minute)

Returns the updated ClusterConfig instance for chaining.

func (*ClusterConfig) WithDataCenter

func (x *ClusterConfig) WithDataCenter(config *datacenter.Config) *ClusterConfig

WithDataCenter configures multi–data center (multi-DC) support for the cluster.

This option is only meaningful when cluster mode is enabled. The supplied config is stored on the cluster configuration and will be fully validated when the actor system starts (not here).

Expected minimum fields on config (see datacenter.Config for details):

  • ControlPlane: the control-plane implementation/driver to use
  • DataCenter.Name: the local datacenter identifier (e.g. "dc-1")

Endpoints for cross-DC routing are not set in config; the cluster leader registers the local DC with the control plane using cluster members' remoting addresses.

Example:

mdc := datacenter.NewConfig()
mdc.ControlPlane = controlPlane
mdc.DataCenter = datacenter.DataCenter{Name: "dc-1", Region: "us-east-1"}
cfg := NewClusterConfig().WithDataCenter(mdc)

Returns the updated ClusterConfig instance for chaining.

func (*ClusterConfig) WithDiscovery

func (x *ClusterConfig) WithDiscovery(discovery discovery.Provider) *ClusterConfig

WithDiscovery sets the cluster discovery provider

func (*ClusterConfig) WithDiscoveryPort

func (x *ClusterConfig) WithDiscoveryPort(port int) *ClusterConfig

WithDiscoveryPort sets the discovery port

func (*ClusterConfig) WithGrainActivationBarrier

func (x *ClusterConfig) WithGrainActivationBarrier(timeout time.Duration) *ClusterConfig

WithGrainActivationBarrier enables the grain activation barrier.

When enabled, grain activation will be delayed until the cluster has reached the configured minimum peers quorum (see WithMinimumPeersQuorum), or until the provided timeout elapses—whichever happens first.

This is useful during startup and rolling deployments to avoid activating grains while the cluster is still forming, which can reduce early churn and unnecessary rebalancing.

Timeout semantics:

  • timeout == 0: wait indefinitely for quorum
  • timeout > 0: wait up to the given duration, then proceed even if quorum has not been reached

Example:

cfg := NewClusterConfig().
	WithMinimumPeersQuorum(3).
	WithGrainActivationBarrier(10 * time.Second)

func (*ClusterConfig) WithGrains

func (x *ClusterConfig) WithGrains(grains ...Grain) *ClusterConfig

WithGrains sets the cluster grains

func (*ClusterConfig) WithKinds

func (x *ClusterConfig) WithKinds(kinds ...Actor) *ClusterConfig

WithKinds sets the cluster actor kinds

func (*ClusterConfig) WithMinimumPeersQuorum

func (x *ClusterConfig) WithMinimumPeersQuorum(minimumQuorum uint32) *ClusterConfig

WithMinimumPeersQuorum sets the cluster config minimum peers quorum

func (*ClusterConfig) WithPartitionCount

func (x *ClusterConfig) WithPartitionCount(count uint64) *ClusterConfig

WithPartitionCount sets the cluster config partition count. Partition count should be a prime number. ref: https://medium.com/swlh/why-should-the-length-of-your-hash-table-be-a-prime-number-760ec65a75d1

func (*ClusterConfig) WithPeersPort

func (x *ClusterConfig) WithPeersPort(peersPort int) *ClusterConfig

WithPeersPort sets the peers port

func (*ClusterConfig) WithReadQuorum

func (x *ClusterConfig) WithReadQuorum(count uint32) *ClusterConfig

WithReadQuorum sets the read quorum for quorum-replicated cluster reads.

Read quorum is the number of replicas consulted to satisfy a read. Increasing it generally reduces the probability of observing stale data, but may increase read latency and network fan-out.

Relationship to replication:

  • replicaCount controls how many replicas exist for each partition.
  • readQuorum controls how many of those replicas are consulted on reads.

Consistency guidance (ensure read/write overlap):

readQuorum + writeQuorum > replicaCount

Typical configurations:

  • replicaCount=1: readQuorum=1, writeQuorum=1
  • replicaCount=3: readQuorum=2, writeQuorum=2 (common “majority” choice)

Notes / constraints:

  • readQuorum must be >= 1 (validated)
  • readQuorum should be <= replicaCount (not currently validated here; if larger, reads may never reach quorum)

⚠️ This is an advanced knob; the defaults are sufficient for most deployments.
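The overlap rule above can be checked mechanically. The helper below is illustrative only (it is not part of this package) and encodes both the hard constraint (a quorum larger than the replica set can never be satisfied) and the consistency guidance:

```go
package main

import "fmt"

// quorumsOverlap reports whether any read quorum is guaranteed to
// intersect any write quorum, i.e. readQuorum + writeQuorum > replicaCount.
func quorumsOverlap(readQuorum, writeQuorum, replicaCount uint32) bool {
	if readQuorum < 1 || writeQuorum < 1 || replicaCount < 1 {
		return false
	}
	// A quorum larger than the replica set can never be reached.
	if readQuorum > replicaCount || writeQuorum > replicaCount {
		return false
	}
	return readQuorum+writeQuorum > replicaCount
}

func main() {
	fmt.Println(quorumsOverlap(2, 2, 3)) // majority quorums on 3 replicas → true
	fmt.Println(quorumsOverlap(1, 1, 3)) // read and write sets may miss each other → false
}
```

The same check applies symmetrically to WithWriteQuorum and WithReplicaCount below.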

func (*ClusterConfig) WithReadTimeout

func (x *ClusterConfig) WithReadTimeout(timeout time.Duration) *ClusterConfig

WithReadTimeout sets the read timeout for cluster read operations.

This timeout specifies the maximum duration allowed for a read operation to complete before it is considered failed. If a read operation exceeds this duration, it will be aborted and an error will be returned. Adjust this value based on your cluster's expected workload and network conditions to balance responsiveness and reliability.

Example:

cfg := NewClusterConfig().WithReadTimeout(2 * time.Second)

Returns the updated ClusterConfig instance for chaining.

func (*ClusterConfig) WithReplicaCount

func (x *ClusterConfig) WithReplicaCount(count uint32) *ClusterConfig

WithReplicaCount sets the number of replicas maintained for each partition’s data.

Replica count is the core durability/availability knob for quorum-based replication: increasing it typically improves fault tolerance and read availability, but also increases write fan-out and resource usage.

Relationship to quorums:

  • WithWriteQuorum(N) controls how many replicas must acknowledge a write.
  • WithReadQuorum(N) controls how many replicas are consulted to satisfy a read.

To avoid stale reads, configure quorums so that read and write sets overlap:

readQuorum + writeQuorum > replicaCount

Additional guidance:

  • replicaCount must be >= 1
  • minimumPeersQuorum should generally be <= replicaCount (otherwise the cluster may have difficulty reaching the desired quorum during bootstrap or failures)
  • A common starting point is replicaCount=3 with writeQuorum=2 and readQuorum=2

⚠️ Note: changing this value should be done with care, as it affects consistency and failure tolerance across the cluster.

func (*ClusterConfig) WithRoles

func (x *ClusterConfig) WithRoles(roles ...string) *ClusterConfig

WithRoles sets the roles advertised by this node.

A role is a label/metadata used by the cluster to define a node’s responsibilities (e.g., "web", "entity", "projection"). Not all nodes need to run the same workloads—roles let you dedicate nodes to specific purposes such as the web front-end, data access layer, or background processing.

In practice, nodes with the "entity" role run actors/services such as persistent entities, while nodes with the "projection" role run read-side projections. This lets you scale parts of your application independently and optimize resource usage.

Once roles are set, you can use SpawnOn("<role>") to spawn an actor on a node that advertises that role.

This call replaces any previously configured roles. Duplicates are removed; order is not meaningful.

func (*ClusterConfig) WithShutdownTimeout

func (x *ClusterConfig) WithShutdownTimeout(timeout time.Duration) *ClusterConfig

WithShutdownTimeout sets the timeout for graceful cluster shutdown.

This timeout determines the maximum duration allowed for the cluster to shut down gracefully. It should not exceed the actor system's shutdown timeout, so the cluster can finish before the system itself is torn down. If the shutdown process exceeds this duration, it may be forcibly terminated.

Example:

cfg := NewClusterConfig().WithShutdownTimeout(1 * time.Minute)

Returns the updated ClusterConfig instance for chaining.

func (*ClusterConfig) WithTableSize

func (x *ClusterConfig) WithTableSize(size uint64) *ClusterConfig

WithTableSize sets the size of the in-memory key/value storage. The default value is 20MB.

func (*ClusterConfig) WithWriteQuorum

func (x *ClusterConfig) WithWriteQuorum(count uint32) *ClusterConfig

WithWriteQuorum sets the write quorum for quorum-replicated cluster writes.

Write quorum is the minimum number of replicas that must acknowledge a write before the operation is considered successful. Higher values generally improve consistency and fault tolerance, but increase write latency and fan-out.

Relationship to replication:

  • replicaCount controls how many replicas exist for each partition.
  • writeQuorum controls how many of those replicas must confirm a write.

Consistency guidance (avoid stale reads):

readQuorum + writeQuorum > replicaCount

Typical configurations:

  • replicaCount=1: writeQuorum=1, readQuorum=1
  • replicaCount=3: writeQuorum=2, readQuorum=2 (common “majority” choice)

Notes / constraints:

  • writeQuorum must be >= 1 (validated)
  • writeQuorum should be <= replicaCount (not currently validated here; if larger, writes may never reach quorum)

⚠️ This is an advanced knob; the defaults are sufficient for most deployments.

func (*ClusterConfig) WithWriteTimeout

func (x *ClusterConfig) WithWriteTimeout(timeout time.Duration) *ClusterConfig

WithWriteTimeout sets the write timeout for cluster write operations.

This timeout specifies the maximum duration allowed for a write operation to complete before it is considered failed. If a write operation exceeds this duration, it will be aborted and an error will be returned. Adjust this value based on your cluster's expected workload and network conditions to balance responsiveness and reliability.

Example:

cfg := NewClusterConfig().WithWriteTimeout(2 * time.Second)

Returns the updated ClusterConfig instance for chaining.

type ClusterSingletonOption

type ClusterSingletonOption func(*clusterSingletonConfig)

ClusterSingletonOption defines a function type for configuring cluster singleton actors.

func WithSingletonRole

func WithSingletonRole(role string) ClusterSingletonOption

WithSingletonRole pins the singleton to cluster members that advertise the specified role.

When a role is provided, the actor system picks the oldest node in the cluster that reports the role and spawns (or relocates) the singleton there. Nodes without the role will never host the singleton; when no matching members exist, `SpawnSingleton` returns an error.

Passing the empty string is a no-op and leaves the singleton eligible for placement on the overall oldest cluster member.

Example:

if err := system.SpawnSingleton(
	ctx,
	"scheduler",
	NewSchedulerActor(),
	WithSingletonRole("control-plane"),
); err != nil {
	return err
}

func WithSingletonSpawnRetries

func WithSingletonSpawnRetries(retries int) ClusterSingletonOption

WithSingletonSpawnRetries sets the maximum number of `SpawnSingleton` attempts before giving up.

Notes:

  • Values <= 0 are ignored and the existing/default retry count is kept.
  • Combined with `WithSingletonSpawnWaitInterval`, this controls how long `SpawnSingleton` will keep retrying (roughly `tries * interval`), up to the overall timeout set by `WithSingletonSpawnTimeout`.
  • Default: 5.

func WithSingletonSpawnTimeout

func WithSingletonSpawnTimeout(timeout time.Duration) ClusterSingletonOption

WithSingletonSpawnTimeout sets the maximum amount of time `SpawnSingleton` will spend retrying before giving up.

Notes:

  • Values <= 0 are ignored and the existing/default timeout is kept.
  • This timeout is an overall upper bound; retry attempts (see `WithSingletonSpawnWaitInterval` and `WithSingletonSpawnRetries`) may cause `SpawnSingleton` to return earlier if retries are exhausted.
  • Default: 30 seconds.

func WithSingletonSpawnWaitInterval

func WithSingletonSpawnWaitInterval(interval time.Duration) ClusterSingletonOption

WithSingletonSpawnWaitInterval sets the base delay between `SpawnSingleton` retry attempts.

Notes:

  • Values <= 0 are ignored and the existing/default interval is kept.
  • Used together with `WithSingletonSpawnRetries` to define the retry budget. Approximate retry window: `tries * interval` (best-effort; scheduling can add jitter).
  • Default: 500 milliseconds.

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context provides an environment for an actor when it is about to start.

It embeds the standard context.Context interface and grants access to the ActorSystem that manages the actor's lifecycle.

Use Context to:

  • Access the actor's system-wide services and dependencies.
  • Retrieve deadlines, cancellation signals, and request-scoped values through the embedded context.Context.

A Context is guaranteed to be available during the actor’s initialization phase and should be treated as immutable.

func (*Context) ActorName

func (x *Context) ActorName() string

ActorName returns the name of the actor associated with this Context.

The actor name is a unique string within the actor system, typically used for identification, logging, monitoring, and message routing purposes.

func (*Context) ActorSystem

func (x *Context) ActorSystem() ActorSystem

ActorSystem returns the ActorSystem that manages the actor.

It provides access to system-level services, configuration, and infrastructure components that the actor may interact with during its lifecycle.

func (*Context) Context

func (x *Context) Context() context.Context

Context returns the underlying context associated with the actor's initialization.

It carries deadlines, cancellation signals, and request-scoped values. This method allows direct access to standard context operations.

func (*Context) Dependencies

func (x *Context) Dependencies() []extension.Dependency

Dependencies returns a slice containing all dependencies currently registered within the PID's local context.

These dependencies are typically injected at actor initialization (via SpawnOptions) and made accessible during the actor's lifecycle. They can include services, clients, or any resources that the actor requires to operate.

This method is useful for diagnostic tools, dynamic inspection, or cases where an actor needs to introspect its environment.

Returns: A slice of Dependency instances associated with this PID.

func (*Context) Dependency

func (x *Context) Dependency(dependencyID string) extension.Dependency

Dependency retrieves a specific dependency from the PID's local context by its unique ID.

This allows actors to access resources or services that were injected at initialization directly from the Context. Dependencies are typically registered via SpawnOptions when spawning an actor.

Example:

dbClient := x.Dependency("database").(DatabaseClient)

Parameters:

  • dependencyID: A unique string identifier used when the dependency was registered.

Returns:

  • extension.Dependency: The corresponding dependency if found, or nil otherwise.

func (*Context) Extension

func (x *Context) Extension(extensionID string) extension.Extension

Extension retrieves a specific extension registered in the ActorSystem by its unique ID.

This allows actors to access shared functionality injected into the system, such as event sourcing, metrics, tracing, or custom application services, directly from the Context.

Example:

logger := x.Extension("extensionID").(MyExtension)

Parameters:

  • extensionID: A unique string identifier used when the extension was registered.

Returns:

  • extension.Extension: The corresponding extension if found, or nil otherwise.

func (*Context) Extensions

func (x *Context) Extensions() []extension.Extension

Extensions returns a slice of all extensions registered within the ActorSystem associated with the Context.

This allows system-level introspection or iteration over all available extensions. It can be useful for message processing.

Returns:

  • []extension.Extension: All registered extensions in the ActorSystem.

func (*Context) Logger

func (x *Context) Logger() log.Logger

Logger returns the logger associated with the ActorSystem.

This logger is typically configured at the system level and can be used for structured logging, diagnostics, and monitoring throughout the actor's lifecycle.

Example:

logger := x.Logger()
logger.Info("Actor started", "actorName", x.ActorName())

type Deadletter

type Deadletter struct {
	// contains filtered or unexported fields
}

Deadletter defines the deadletter event.

func NewDeadletter

func NewDeadletter(sender, receiver string, message any, sendTime time.Time, reason string) *Deadletter

NewDeadletter creates a new Deadletter event.

func (*Deadletter) Message

func (d *Deadletter) Message() any

Message returns the original message that could not be delivered.

func (*Deadletter) Reason

func (d *Deadletter) Reason() string

Reason returns the reason the message was dead-lettered.

func (*Deadletter) Receiver

func (d *Deadletter) Receiver() string

Receiver returns the receiver's address.

func (*Deadletter) SendTime

func (d *Deadletter) SendTime() time.Time

SendTime returns the time the message was sent.

func (*Deadletter) Sender

func (d *Deadletter) Sender() string

Sender returns the sender's address.

type EvictionPolicy

type EvictionPolicy int

EvictionPolicy defines a strategy for passivating (deactivating) actors based on their usage patterns. It is used to manage memory or resource constraints by limiting the number of concurrently active actors.

const (
	// LRU (Least Recently Used) passivates actors that have not been accessed for the longest time
	// when the number of active actors exceeds the specified limit. This policy is effective
	// for scenarios where older, unused actors are likely to remain idle.
	LRU EvictionPolicy = iota

	// LFU (Least Frequently Used) passivates actors that have been used the least number of times
	// when the number of active actors exceeds the specified limit. This policy is suitable
	// when you want to retain actors that are consistently in use, even if they were accessed
	// a while ago.
	LFU

	// MRU (Most Recently Used) passivates actors that were accessed most recently
	// when the number of active actors exceeds the specified limit. While less common
	// for general resource management, MRU can be useful in specific scenarios
	// where fresh data is prioritized, and older, less volatile data can be evicted.
	MRU
)

func (EvictionPolicy) String

func (x EvictionPolicy) String() string

String returns the string representation of the EvictionPolicy (e.g., "LRU", "LFU", "MRU"). This method makes EvictionPolicy compatible with the fmt.Stringer interface, allowing for easy printing and logging.

type EvictionStrategy

type EvictionStrategy struct {
	// contains filtered or unexported fields
}

EvictionStrategy implements a system-configurable actor passivation policy, limiting the number of active actors based on their usage patterns.

An EvictionStrategy combines a chosen EvictionPolicy (LRU, LFU, or MRU) with a defined limit and a percentage of actors to passivate.

func NewEvictionStrategy

func NewEvictionStrategy(limit uint64, policy EvictionPolicy, percentage int) (*EvictionStrategy, error)

NewEvictionStrategy constructs and initializes a new EvictionStrategy instance. This function acts as a factory for creating valid eviction strategies.

Parameters:

  • limit: The maximum number of actors that can remain active. Once this limit is exceeded, the passivation process begins according to the chosen policy. Must be greater than zero.
  • policy: The EvictionPolicy to apply (LRU, LFU, or MRU). This determines which actors are selected for passivation.
  • percentage: An integer representing the percentage of actors to passivate when the limit is reached. This value will be clamped between 0 and 100, inclusive.

Returns:

  • *EvictionStrategy: A pointer to the newly initialized EvictionStrategy instance.
  • error: An error if the provided 'limit' is zero or if the 'policy' is invalid.
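The clamping described above can be sketched as follows. The clamping mirrors what NewEvictionStrategy documents; the count formula itself is an assumption for illustration, not this package's implementation:

```go
package main

import "fmt"

// evictionCount sketches how many actors a strategy might passivate once
// the active limit is exceeded, clamping percentage to [0, 100].
func evictionCount(active uint64, percentage int) uint64 {
	if percentage < 0 {
		percentage = 0
	}
	if percentage > 100 {
		percentage = 100
	}
	return active * uint64(percentage) / 100
}

func main() {
	fmt.Println(evictionCount(200, 25))  // 50
	fmt.Println(evictionCount(200, 150)) // clamped to 100% → 200
	fmt.Println(evictionCount(200, -5))  // clamped to 0% → 0
}
```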

func (*EvictionStrategy) Limit

func (x *EvictionStrategy) Limit() uint64

Limit returns the maximum number of active actors allowed before passivation is triggered.

func (*EvictionStrategy) Percentage

func (x *EvictionStrategy) Percentage() int

Percentage returns the percentage of actors that should be passivated when the system exceeds the defined active actor limit. This value is guaranteed to be between 0 and 100, inclusive.

func (*EvictionStrategy) Policy

func (x *EvictionStrategy) Policy() EvictionPolicy

Policy returns the specific EvictionPolicy (LRU, LFU, or MRU) configured for this strategy instance.

func (*EvictionStrategy) String

func (x *EvictionStrategy) String() string

String returns a human-readable summary of the EvictionStrategy configuration. It satisfies both the fmt.Stringer and SystemStrategy interfaces.

type FuncActor

type FuncActor struct {
	// contains filtered or unexported fields
}

FuncActor is an actor that only handles messages.

func (*FuncActor) PostStop

func (x *FuncActor) PostStop(ctx *Context) error

PostStop is executed when the actor is shutting down.

func (*FuncActor) PreStart

func (x *FuncActor) PreStart(ctx *Context) error

PreStart is executed when the actor is starting up, before it processes any message.

func (*FuncActor) Receive

func (x *FuncActor) Receive(ctx *ReceiveContext)

Receive processes any message dropped into the actor mailbox.

type FuncOption

type FuncOption interface {
	// Apply sets the Option value of a config.
	Apply(actor *funcConfig)
}

FuncOption is the interface that applies a SpawnHook option.

func WithFuncMailbox

func WithFuncMailbox(mailbox Mailbox) FuncOption

WithFuncMailbox sets the mailbox to use when starting the func-based actor.

func WithPostStop

func WithPostStop(fn PostStopFunc) FuncOption

WithPostStop defines the PostStopFunc hook.

func WithPreStart

func WithPreStart(fn PreStartFunc) FuncOption

WithPreStart defines the PreStartFunc hook.

type GetRoutees

type GetRoutees struct{}

GetRoutees requests a snapshot of the router's current routee set.

Usage:

  • Send with Ask to a router PID to retrieve the list of active routee names.
  • The router replies with a Routees message. If no routees are available, the list is empty.

Scope and consistency:

  • Routers are local to an actor system; this message is only valid within the same process/system.
  • The returned list is a point-in-time snapshot.

Resolving names:

  • Use ActorSystem.ActorOf to resolve a returned name to a local PID/reference.
  • Names are unique within the router's namespace.

type Grain

type Grain interface {
	// OnActivate is called when the grain is loaded into memory.
	// Use this to initialize state or resources.
	//
	// Arguments:
	//   - ctx: context for cancellation and deadlines.
	//   - props: grain properties and system-level references.
	// Returns:
	//   - error: non-nil to indicate activation failure (grain will not be activated).
	OnActivate(ctx context.Context, props *GrainProps) error

	// OnReceive is called when the grain receives a message.
	//
	// Arguments:
	//   - ctx: GrainContext containing the message, sender, grain identity, and system references.
	// Behavior:
	//   - Processes the message and updates grain state as needed.
	//   - Always respect cancellation and deadlines via the context in GrainContext.
	//   - Do not retain references to the GrainContext or its fields beyond the method scope.
	OnReceive(ctx *GrainContext)

	// OnDeactivate is called before the grain is removed from memory.
	// Use this to persist state and release resources.
	//
	// Arguments:
	//   - ctx: context for cancellation and deadlines.
	//   - props: grain properties and system-level references.
	// Returns:
	//   - error: non-nil to indicate deactivation failure (system may log or handle the failure).
	OnDeactivate(ctx context.Context, props *GrainProps) error
}

Grain defines the contract for grains (virtual actors) in the goakt actor system.

A Grain is a lightweight, virtual actor that encapsulates state and behavior, managed by goakt. Grains are automatically activated and deactivated by the system. Each grain instance is uniquely identified and processes messages sequentially, ensuring single-threaded execution and simplifying state management.

Implementations must be safe for single-threaded access; concurrent calls are not made to a single grain instance.

Methods:

  • OnActivate: Called when the grain is loaded into memory. Use this to initialize state or resources. It receives a context (for cancellation and deadlines) and the grain's GrainProps; returning a non-nil error indicates activation failure, and the grain will not be activated.

  • OnReceive: Handles an incoming message; only one call is active at a time per grain instance. The GrainContext carries the message, sender, grain identity, and system references. Always respect cancellation and deadlines via the context in GrainContext, and do not retain references to the GrainContext or its fields beyond the method scope.

  • OnDeactivate: Called before the grain is removed from memory. Use this to persist state and release resources. It receives a context and the grain's GrainProps; returning a non-nil error indicates deactivation failure, which the system may log or handle.

type GrainContext

type GrainContext struct {
	// contains filtered or unexported fields
}

GrainContext provides contextual information and operations available to an actor when it is processing a message.

It typically carries the incoming message, the grain's identity, the actor system managing the grain, and methods to respond to messages.

Example usage:

func (g *MyGrain) OnReceive(ctx *actor.GrainContext) {
    switch ctx.Message().(type) {
    case *MyMessage:
        ctx.Response(&MyResponse{})
    }
}

func (*GrainContext) ActorSystem

func (gctx *GrainContext) ActorSystem() ActorSystem

ActorSystem returns the ActorSystem that manages the Grain.

It provides access to system-level services, configuration, and infrastructure components that the actor may interact with during its lifecycle.

func (*GrainContext) AskActor

func (gctx *GrainContext) AskActor(actorName string, message any, timeout time.Duration) (any, error)

AskActor sends a message to another actor by name and waits for a response.

This method performs a synchronous request (Ask pattern) to the specified actor, using the provided message and timeout. It returns the response message or an error if the operation times out or fails.

Example:

resp, err := ctx.AskActor("my-actor", &MyRequest{}, 2*time.Second)
if err != nil {
    // handle error
}

func (*GrainContext) AskGrain

func (gctx *GrainContext) AskGrain(to *GrainIdentity, message any, timeout time.Duration) (any, error)

AskGrain sends a message to another Grain and waits for a response.

This method performs a synchronous request (Ask pattern) to the specified Grain, using the provided message and timeout. It returns the response message or an error if the operation times out or fails.

Example:

resp, err := ctx.AskGrain(otherGrainID, &MyRequest{}, 2*time.Second)
if err != nil {
    // handle error
}

func (*GrainContext) Context

func (gctx *GrainContext) Context() context.Context

Context returns the underlying context associated with the GrainContext.

It carries deadlines, cancellation signals, and request-scoped values. This method allows direct access to standard context operations.

func (*GrainContext) Dependencies

func (gctx *GrainContext) Dependencies() []extension.Dependency

Dependencies returns a slice containing all dependencies currently registered within the Grain's local context.

These dependencies are typically injected at grain initialization (via GrainOptions) and can include services, repositories, or other components the grain relies on.

Returns:

  • []extension.Dependency: All registered dependencies in the Grain's context.

func (*GrainContext) Dependency

func (gctx *GrainContext) Dependency(dependencyID string) extension.Dependency

Dependency retrieves a specific dependency registered in the Grain's context by its unique ID.

This allows grains to access shared functionality injected into their context, such as services, repositories, or application components.

Example:

db := x.Dependency("database").(DatabaseService)

Parameters:

  • dependencyID: A unique string identifier used when the dependency was registered.

Returns:

  • extension.Dependency: The corresponding dependency if found, or nil otherwise.

func (*GrainContext) Err

func (gctx *GrainContext) Err(err error)

Err reports an error encountered during message handling without panicking.

This method should be used within a message handler to indicate that the message processing failed due to the provided error. It is the preferred way to signal failure in the message handler, as it avoids crashing the actor or goroutine.

Use Err instead of panicking to enable graceful error handling and reporting, particularly when responding to messages or for observability purposes.

Note: Even if the message handler does panic, the framework will catch it and report it as an error. However, using Err allows for more controlled error handling.

Example usage:

func (g *MyGrain) OnReceive(ctx *GrainContext) {
    switch ctx.Message().(type) {
    case DoSomething:
        if err := doWork(); err != nil {
            ctx.Err(err) // fail gracefully
            return
        }
        ctx.NoErr()
    }
}

func (*GrainContext) Extension

func (gctx *GrainContext) Extension(extensionID string) extension.Extension

Extension retrieves a specific extension registered in the ActorSystem by its unique ID.

This allows grains to access shared functionality injected into the system, such as event sourcing, metrics, tracing, or custom application services, directly from the GrainContext.

Example:

logger := x.Extension("extensionID").(MyExtension)

Parameters:

  • extensionID: A unique string identifier used when the extension was registered.

Returns:

  • extension.Extension: The corresponding extension if found, or nil otherwise.

func (*GrainContext) Extensions

func (gctx *GrainContext) Extensions() []extension.Extension

Extensions returns a slice of all extensions registered within the ActorSystem associated with the GrainContext.

This allows system-level introspection or iteration over all available extensions.

It can be useful for message processing.

Returns:

  • []extension.Extension: All registered extensions in the ActorSystem.

func (*GrainContext) GrainIdentity

func (gctx *GrainContext) GrainIdentity(name string, factory GrainFactory, opts ...GrainOption) (*GrainIdentity, error)

GrainIdentity creates or retrieves a unique identity for a Grain instance.

This method is used to generate a GrainIdentity for a given grain name and factory, optionally applying additional GrainOptions. It is typically used when you need to reference or interact with another grain from within a grain's logic.

Arguments:

  • name: The unique name of the grain type or instance.
  • factory: The GrainFactory used to instantiate the grain if it does not already exist.
  • opts: Optional GrainOption values to customize grain creation or configuration.

Returns:

  • *GrainIdentity: The unique identity representing the target grain.
  • error: Non-nil if the identity could not be created or resolved.

Example:

id, err := ctx.GrainIdentity("my-grain", MyGrainFactory)
if err != nil {
    // handle error
}
err = ctx.TellGrain(id, &MyMessage{})

func (*GrainContext) Message

func (gctx *GrainContext) Message() any

Message returns the message currently being processed by the Grain.

This method provides access to the incoming protobuf message that triggered the current Grain invocation. Use this to inspect, type-assert, or handle the message within your Grain's OnReceive method.

Example:

func (g *MyGrain) OnReceive(ctx *GrainContext) {
    switch msg := ctx.Message().(type) {
    case *MyRequest:
        // handle MyRequest
    default:
        ctx.Unhandled()
    }
}

func (*GrainContext) NoErr

func (gctx *GrainContext) NoErr()

NoErr marks the successful completion of a message handler without any error.

This method is typically used in actor-style messaging contexts to explicitly indicate that the message was processed successfully. It is especially useful when:

  • Handling fire-and-forget (Tell-like) messages where no response is expected.
  • Handling Ask-like messages where no error and response need to be returned.

Calling NoErr ensures that the framework does not interpret the absence of an explicit error as a failure or require a default response.

Example usage:

func (g *MyGrain) OnReceive(ctx *GrainContext) {
    switch ctx.Message().(type) {
    case DoSomething:
        // Handle logic...
        ctx.NoErr() // explicitly declare success
    }
}

func (*GrainContext) PipeToActor

func (gctx *GrainContext) PipeToActor(actorName string, task func() (any, error), opts ...PipeOption) error

PipeToActor runs a task asynchronously and delivers its result to the named actor.

While the task is executing, the calling Grain can continue processing other messages. On success, the task result is sent to the target actor as a normal message. On failure, the error is handled according to PipeTo semantics (e.g., deadletter).

The task runs outside the Grain's message loop. Avoid mutating Grain state inside the task; communicate results via the returned message instead.

Use PipeOptions (e.g., WithTimeout, WithCircuitBreaker) to control execution.

func (*GrainContext) PipeToGrain

func (gctx *GrainContext) PipeToGrain(to *GrainIdentity, task func() (any, error), opts ...PipeOption) error

PipeToGrain runs a task asynchronously and delivers its result to the target Grain.

While the task is executing, the calling Grain can continue processing other messages. On success, the task result is sent to the target Grain as a normal message. On failure, a StatusFailure message containing the error is delivered to the target Grain.

The task runs outside the Grain's message loop. Avoid mutating Grain state inside the task; communicate results via the returned message instead.

Use PipeOptions (e.g., WithTimeout, WithCircuitBreaker) to control execution.

Returns an error when the task is nil or the target identity is invalid.

func (*GrainContext) PipeToSelf

func (gctx *GrainContext) PipeToSelf(task func() (any, error), opts ...PipeOption) error

PipeToSelf runs a task asynchronously and delivers its result to this Grain.

While the task is executing, the calling Grain can continue processing other messages. On success, the task result is sent to this Grain as a normal message. On failure, a StatusFailure message containing the error is delivered to this Grain.

The task runs outside the Grain's message loop. Avoid mutating Grain state inside the task; communicate results via the returned message instead.

Use PipeOptions (e.g., WithTimeout, WithCircuitBreaker) to control execution.

func (*GrainContext) Response

func (gctx *GrainContext) Response(resp any)

Response sets the response to the message currently being processed.

func (*GrainContext) Self

func (gctx *GrainContext) Self() *GrainIdentity

Self returns the unique identifier of the Grain instance.

func (*GrainContext) TellActor

func (gctx *GrainContext) TellActor(actorName string, message any) error

TellActor sends a message to another actor by name without waiting for a response.

This method performs an asynchronous send (Tell pattern) to the specified actor. It returns an error if the message could not be delivered.

Example:

err := ctx.TellActor("my-actor", &MyNotification{})
if err != nil {
    // handle error
}

func (*GrainContext) TellGrain

func (gctx *GrainContext) TellGrain(to *GrainIdentity, message any) error

TellGrain sends a message to another Grain without waiting for a response.

This method performs an asynchronous send (Tell pattern) to the specified Grain. It returns an error if the message could not be delivered.

Example:

err := ctx.TellGrain(otherGrainID, &MyNotification{})
if err != nil {
    // handle error
}

func (*GrainContext) Unhandled

func (gctx *GrainContext) Unhandled()

Unhandled marks the currently received message as unhandled by the Grain.

This method should be invoked when the Grain does not define a handler for the message type it has received. Calling Unhandled informs the runtime that the message was not processed and allows the framework to respond accordingly.

This is typically used to log, track, or gracefully ignore unsupported messages without causing unexpected behavior.

If Unhandled is called, the caller of TellGrain or AskGrain will receive an ErrUnhandledMessage error as a response, signaling that the message could not be processed.

Example use case:

func (g *MyGrain) OnReceive(ctx *GrainContext) error {
    switch msg := ctx.Message().(type) {
    case *KnownMessage:
        // handle message
        return nil
    default:
        ctx.Unhandled()
        return nil
    }
}

type GrainFactory

type GrainFactory func(ctx context.Context) (Grain, error)

GrainFactory defines a function type responsible for creating new Grain instances.

This factory function is used by the goakt actor system to instantiate grains (virtual actors) on demand. It receives a context.Context, which can be used for cancellation, deadlines, and passing request-scoped values. The function must return a new Grain implementation and an error if instantiation fails.

Typical usage is to provide a GrainFactory when registering a grain type with the actor system.

Example:

func MyGrainFactory(ctx context.Context) (actor.Grain, error) {
    return &MyGrain{}, nil
}

The returned Grain must be safe for single-threaded access, as the system guarantees that only one message is processed at a time per grain instance.

type GrainIdentity

type GrainIdentity struct {
	// contains filtered or unexported fields
}

GrainIdentity uniquely identifies a grain (virtual actor) instance within the actor system.

It consists of:

  • kind: Fully qualified type name of the grain (derived via reflection).
  • name: Unique identifier for the grain instance.

GrainIdentity enables location-transparent routing, lifecycle management, and stable grain identity across distributed systems and restarts. They are immutable and safe for concurrent use.

func (*GrainIdentity) Equal

func (g *GrainIdentity) Equal(other *GrainIdentity) bool

Equal checks whether this GrainIdentity is equal to another.

Two GrainIdentity values are equal if both kind and name are identical. Returns false when other is nil.

func (*GrainIdentity) Kind

func (g *GrainIdentity) Kind() string

Kind returns the fully qualified type name of the grain.

Used by the actor system for instantiation, factory lookups, and routing.

func (*GrainIdentity) Name

func (g *GrainIdentity) Name() string

Name returns the unique name of the grain instance.

It identifies this instance within its grain type and is used for routing and persistence.

func (*GrainIdentity) String

func (g *GrainIdentity) String() string

String returns the formatted string representation of the GrainIdentity as "kind:name".

Useful for logging, debugging, and human-readable configuration.

func (*GrainIdentity) Validate

func (g *GrainIdentity) Validate() error

Validate implements validation.Validator.

type GrainOption

type GrainOption func(config *grainConfig)

func WithActivationRole

func WithActivationRole(role string) GrainOption

WithActivationRole records a required node role for Grain placement.

In cluster mode, peers advertise roles via ClusterConfig.WithRoles (e.g. "projection", "payments", "api"). When a role is specified, the Grain will only be activated on nodes that advertise the same role. If multiple nodes match, the placement strategy (RoundRobin, Random, etc.) is applied among those nodes. If clustering is disabled, this option is ignored and the Grain is activated locally.

If no node with the required role exists, activation returns an error. This prevents accidental placement on unsuitable nodes and protects Grains that depend on role-specific services or colocation.

Tip: omit WithActivationRole to allow placement on any node (or ensure all nodes advertise the role if you want it universal).

Parameters:

role — label a node must advertise (e.g. "projection", "payments").

Returns:

  • GrainOption that sets the role in the grainConfig.
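A sketch, assuming grains are obtained via an identity call such as ActorSystem.GrainIdentity; the grain name and factory are illustrative:

```go
// Place the grain only on nodes advertising the "projection" role.
id, err := system.GrainIdentity(ctx, "orders-projection", MyGrainFactory,
    WithActivationRole("projection"),
)
if err != nil {
    // no node advertises "projection": activation fails rather than
    // falling back to an unsuitable node
}
```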

func WithActivationStrategy

func WithActivationStrategy(strategy ActivationStrategy) GrainOption

WithActivationStrategy returns a GrainOption that sets the placement strategy to be used when activating a Grain in cluster mode.

This option determines how the actor system selects a target node for activating the Grain across the cluster. Valid strategies include RoundRobin, Random, and Local.

Note: This option only has an effect in a cluster-enabled actor system. If cluster mode is disabled, the placement strategy is ignored and the Grain will be activated locally.

Parameters:

  • strategy: An ActivationStrategy value specifying how to distribute the Grain.

Returns:

  • GrainOption that sets the activation strategy.

func WithGrainDeactivateAfter

func WithGrainDeactivateAfter(value time.Duration) GrainOption

WithGrainDeactivateAfter returns a GrainOption that sets the duration after which a grain will be deactivated if it remains idle. This helps manage resources by deactivating grains that are not in use, reducing memory usage and improving system performance.

Parameters:

  • value: the duration of inactivity after which the grain is deactivated (default is 2 minutes).

Returns:

  • GrainOption: a function that sets the deactivateAfter.
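Usage example, mirroring the sibling options (newGrainConfig is unexported and shown only for illustration):

```go
cfg := newGrainConfig(WithGrainDeactivateAfter(10 * time.Minute))
```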

func WithGrainDependencies

func WithGrainDependencies(deps ...extension.Dependency) GrainOption

WithGrainDependencies returns a GrainOption that registers one or more dependencies for the grain. Dependencies are external services, resources, or components that the grain requires to operate. This option enables dependency injection, promoting loose coupling and easier testing.

Each dependency must implement the extension.Dependency interface and must have a unique ID. If a dependency with the same ID already exists, it will be overwritten.

Parameters:

  • deps: One or more extension.Dependency instances to associate with the grain.

Returns:

  • GrainOption: A function that registers the provided dependencies in the grain's configuration.

Example usage:

db := NewDatabaseDependency()
cache := NewCacheDependency()
cfg := newGrainConfig(WithGrainDependencies(db, cache))

func WithGrainDisableRelocation

func WithGrainDisableRelocation() GrainOption

WithGrainDisableRelocation returns a GrainOption that opts a Grain out of relocation in cluster mode.

In a clustered actor system, the runtime may relocate (rebalance) grains across nodes as membership/topology changes. When this option is enabled, the grain is pinned to its current host node and will not be moved automatically.

Important implications:

  • If the hosting node leaves or crashes, the grain instance is lost and will not be migrated elsewhere.
  • The grain can be re-created on another node by addressing it by its Grain ID, but any in-memory state is gone.
  • To recover state after node loss, persist grain state externally (e.g. DB, key/value store, event log).

Notes:

  • This option only has an effect when cluster mode is enabled.
  • Disabling relocation can be useful for grains that depend on node-local resources or require strict host affinity.

Returns:

  • GrainOption: an option that sets disableRelocation to true in the grain configuration.

func WithGrainInitMaxRetries

func WithGrainInitMaxRetries(value int) GrainOption

WithGrainInitMaxRetries returns a GrainOption that sets the maximum number of retries for grain initialization. This is useful for handling transient initialization errors by retrying the initialization process up to the specified number of times before giving up.

Parameters:

  • value: the maximum number of retries (default is 5).

Returns:

  • GrainOption: a function that sets the initMaxRetries field in grainConfig.

Usage example:

cfg := newGrainConfig(WithGrainInitMaxRetries(10))

func WithGrainInitTimeout

func WithGrainInitTimeout(value time.Duration) GrainOption

WithGrainInitTimeout returns a GrainOption that sets the timeout duration for grain initialization. If the grain does not initialize within this duration, initialization is considered failed and no further retries will be attempted.

Parameters:

  • value: the timeout duration (default is 1 second).

Returns:

  • GrainOption: a function that sets the initTimeout.

Usage example:

WithGrainInitTimeout(2 * time.Second)

func WithGrainMailboxCapacity

func WithGrainMailboxCapacity(capacity int64) GrainOption

WithGrainMailboxCapacity returns a GrainOption that configures the capacity of a grain's mailbox.

By default, a grain uses an unbounded mailbox. When a positive capacity is specified, the mailbox becomes bounded and can hold at most `capacity` messages at any given time.

In bounded mode:

  • Enqueue operations are non-blocking.
  • If the mailbox is full, enqueue attempts fail immediately and return ErrMailboxFull.
  • The capacity limit is enforced atomically and is safe under concurrent producers.

A capacity less than or equal to zero configures an unbounded mailbox, which never rejects enqueues due to capacity.

This option is typically used to provide backpressure and prevent unbounded memory growth for hot or slow-processing grains.

Parameters:

  • capacity: the maximum number of messages allowed in the mailbox. Values <= 0 configure an unbounded mailbox (default behavior).

Returns:

  • GrainOption: an option that sets the mailbox capacity on the grain configuration.

Example:

cfg := newGrainConfig(
    WithGrainMailboxCapacity(1024),
)

func WithLongLivedGrain

func WithLongLivedGrain() GrainOption

WithLongLivedGrain returns a GrainOption that configures the grain to never be deactivated due to inactivity. When this option is set, the grain will remain active in memory regardless of idle time, and the passivation timer is effectively disabled.

This is useful for grains that must always be available, such as those managing critical state, coordinating long-running workflows, or acting as singletons.

Note: Use this option judiciously, as long-lived grains consume system resources for their entire lifetime and are not subject to automatic passivation.
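A sketch, assuming grains are registered via a call such as ActorSystem.GrainIdentity; the grain name and factory are illustrative:

```go
// A coordinator grain that must stay resident for the life of the node;
// the passivation timer is disabled entirely.
id, err := system.GrainIdentity(ctx, "scheduler", NewSchedulerGrain,
    WithLongLivedGrain(),
)
```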

type GrainProps

type GrainProps struct {
	// contains filtered or unexported fields
}

GrainProps encapsulates configuration and metadata for a Grain (virtual actor) in the goakt actor system.

It holds the unique identity of the Grain and a reference to the ActorSystem that manages it. GrainProps is used internally to track and manage Grain instances, ensuring each Grain is uniquely addressable and associated with the correct actor system.

Fields:

  • identity: The unique identity of the Grain, used for addressing and routing.
  • actorSystem: The ActorSystem instance that owns and manages the Grain.

func (*GrainProps) ActorSystem

func (props *GrainProps) ActorSystem() ActorSystem

ActorSystem returns the ActorSystem instance associated with these Grain properties.

This provides access to the actor system that manages the lifecycle, messaging, and clustering for the Grain represented by these properties.

Returns:

  • ActorSystem: The actor system managing this Grain.

func (*GrainProps) Dependencies

func (props *GrainProps) Dependencies() []extension.Dependency

Dependencies returns the dependencies registered with this GrainProps.

Dependencies are external services, resources, or components that the Grain requires to operate. These are typically injected into the Grain at creation time, enabling loose coupling and easier testing.

Returns:

  • []extension.Dependency: A slice containing all dependencies associated with this GrainProps instance.

Example usage:

deps := grainProps.Dependencies()
for _, dep := range deps {
    // Use dep as needed
}

func (*GrainProps) Identity

func (props *GrainProps) Identity() *GrainIdentity

Identity returns the unique identity of the Grain associated with these properties.

The GrainIdentity is used to uniquely identify and address the Grain within the actor system. This is typically a composite of the Grain's type and instance key.

Returns:

  • *GrainIdentity: The identity object representing this Grain.

type Mailbox

type Mailbox interface {
	// Enqueue pushes a message into the mailbox. This returns an error
	// when the box is full
	Enqueue(msg *ReceiveContext) error
	// Dequeue fetches a message from the mailbox
	Dequeue() (msg *ReceiveContext)
	// IsEmpty returns true when the mailbox is empty
	IsEmpty() bool
	// Len returns the size of the mailbox
	Len() int64
	// Dispose will dispose of this queue and free any blocked threads
	// in the Enqueue and/or Dequeue methods.
	Dispose()
}

Mailbox defines the actor mailbox. Any implementation should be a thread-safe FIFO

type Metric

type Metric struct {
	// contains filtered or unexported fields
}

Metric is an immutable snapshot of node-level statistics returned by ActorSystem.Metric.

The values are gathered locally (not aggregated across a cluster) and represent a single point in time. Use it to drive dashboards, health checks, or lightweight telemetry without wiring up OpenTelemetry. Callers typically fetch a Metric just before inspecting fields; values will not update after the struct is created.

func (Metric) ActorsCount

func (m Metric) ActorsCount() int64

ActorsCount returns the total number of actors in the system, or the total number of child actors when the metric was gathered for a specific PID.

func (Metric) DeadlettersCount

func (m Metric) DeadlettersCount() int64

DeadlettersCount returns the total number of deadletters.

func (Metric) MemoryAvailable

func (m Metric) MemoryAvailable() uint64

MemoryAvailable returns the free memory of the system in bytes

func (Metric) MemorySize

func (m Metric) MemorySize() uint64

MemorySize returns the total memory of the system in bytes

func (Metric) MemoryUsed

func (m Metric) MemoryUsed() uint64

MemoryUsed returns the used memory of the system in bytes

func (Metric) Uptime

func (m Metric) Uptime() int64

Uptime returns the number of seconds since the actor system started.
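A sketch of driving a lightweight health check from a Metric snapshot; the exact signature of ActorSystem.Metric (here assumed to accept a context) and the logger are illustrative:

```go
// Take a point-in-time snapshot; the values never update afterwards.
m := system.Metric(ctx)
logger.Infof("actors=%d deadletters=%d mem=%d/%d uptime=%ds",
    m.ActorsCount(), m.DeadlettersCount(),
    m.MemoryUsed(), m.MemorySize(), m.Uptime())
```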

type NoMessage

type NoMessage struct{}

NoMessage is used to indicate that no message was sent

type NodeJoined

type NodeJoined struct {
	// contains filtered or unexported fields
}

NodeJoined defines the node joined event

func NewNodeJoined

func NewNodeJoined(address string, timestamp time.Time) *NodeJoined

NewNodeJoined creates a new NodeJoined event.

func (*NodeJoined) Address

func (n *NodeJoined) Address() string

Address returns the node's address.

func (*NodeJoined) Timestamp

func (n *NodeJoined) Timestamp() time.Time

Timestamp returns the time the node joined.

type NodeLeft

type NodeLeft struct {
	// contains filtered or unexported fields
}

NodeLeft defines the node left event

func NewNodeLeft

func NewNodeLeft(address string, timestamp time.Time) *NodeLeft

NewNodeLeft creates a new NodeLeft event.

func (*NodeLeft) Address

func (n *NodeLeft) Address() string

Address returns the node's address.

func (*NodeLeft) Timestamp

func (n *NodeLeft) Timestamp() time.Time

Timestamp returns the time the node left.

type Option

type Option interface {
	// Apply sets the Option value of a config.
	Apply(sys *actorSystem)
}

Option is the interface that applies a configuration option.

func WithActorInitMaxRetries

func WithActorInitMaxRetries(value int) Option

WithActorInitMaxRetries sets the number of times to retry an actor init process

func WithActorInitTimeout

func WithActorInitTimeout(timeout time.Duration) Option

WithActorInitTimeout sets the timeout for an actor's initialization. If the actor does not complete its start process within this duration, initialization is considered failed.

func WithCluster

func WithCluster(config *ClusterConfig) Option

WithCluster enables the cluster mode

func WithCoordinatedShutdown

func WithCoordinatedShutdown(hooks ...ShutdownHook) Option

WithCoordinatedShutdown registers internal and user-defined tasks to be executed during the shutdown process. Tasks run in the order they were registered. Any failure halts the shutdown process.

func WithDefaultSupervisor

func WithDefaultSupervisor(supervisor *supervisor.Supervisor) Option

WithDefaultSupervisor configures the ActorSystem-wide supervisor used for actors that are spawned without an explicit supervisor.

This option lets you enforce a consistent supervision strategy across the system. It is applied at ActorSystem construction time and becomes the fallback supervisor for any actor whose spawn configuration does not specify a supervisor.

Precedence:

  • If an actor is spawned with an explicitly configured supervisor, that supervisor is used.
  • Otherwise, the ActorSystem's default supervisor (configured by this option) is used.
  • If supervisor is nil, this option makes no change and the built-in default supervisor remains in effect.

func WithEvictionStrategy

func WithEvictionStrategy(strategy *EvictionStrategy, interval time.Duration) Option

WithEvictionStrategy configures the actor system to use the specified eviction strategy and sets the interval at which the eviction engine runs.

The eviction strategy defines how the system passivates (deactivates) actors when the number of active actors exceeds the configured limit. It determines which actors to remove from memory based on their usage patterns (e.g., LRU, LFU, MRU).

The `interval` parameter controls how frequently the eviction engine evaluates and passivates actors. A shorter interval results in more aggressive cleanup, while a longer interval conserves resources but may retain inactive actors longer.

If the provided strategy is nil, no changes are applied.

Example:

strategy := NewEvictionStrategy(1000, LRU)
system := NewActorSystem(WithEvictionStrategy(strategy, 5*time.Second))

func WithExtensions

func WithExtensions(extensions ...extension.Extension) Option

WithExtensions registers one or more Extensions with the ActorSystem during initialization.

This function allows you to inject pluggable components that implement the Extension interface, enabling additional functionality such as event sourcing, metrics collection, or tracing.

Registered extensions will be accessible from any actor's message context, allowing them to be used transparently across the system.

Example:

system := NewActorSystem(
    WithExtensions(
        NewEventSourcingExtension(),
        NewMetricsExtension(),
    ),
)

Each extension must have a unique ID to avoid collisions in the system registry.

Parameters:

  • extensions: One or more Extension instances to be registered.

Returns:

  • Option: A configuration option used when constructing the ActorSystem.

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger sets the actor system custom logger. Pass nil to disable logging (equivalent to WithLoggingDisabled).

func WithLoggingDisabled

func WithLoggingDisabled() Option

WithLoggingDisabled disables all logging for the actor system. Uses a no-op logger that discards all messages with minimal overhead. Recommended for production when logging is not needed, or for benchmarks.

func WithMetrics

func WithMetrics() Option

WithMetrics enables OpenTelemetry metrics collection for the ActorSystem.

This option wires goakt's internal metric provider so that runtime metrics for the actor system can be recorded. Metrics are not exported unless the OpenTelemetry metrics SDK is initialized and a MeterProvider/exporter is configured.

Initialize the SDK before starting the system:

https://opentelemetry.io/docs/languages/go/getting-started/#initialize-the-opentelemetry-sdk

Example:

system := NewActorSystem(
    WithLogger(logger),
    WithMetrics(),
)

Returns an Option that enables metrics instrumentation during system construction.

func WithPartitionHasher

func WithPartitionHasher(hasher hash.Hasher) Option

WithPartitionHasher sets the partition hasher.

func WithPubSub

func WithPubSub() Option

WithPubSub enables the pub-sub (publish-subscribe) mode for the actor system.

In pub-sub mode, actors can subscribe to one or more named topics and receive messages that are published to those topics. This is useful for implementing decoupled communication patterns where the publisher does not need to know the identity or number of subscribers.

When this option is applied during system initialization, internal mechanisms for managing topic subscriptions and broadcasting messages to subscribers will be activated.

Example:

system := NewActorSystem(WithPubSub())

Returns an Option that configures the actor system accordingly.

func WithRemote

func WithRemote(config *remote.Config) Option

WithRemote enables remoting on the actor system

func WithShutdownTimeout

func WithShutdownTimeout(timeout time.Duration) Option

WithShutdownTimeout sets the shutdown timeout. Choose a value appropriate for the total number of actors the system is expected to host: the graceful-shutdown budget is shared among all actors and child actors in the system and is enforced via a cancellation context.

func WithTLS

func WithTLS(info *tls.Info) Option

WithTLS configures TLS settings for both the Server and Client. Ensure that both the Server and Client are configured with the same root Certificate Authority (CA) to enable successful handshake and mutual authentication.

In cluster mode, all nodes must share the same root CA to establish secure communication and complete handshakes successfully.

func WithoutRelocation

func WithoutRelocation() Option

WithoutRelocation returns an Option that disables actor relocation in the cluster.

When this option is set, the actor system will not attempt to relocate actors from a node that leaves the cluster. This applies even if individual actors are configured to support relocation.

Instead of migrating to a healthy node, actors hosted on the departing node will be terminated, and any associated in-memory state will be lost permanently.

This is useful in scenarios where actor state is ephemeral, externally managed, or where graceful degradation is preferred over relocation overhead.

type OptionFunc

type OptionFunc func(*actorSystem)

OptionFunc implements the Option interface.

func (OptionFunc) Apply

func (f OptionFunc) Apply(c *actorSystem)

type PID

type PID struct {
	// contains filtered or unexported fields
}

PID is the sole actor reference in GoAkt. It is location-transparent: a PID may represent either a live local actor or a lightweight handle for an actor on a remote node. Use IsLocal / IsRemote to distinguish.

Location-transparent operations (work for both local and remote PIDs)

  • Identity: Name, ID, Address, Kind, Role, Equals
  • State queries: IsLocal, IsRunning, IsSuspended, IsSingleton, IsRelocatable, IsStopping
  • Messaging: Tell, Ask, BatchTell, BatchAsk
  • Remote helpers: RemoteLookup, RemoteStop, RemoteReSpawn

Local-only operations (return ErrNotLocal for remote PIDs)

  • Lifecycle: Stop, Restart, Shutdown, SpawnChild, Reinstate, ReinstateNamed
  • Tree navigation: Child, Children, ChildrenCount, Parent
  • Watch: Watch, UnWatch
  • Name-based messaging: SendAsync, SendSync, PipeTo, PipeToName, DiscoverActor

Query methods (safe for remote, return zero values)

Actor, ProcessedCount, RestartCount, LatestProcessedDuration, LatestActivityTime, StashSize, PassivationStrategy, Dependencies, Dependency, Uptime, Metric

Nil for remote PIDs

ActorSystem returns nil for remote PIDs — always guard with IsLocal before use.

func (*PID) Actor

func (pid *PID) Actor() Actor

Actor returns the underlying Actor implementation. Returns nil for remote PIDs.

func (*PID) ActorSystem

func (pid *PID) ActorSystem() ActorSystem

ActorSystem returns the actor system this PID belongs to. Returns nil for remote PIDs — check pid.IsLocal() before use.

func (*PID) Ask

func (pid *PID) Ask(ctx context.Context, to *PID, message any, timeout time.Duration) (response any, err error)

Ask sends a synchronous message to to and waits for a response. It blocks until a response is received, the context is cancelled, or timeout elapses.
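A sketch; the request and reply message types are hypothetical:

```go
// Synchronous request/response: blocks until a reply arrives, the context
// is cancelled, or the timeout elapses, whichever comes first.
resp, err := pid.Ask(ctx, targetPID, &GetBalance{}, DefaultAskTimeout)
if err != nil {
    // context cancelled, timeout elapsed, or target unreachable
}
balance, ok := resp.(*Balance) // caller asserts the expected reply type
```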

func (*PID) BatchAsk

func (pid *PID) BatchAsk(ctx context.Context, to *PID, messages []any, timeout time.Duration) (responses chan any, err error)

BatchAsk sends multiple messages synchronously to the given PID and returns responses in the same order. When to is remote, a single RemoteBatchAsk RPC is used for efficiency. When to is local, each message is delivered via Ask; the call blocks until all responses are received or any Ask fails.

func (*PID) BatchTell

func (pid *PID) BatchTell(ctx context.Context, to *PID, messages ...any) error

BatchTell sends multiple messages asynchronously to the given PID, in order. When to is remote, a single RemoteBatchTell RPC is used for efficiency. When to is local, each message is delivered via Tell; processing order is guaranteed.
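A sketch; the message types are hypothetical:

```go
// Ordered fire-and-forget batch. For a remote target this costs a single
// RemoteBatchTell RPC instead of one call per message.
err := pid.BatchTell(ctx, targetPID,
    &ItemAdded{ID: "a"},
    &ItemAdded{ID: "b"},
)
```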

func (*PID) Child

func (pid *PID) Child(name string) (*PID, error)

Child returns the running child PID with the given name. Returns ErrNotLocal for remote PIDs (or when remoting is nil), ErrDead if this actor is not running, or ErrActorNotFound when no such child exists or the child is stopped.

func (*PID) Children

func (pid *PID) Children() []*PID

Children returns all direct child PIDs that are currently running.

func (*PID) ChildrenCount

func (pid *PID) ChildrenCount() int

ChildrenCount returns the number of direct children currently running.

func (*PID) Dependencies

func (pid *PID) Dependencies() []extension.Dependency

Dependencies returns all dependencies registered with this actor. Dependencies are injected at spawn time via SpawnOptions and remain accessible for the lifetime of the actor.

func (*PID) Dependency

func (pid *PID) Dependency(dependencyID string) extension.Dependency

Dependency returns the registered dependency with the given identifier, or nil if not found.

func (*PID) DiscoverActor

func (pid *PID) DiscoverActor(ctx context.Context, actorName string, timeout time.Duration) (*PID, error)

DiscoverActor locates a named actor across all active datacenters using parallel discovery. All datacenter endpoints are queried concurrently; the first successful result is returned and remaining queries are cancelled. Discovery is best-effort: a stale cache is used with a warning rather than failing hard. Returns ErrNotLocal for remote PIDs, ErrDead if not running, and ErrActorNotFound when the actor does not exist in any active datacenter.

func (*PID) Equals

func (pid *PID) Equals(to *PID) bool

Equals reports whether pid and to refer to the same actor.

func (*PID) ID

func (pid *PID) ID() string

ID returns the actor's unique identifier, which is its canonical address string.

func (*PID) IsLocal

func (pid *PID) IsLocal() bool

IsLocal reports whether this PID represents an actor running in the local actor system.

Local PIDs have a live mailbox, a behavior stack, and a full actor-system reference. Use IsLocal to distinguish in-process actors from remote handles before performing operations that are only meaningful locally (e.g. inspecting children, passivation, etc.).

func (*PID) IsRelocatable

func (pid *PID) IsRelocatable() bool

IsRelocatable reports whether the actor may be relocated to another node if its host node shuts down unexpectedly. Actors are relocatable by default; pass WithRelocationDisabled at spawn time to opt out.

func (*PID) IsRemote

func (pid *PID) IsRemote() bool

IsRemote reports whether this PID is a lightweight handle for an actor on a remote node.

Remote PIDs hold only the actor's address and a remoting handle; they carry no mailbox, supervisor, or actor-system reference. Messaging through a remote PID is routed via the remoting layer rather than a local mailbox enqueue.
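A sketch of guarding local-only operations on a PID of unknown origin:

```go
if pid.IsLocal() {
    // Safe: the PID has a live mailbox and a full actor-system reference.
    children := pid.Children() // local-only tree navigation
    _ = children
} else {
    // Remote handle: stick to location-transparent operations
    // (Tell, Ask, identity and state queries, remote helpers).
}
```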

func (*PID) IsRunning

func (pid *PID) IsRunning() bool

IsRunning reports whether the actor is alive and ready to process messages. Returns false when the actor has not started, is stopping, passivating, or suspended.

func (*PID) IsSingleton

func (pid *PID) IsSingleton() bool

IsSingleton reports whether the actor was spawned as a cluster singleton. A singleton exists at most once across the entire cluster and is always hosted on the oldest node. When that node leaves unexpectedly the singleton is restarted on the new oldest node.

func (*PID) IsStopping

func (pid *PID) IsStopping() bool

IsStopping reports whether the actor has begun stopping, either explicitly or via passivation.

func (*PID) IsSuspended

func (pid *PID) IsSuspended() bool

IsSuspended reports whether the actor is suspended due to a fault.

func (*PID) Kind

func (pid *PID) Kind() string

Kind returns the reflected type name of the underlying Actor implementation. Returns an empty string for remote PIDs.

func (*PID) LatestActivityTime

func (pid *PID) LatestActivityTime() time.Time

LatestActivityTime returns the timestamp of the last message received by this actor. Returns the zero time when no message has been processed yet.

func (*PID) LatestProcessedDuration

func (pid *PID) LatestProcessedDuration() time.Duration

LatestProcessedDuration returns the elapsed time since the most recent message was processed.

func (*PID) Metric

func (pid *PID) Metric(ctx context.Context) *ActorMetric

Metric returns a snapshot of the actor's runtime metrics. Returns nil when the actor is not running. Cluster data is not included.

func (*PID) Name

func (pid *PID) Name() string

Name returns the actor name.

func (*PID) Parent

func (pid *PID) Parent() *PID

Parent returns the parent PID in the actor tree. Returns nil for root actors and remote PIDs.

func (*PID) PassivationStrategy

func (pid *PID) PassivationStrategy() passivation.Strategy

PassivationStrategy returns the passivation strategy configured for this actor.

func (*PID) Path

func (pid *PID) Path() Path

Path returns the actor path (location-transparent view of host, port, name, system, parent). Returns nil when called on a nil PID or when the address is nil. Use PathToAddress to convert to *address.Address when needed for RemoteTell, RemoteAsk, etc.

No lock is taken: path is written exactly once during construction and never mutated.

func (*PID) PipeTo

func (pid *PID) PipeTo(ctx context.Context, to *PID, task func() (any, error), opts ...PipeOption) error

PipeTo runs task asynchronously and, on success, delivers the result to to's mailbox. The calling actor is not blocked; it continues processing other messages while the task runs. On task failure the error is forwarded to the dead-letter queue. Returns ErrNotLocal for remote PIDs and ErrUndefinedTask when task is nil.
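A sketch; billingPID and fetchInvoice are hypothetical:

```go
// Offload a blocking call: on success the result lands in billingPID's
// mailbox; on failure the error is forwarded to the dead-letter queue.
err := pid.PipeTo(ctx, billingPID, func() (any, error) {
    return fetchInvoice()
})
if err != nil {
    // pid is remote (ErrNotLocal) or task is nil (ErrUndefinedTask)
}
```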

func (*PID) PipeToName

func (pid *PID) PipeToName(ctx context.Context, actorName string, task func() (any, error), opts ...PipeOption) error

PipeToName runs task asynchronously and, on success, delivers the result to the named actor's mailbox. The actor is resolved by name, providing location transparency: the caller does not need a PID. On task failure the error is forwarded to the dead-letter queue. Returns ErrNotLocal for remote PIDs and ErrUndefinedTask when task is nil.

func (*PID) ProcessedCount

func (pid *PID) ProcessedCount() int

ProcessedCount returns the total number of messages this actor has processed.

func (*PID) Reinstate

func (pid *PID) Reinstate(cid *PID) error

Reinstate resumes a suspended actor, allowing it to process messages again. The actor's internal state is preserved across the suspension. It is a no-op when cid is already running or not suspended. Returns ErrNotLocal for remote PIDs, ErrDead if this actor is not running, and ErrActorNotFound when cid is not known to the actor system.

See also: ReinstateNamed for name-based reinstatement.

func (*PID) ReinstateNamed

func (pid *PID) ReinstateNamed(ctx context.Context, actorName string) error

ReinstateNamed resumes a suspended actor identified by name. Unlike Reinstate, the actor is looked up by name, making this method suitable for cluster-wide recovery where the PID may not be available locally. It is a no-op when the actor is already running or not suspended. Returns ErrNotLocal for remote PIDs, ErrDead if this actor is not running, ErrActorNotFound when no actor with that name exists, and ErrRemotingDisabled when the actor is remote but remoting has not been configured.

See also: Reinstate for direct PID-based reinstatement.

func (*PID) RemoteLookup

func (pid *PID) RemoteLookup(ctx context.Context, host string, port int, name string) (*PID, error)

RemoteLookup resolves a named actor on a specific remote node and returns it as a PID. Returns ErrRemotingDisabled when remoting is not configured, and ErrActorNotFound when no actor with that name exists on the target node.

func (*PID) RemoteReSpawn

func (pid *PID) RemoteReSpawn(ctx context.Context, host string, port int, name string) (*PID, error)

RemoteReSpawn restarts a named actor on the specified remote node. Returns ErrRemotingDisabled when remoting is not configured.

func (*PID) RemoteStop

func (pid *PID) RemoteStop(ctx context.Context, host string, port int, name string) error

RemoteStop stops a named actor on the specified remote node. Returns ErrRemotingDisabled when remoting is not configured.

func (*PID) Restart

func (pid *PID) Restart(ctx context.Context) error

Restart restarts this actor and all running or suspended descendants.

The subtree is snapshotted, the same parent/child topology is rebuilt, and each actor is re-initialized via its PreStart hook. Suspended actors are reinitialized without a prior shutdown step; non-running descendants are skipped entirely. Mailboxes are not preserved — queued or in-flight messages may be dropped.

If any descendant fails to restart, Restart returns that error and the subtree may be only partially recovered. The target actor is left non-running on failure.

When pid is remote, Restart delegates to RemoteReSpawn via the remoting layer. Returns ErrRemotingDisabled when pid is remote but remoting is not configured, and ErrUndefinedActor for nil receivers.

func (*PID) RestartCount

func (pid *PID) RestartCount() int

RestartCount returns the total number of times this actor has been restarted.

func (*PID) Role

func (pid *PID) Role() *string

Role returns the cluster placement role assigned to this actor, or nil if none was set. Placement roles constrain on which nodes an actor may be started or relocated. This setting only affects SpawnOn and SpawnSingleton; local-only spawns ignore it.

func (*PID) SendAsync

func (pid *PID) SendAsync(ctx context.Context, actorName string, message any) error

SendAsync sends a message asynchronously to the named actor. The actor is resolved locally first; if not found, all active datacenters are queried.

func (*PID) SendSync

func (pid *PID) SendSync(ctx context.Context, actorName string, message any, timeout time.Duration) (response any, err error)

SendSync sends a synchronous message to the named actor and waits for a response. The actor is resolved locally first; if not found, all active datacenters are queried. It blocks until a response is received, the context is cancelled, or timeout elapses.

func (*PID) Shutdown

func (pid *PID) Shutdown(ctx context.Context) error

Shutdown gracefully stops this actor and all its children. When pid is remote, Shutdown delegates to RemoteStop via the remoting layer. Pending mailbox messages are processed before the actor terminates. Returns ErrRemotingDisabled when pid is remote but remoting is not configured, and ErrShutdownForbidden when called on a system actor while the actor system is still running.

func (*PID) SpawnChild

func (pid *PID) SpawnChild(ctx context.Context, name string, actor Actor, opts ...SpawnOption) (*PID, error)

SpawnChild creates, starts, and supervises a child actor with the given name. If a running child with the same name already exists, its PID is returned without creating a new one. Returns ErrNotLocal for remote PIDs and ErrDead if this actor is not running.
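A minimal sketch of the spawn-then-message pattern (parent, ctx, NewWorker, and ProcessOrder are placeholders):

```go
// Hedged sketch: create (or fetch, when one is already running) a
// supervised child, then send it work.
child, err := parent.SpawnChild(ctx, "orders-worker", NewWorker())
if err != nil {
	return err // ErrNotLocal for remote parents, ErrDead when stopped
}
_ = parent.Tell(ctx, child, &ProcessOrder{})
```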

func (*PID) StashSize

func (pid *PID) StashSize() uint64

StashSize returns the number of messages currently held in the stash buffer.

func (*PID) Stop

func (pid *PID) Stop(ctx context.Context, cid *PID) error

Stop signals the given child PID to shut down after it finishes processing its current message. When cid is remote, Stop delegates to RemoteStop via the remoting layer. Returns ErrRemotingDisabled when cid is remote but remoting is not configured, ErrDead if this actor is not running, and ErrActorNotFound when cid is not a child of this actor. It is a no-op when cid is already stopped.

func (*PID) Tell

func (pid *PID) Tell(ctx context.Context, to *PID, message any) error

Tell sends a message asynchronously to the target PID. Routing is location-transparent: remote PIDs are handled via the remoting layer.

func (*PID) UnWatch

func (pid *PID) UnWatch(cid *PID)

UnWatch cancels the watch previously registered by Watch for cid. It is a no-op for remote PIDs.

func (*PID) Uptime

func (pid *PID) Uptime() int64

Uptime returns the number of seconds elapsed since the actor started. Returns zero when the actor is not running.

func (*PID) Watch

func (pid *PID) Watch(cid *PID)

Watch registers pid to receive a Terminated message when cid shuts down. It is a no-op for remote PIDs.

type PanicSignal

type PanicSignal struct {
	// contains filtered or unexported fields
}

PanicSignal is a system-level message used in actor-based systems to notify a parent actor that one of its child actors has encountered a critical failure or unhandled condition. This message is automatically sent when the Escalate supervision directive is invoked, indicating that the issue cannot be handled at the child level.

The child actor is suspended, and the parent actor is expected to take appropriate action. Upon receiving a PanicSignal, the parent actor can decide how to handle the failure, such as restarting the child, stopping it, escalating further, or applying custom recovery logic.

func NewPanicSignal

func NewPanicSignal(message any, reason string, timestamp time.Time) *PanicSignal

NewPanicSignal creates a new PanicSignal.

func (*PanicSignal) Message

func (p *PanicSignal) Message() any

Message returns the original message that triggered the failure.

func (*PanicSignal) Reason

func (p *PanicSignal) Reason() string

Reason returns the human-readable explanation of the failure.

func (*PanicSignal) Timestamp

func (p *PanicSignal) Timestamp() time.Time

Timestamp returns when the PanicSignal event occurred.

type Path

type Path interface {
	Host() string
	HostPort() string
	Port() int
	Name() string
	Parent() Path
	String() string
	System() string
	Equals(other Path) bool
}

Path represents the logical path of an actor within an actor system. It provides a location-transparent view of host, port, name, system, and parent.

type PausePassivation

type PausePassivation struct{}

PausePassivation is a system-level message that pauses passivation for an actor. Send it to an actor to keep it active while it processes critical messages or performs important work. It is fire-and-forget, so no response is expected, and it is a no-op when the actor does not have passivation enabled.
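From inside another actor's Receive, pausing passivation might look like the following sketch (target is a placeholder PID):

```go
// Fire-and-forget: keep `target` active while it performs critical
// work. This is a no-op when target has passivation disabled.
rctx.Tell(target, &actor.PausePassivation{})
```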

type PipeOption

type PipeOption func(config *pipeConfig)

PipeOption configures a pipeConfig instance.

Options are mutually exclusive; attempting to set more than one will return ErrOnlyOneOptionAllowed.

func WithCircuitBreaker

func WithCircuitBreaker(cb *breaker.CircuitBreaker) PipeOption

WithCircuitBreaker configures PipeTo with a circuit breaker that controls whether task outcomes are delivered. If the breaker is open due to repeated failures, outcomes will be dropped instead of being sent.

func WithTimeout

func WithTimeout(timeout time.Duration) PipeOption

WithTimeout configures PipeTo with a maximum duration for waiting on the task outcome. If the result is not available within this duration, the message will not be delivered.
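A hedged sketch of a bounded pipe (self, target, ctx, and slowCall are placeholders). Note that PipeOptions are mutually exclusive, so pass either WithTimeout or WithCircuitBreaker, never both:

```go
// Bound the wait on the task outcome: if slowCall does not finish
// within 2 seconds, its result is not delivered.
err := self.PipeTo(ctx, target, func() (any, error) {
	return slowCall()
}, actor.WithTimeout(2*time.Second))
```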

type PoisonPill

type PoisonPill struct{}

PoisonPill is a special control message used to gracefully stop an actor.

When an actor receives a PoisonPill, it will initiate a controlled shutdown sequence. The PoisonPill is enqueued in the actor's mailbox like any other message, meaning:

  • It will not interrupt message processing.
  • It will only be handled after all previously enqueued messages are processed.

This allows the actor to finish processing in-flight work before termination, ensuring clean shutdown semantics without abrupt interruptions.
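A one-line sketch of requesting a graceful stop (sender, target, and ctx are placeholders):

```go
// The PoisonPill queues behind messages already in the mailbox, so
// in-flight work completes before the actor terminates.
_ = sender.Tell(ctx, target, &actor.PoisonPill{})
```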

type PostStart

type PostStart struct{}

PostStart is used when an actor has successfully started.

type PostStopFunc

type PostStopFunc = func(ctx context.Context) error

PostStopFunc defines the PostStop hook executed when an actor is stopping.

type PreStartFunc

type PreStartFunc = func(ctx context.Context) error

PreStartFunc defines the PreStart hook executed during actor initialization.

type PriorityFunc

type PriorityFunc func(msg1, msg2 any) bool

PriorityFunc defines the priority function that helps determine the relative priority of two messages.

type Publish

type Publish struct {
	// contains filtered or unexported fields
}

Publish is used to send a message to a topic via the TopicActor. The message is broadcast to all actors in the cluster that are subscribed to the topic.

func NewPublish

func NewPublish(id, topic string, message any) *Publish

NewPublish creates a new Publish message.

func (*Publish) ID

func (p *Publish) ID() string

ID returns the message unique identifier.

func (*Publish) Message

func (p *Publish) Message() any

Message returns the message payload.

func (*Publish) Topic

func (p *Publish) Topic() string

Topic returns the topic to publish to.

type ReceiveContext

type ReceiveContext struct {
	// contains filtered or unexported fields
}

ReceiveContext carries per-message context and operations available to an actor while handling a single message. It exposes:

  • Message metadata (Message, Sender, RemoteSender)
  • Actor lifecycle and behavior management (Become, BecomeStacked, UnBecome, UnBecomeStacked, Stash, Unstash, UnstashAll, Stop, Shutdown, Watch, UnWatch, Reinstate, ReinstateNamed)
  • Messaging operations (Tell, Ask, Request/RequestName, BatchTell, BatchAsk, SendAsync, SendSync, Forward, ForwardTo, RemoteTell/Ask/BatchTell/BatchAsk/Forward, RemoteLookup)
  • Utilities (Context, Logger, Err, Response for Ask, PipeTo/PipeToName, ActorSystem access, Extension/Extensions)

Concurrency and lifecycle:

  • A ReceiveContext instance is created by the runtime for each delivered message and is only valid within the scope of handling that message. Do not retain it beyond the current Receive call.
  • Methods that send messages are non-blocking unless explicitly documented (e.g., Ask/SendSync).
  • The underlying actor processes messages one-at-a-time, preserving mailbox order.

Context propagation:

  • ReceiveContext.Context() is the message-scoped context. Blocking calls invoked through ReceiveContext use a non-cancelable derivative (context.WithoutCancel) so that finishing work is not inadvertently canceled by the caller context. Use timeouts on synchronous calls.

Message immutability:

  • Message returns an any. Treat it as immutable; copy before mutation.

Sender semantics:

  • Sender() returns the PID of the message sender — local or remote.
  • For remote messages, Sender() holds a lightweight remote PID created from the sender address embedded in the wire message.
  • Use Sender().Path() for the sender's path and Self().Path() for the receiver's path (handle nil PIDs; use PathToAddress when *address.Address is needed).

Examples:

func (a *MyActor) Receive(ctx *actor.ReceiveContext) {
    switch msg := ctx.Message().(type) {
    case *Ping:
        ctx.Response(&Pong{}) // Ask reply
    case *Work:
        ctx.PipeToName("worker", func() (any, error) {
            return doWork(msg), nil
        })
    default:
        ctx.Unhandled()
    }
}

func (*ReceiveContext) ActorSystem

func (rctx *ReceiveContext) ActorSystem() ActorSystem

ActorSystem returns the ActorSystem hosting the current actor.

Use this to access system-level configuration, tools, or registries.

func (*ReceiveContext) Ask

func (rctx *ReceiveContext) Ask(to *PID, message any, timeout time.Duration) (response any)

Ask sends a synchronous request to another actor and waits for a reply.

The call blocks until a response arrives or the timeout elapses. On timeout or delivery error, the error is recorded via Err and the returned response may be nil. Choose timeouts carefully to avoid false positives.

Prefer small timeouts and design protocols so that timeouts are treated as expected failures.
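A hedged sketch of an Ask from inside Receive (peer, GetQuote, Quote, and handle are placeholders):

```go
// Synchronous request with a short timeout. On timeout or delivery
// failure the error is recorded via Err and resp may be nil.
resp := rctx.Ask(peer, &GetQuote{}, 500*time.Millisecond)
quote, ok := resp.(*Quote)
if !ok {
	return // timed out or failed; the error was recorded via Err
}
handle(quote) // placeholder for normal processing
```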

func (*ReceiveContext) BatchAsk

func (rctx *ReceiveContext) BatchAsk(to *PID, messages []any, timeout time.Duration) (responses chan any)

BatchAsk sends multiple synchronous requests to a target PID and waits for replies.

Replies are returned on a channel in the same order as the input messages. Use the provided timeout to bound the overall wait. Errors are recorded via Err.

func (*ReceiveContext) BatchTell

func (rctx *ReceiveContext) BatchTell(to *PID, messages ...any)

BatchTell sends multiple messages asynchronously to the target PID.

Messages are enqueued and processed one at a time in the order provided. This preserves the actor model's single-threaded processing guarantee. For a single message, this is equivalent to Tell.

func (*ReceiveContext) Become

func (rctx *ReceiveContext) Become(behavior Behavior)

Become replaces the current behavior with a new one.

The current message finishes under the old behavior. The new behavior will be used for subsequent messages. This does not maintain a stack; use BecomeStacked when you need to restore the previous behavior later.

func (*ReceiveContext) BecomeStacked

func (rctx *ReceiveContext) BecomeStacked(behavior Behavior)

BecomeStacked pushes a new behavior on top of the current one.

The current message continues to be processed by the existing behavior; subsequent messages are handled by the newly stacked behavior until UnBecomeStacked is called.

Use this to model temporary protocol phases (e.g., awaiting response).
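A hedged sketch of such a phase (the Order actor and its message types are placeholders; Behavior is assumed to accept a method value compatible with func(*actor.ReceiveContext)):

```go
// Enter a temporary "awaiting confirmation" phase, then pop back.
func (a *Order) Receive(rctx *actor.ReceiveContext) {
	switch rctx.Message().(type) {
	case *Submit:
		rctx.BecomeStacked(a.awaitConfirm) // later messages use awaitConfirm
	}
}

func (a *Order) awaitConfirm(rctx *actor.ReceiveContext) {
	switch rctx.Message().(type) {
	case *Confirm:
		rctx.UnBecomeStacked() // pop back to the previous behavior
	default:
		rctx.Stash() // defer until confirmation arrives
	}
}
```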

func (*ReceiveContext) Child

func (rctx *ReceiveContext) Child(name string) *PID

Child returns the PID of a named child actor if it exists and is alive.

On failure, Err is set and a nil PID may be returned.

func (*ReceiveContext) Children

func (rctx *ReceiveContext) Children() []*PID

Children returns the list of all live child PIDs of the current actor.

The returned slice represents a snapshot at the time of the call.

func (*ReceiveContext) Context

func (rctx *ReceiveContext) Context() context.Context

Context returns the context associated with the current message.

This context is bound to the delivery of this message. Blocking calls dispatched via ReceiveContext typically derive a non-cancelable context to avoid accidental cancellation from the caller, so prefer using explicit timeouts on synchronous APIs.

Do not store this context beyond the current Receive invocation.

func (*ReceiveContext) CorrelationID

func (rctx *ReceiveContext) CorrelationID() string

CorrelationID returns the async correlation ID when handling a Request/RequestName message.

Design decision: correlation IDs are internal to the async envelope and exposed for tracing/debugging without changing the user message contract. For non-async messages this returns an empty string.

func (*ReceiveContext) Dependencies

func (rctx *ReceiveContext) Dependencies() []extension.Dependency

Dependencies returns a slice containing all dependencies currently registered within the given actor

These dependencies are typically injected at actor initialization (via SpawnOptions) and made accessible during the actor's lifecycle. They can include services, clients, or other resources that the actor may utilize while processing messages.

Returns:

  • []extension.Dependency: All registered dependencies in the actor.

func (*ReceiveContext) Dependency

func (rctx *ReceiveContext) Dependency(dependencyID string) extension.Dependency

Dependency retrieves a specific dependency registered in the actor by its unique ID.

This allows actors to access injected services or resources directly from the ReceiveContext.

Parameters:

  • dependencyID: A unique string identifier used when the dependency was registered.

Returns:

  • extension.Dependency: The corresponding dependency if found, or nil otherwise.

Example:

myService := rctx.Dependency("dependencyID").(MyService)

func (*ReceiveContext) Err

func (rctx *ReceiveContext) Err(err error)

Err records a non-fatal error observed during message handling.

Use Err to report issues to the runtime without panicking. Supervisors or the actor system may log, escalate, or apply policies based on this error. Calling Err does not stop message processing immediately.

Typical usage:

if err != nil {
    rctx.Err(err)
    return
}

func (*ReceiveContext) Extension

func (rctx *ReceiveContext) Extension(extensionID string) extension.Extension

Extension returns the extension registered under the given ID, or nil if not found.

Use type assertion to access the concrete extension API. The returned value is shared system-wide; treat it as read-safe or follow its concurrency contract.

func (*ReceiveContext) Extensions

func (rctx *ReceiveContext) Extensions() []extension.Extension

Extensions returns all extensions registered in the ActorSystem associated with this context.

Extensions provide cross-cutting services (metrics, tracing, persistence, etc.) that can be accessed by actors at runtime.

func (*ReceiveContext) Forward

func (rctx *ReceiveContext) Forward(to *PID)

Forward forwards the current message to another local PID, preserving the original sender.

The receiver of the forwarded message sees the original Sender. This is only valid within a single-node system where the target PID is known and running. No action is taken if the target is not running.

func (*ReceiveContext) ForwardTo

func (rctx *ReceiveContext) ForwardTo(actorName string)

ForwardTo forwards the current message to a named actor, preserving the original sender.

This is location transparent within a clustered system. No action is taken if the system is not in cluster mode.

func (*ReceiveContext) Logger

func (rctx *ReceiveContext) Logger() log.Logger

Logger returns the actor system logger.

Use this for structured logging tied to the actor system's logging configuration.

func (*ReceiveContext) Message

func (rctx *ReceiveContext) Message() any

Message returns the message currently being processed.

Treat the returned message as immutable. Use type assertions or pattern matching on the message type to implement your actor behavior.

func (*ReceiveContext) PipeTo

func (rctx *ReceiveContext) PipeTo(to *PID, task func() (any, error), opts ...PipeOption)

PipeTo runs a task asynchronously and sends its successful result to the target PID.

The calling actor remains responsive while the task executes. On success, the task's result is delivered to the target's mailbox. Delivery behavior can be tuned via the supplied PipeOption values (e.g., timeout, circuit breaker).

The task runs outside the actor's mailbox thread. Avoid mutating the actor's internal state inside the task. Communicate results via the returned message.

func (*ReceiveContext) PipeToName

func (rctx *ReceiveContext) PipeToName(actorName string, task func() (any, error), opts ...PipeOption)

PipeToName runs a task asynchronously and sends its successful result to a named actor.

Unlike PipeTo, the target is resolved by name (location transparent). Semantics otherwise match PipeTo. Use this to decouple background work from a specific PID.

func (*ReceiveContext) Reinstate

func (rctx *ReceiveContext) Reinstate(cid *PID)

Reinstate transitions a previously suspended actor (by PID) back to active.

The actor resumes processing its mailbox and continues from its prior internal state. Typical callers include supervisors and administrative actors. Errors are recorded via Err.

func (*ReceiveContext) ReinstateNamed

func (rctx *ReceiveContext) ReinstateNamed(actorName string)

ReinstateNamed transitions a previously suspended actor (by name) back to active.

This is useful in cluster mode where actors are addressed by name and a PID may not be directly available. Errors are recorded via Err.

func (*ReceiveContext) RemoteLookup

func (rctx *ReceiveContext) RemoteLookup(host string, port int, name string) *PID

RemoteLookup resolves an actor name on a specific remote node and returns its PID.

On failure, the error is recorded via Err and the returned PID may be nil.

func (*ReceiveContext) RemoteReSpawn

func (rctx *ReceiveContext) RemoteReSpawn(host string, port int, name string) *PID

RemoteReSpawn restarts (re-spawns) a named actor on a remote node.

Returns the restarted actor's PID when successful, or nil when the actor is not found or the operation fails; the error is recorded via Err.

func (*ReceiveContext) Request

func (rctx *ReceiveContext) Request(to *PID, message any, opts ...RequestOption) RequestCall

Request sends a message to another actor and returns a RequestCall.

Request is the non-blocking counterpart of Ask. Use it when the current actor must remain responsive (e.g., fan-out, long I/O via PipeTo, or avoiding call cycles like A -> B -> A). Unlike Ask/SendSync, this method does not wait for a reply.

Reply delivery and continuations:

  • The reply is delivered back through this actor’s mailbox (single-threaded), so any continuation registered on the returned handle executes within the actor processing model.
  • Use the returned RequestCall to register a continuation (e.g., Then) and/or cancel the request.

Reentrancy requirement:

  • The actor must be spawned with WithReentrancy enabled; otherwise the request may be rejected and Err will be set.

Timeouts and options:

  • Per-call behavior (timeout, mode/policy) can be customized via RequestOption. Defaults may be configured at actor/system level.

On failure to initiate the request, Err is set and the returned call is nil.
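A heavily hedged sketch (peer and GetBalance are placeholders, and the Then callback signature shown is an assumption drawn from the RequestCall docs, not a confirmed API):

```go
// Fire a non-blocking request and register a continuation. The actor
// must be spawned with reentrancy enabled.
call := rctx.Request(peer, &GetBalance{})
if call != nil {
	call.Then(func(resp any, err error) {
		// Runs back on this actor's mailbox, preserving the
		// single-threaded processing model.
	})
}
```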

func (*ReceiveContext) RequestName

func (rctx *ReceiveContext) RequestName(actorName string, message any, opts ...RequestOption) RequestCall

RequestName sends a message to an actor identified by name and returns a RequestCall.

This is the non-blocking, location-transparent counterpart of SendSync: the target may be local or remote depending on the actor system configuration (e.g., cluster/remoting). Name resolution happens once at call time (local registry and/or cluster lookup, if enabled).

Reply delivery and continuations:

  • Replies are delivered back to the caller through this actor’s mailbox, preserving the single-threaded actor processing model.
  • Use the returned RequestCall to register continuations (e.g., Then) and/or cancel the request.

Reentrancy requirement:

  • The actor must be spawned with reentrancy enabled; otherwise the request may be rejected and Err will be set.

Options:

  • Per-call behavior (timeout, mode/policy, etc.) can be customized via RequestOption. Defaults may be configured at actor/system level.

On failure to initiate the request, Err is set and the returned call is nil.

func (*ReceiveContext) Response

func (rctx *ReceiveContext) Response(resp any)

Response publishes a reply to the sender of the current message.

Design decision:

  • If the message was initiated via Ask, Response completes that request.
  • If the message was initiated via Request/RequestName, Response sends an async reply to the stored correlation ID and reply address.
  • If neither applies, Response is a no-op.

Use Response exactly once per Ask-initiated message. Multiple calls may be ignored or override each other depending on the underlying channel semantics.

func (*ReceiveContext) Self

func (rctx *ReceiveContext) Self() *PID

Self returns the PID of the currently executing actor.

The returned PID is the logical identity of the recipient processing the current message. It can be used to access actor-level facilities (ActorSystem, Logger, etc.). Self may be nil only in internal bootstrap flows; within Receive it is non-nil.

func (*ReceiveContext) SendAsync

func (rctx *ReceiveContext) SendAsync(actorName string, message any)

SendAsync sends an asynchronous message to a named actor.

The actor name is resolved within the system (local or cluster), if supported. This call does not block and does not expect a reply.

func (*ReceiveContext) SendSync

func (rctx *ReceiveContext) SendSync(actorName string, message any, timeout time.Duration) (response any)

SendSync sends a synchronous request to a named actor and waits for a reply.

The actor may be local or remote, depending on system configuration. Blocks until a response is received or the timeout expires. On error or timeout, the error is recorded via Err and the returned response may be nil.

func (*ReceiveContext) Sender

func (rctx *ReceiveContext) Sender() *PID

Sender returns the PID of the message sender.

For local messages this is the sending actor's PID. For messages that originated from a remote node, this is a lightweight remote PID constructed from the wire sender address. Returns NoSender when the sender is unknown or the message was system-generated.

func (*ReceiveContext) Shutdown

func (rctx *ReceiveContext) Shutdown()

Shutdown gracefully stops the current actor.

The actor completes processing of already enqueued messages (subject to system configuration) before terminating. All child actors are also shut down. Errors are recorded via Err.

func (*ReceiveContext) Spawn

func (rctx *ReceiveContext) Spawn(name string, actor Actor, opts ...SpawnOption) *PID

Spawn creates a named child actor and returns its PID.

On failure, Err is set and a nil PID may be returned. Options can be used to configure mailbox, supervision, etc.

func (*ReceiveContext) Stash

func (rctx *ReceiveContext) Stash()

Stash buffers the current message to be processed later.

This is useful when the message cannot be handled under the current behavior (e.g., waiting for some state or handshake). Messages are preserved in arrival order.

Stash requires a stash buffer (enable via WithStashing or a stash-mode reentrancy request); otherwise ErrStashBufferNotSet is recorded.

Beware that indiscriminate stashing can grow memory usage. Always ensure stashed messages are eventually unstashed.
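A hedged sketch of the classic defer-until-ready pattern (Cache, Initialized, and the ready flag are placeholders; stashing must be enabled at spawn time, e.g. via WithStashing):

```go
// Defer messages until initialization completes, then replay them in
// arrival order.
func (a *Cache) Receive(rctx *actor.ReceiveContext) {
	switch rctx.Message().(type) {
	case *Initialized:
		a.ready = true
		rctx.UnstashAll() // replay deferred messages, oldest first
	default:
		if !a.ready {
			rctx.Stash() // not ready yet; process after Initialized
			return
		}
		// normal handling ...
	}
}
```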

func (*ReceiveContext) Stop

func (rctx *ReceiveContext) Stop(child *PID)

Stop requests a child actor to terminate after it finishes its current message.

If the child is already stopped, this is a no-op. Errors stopping the child are recorded via Err.

func (*ReceiveContext) Tell

func (rctx *ReceiveContext) Tell(to *PID, message any)

Tell sends an asynchronous message to the target PID.

Delivery preserves ordering per-sender. This method does not block. Use Ask when a reply is needed.

func (*ReceiveContext) UnBecome

func (rctx *ReceiveContext) UnBecome()

UnBecome resets the actor behavior to its default (initial) behavior, clearing any stacked or currently swapped behavior.

Use this to return the actor to its baseline protocol state.

func (*ReceiveContext) UnBecomeStacked

func (rctx *ReceiveContext) UnBecomeStacked()

UnBecomeStacked pops the most recently stacked behavior.

After calling UnBecomeStacked, the actor resumes the previous behavior (the one that was active before the last BecomeStacked call). No effect if there is no stack.

func (*ReceiveContext) UnWatch

func (rctx *ReceiveContext) UnWatch(cid *PID)

UnWatch stops watching the given PID for termination notifications.

Idempotent: unwatching a PID that is not watched has no adverse effect.

func (*ReceiveContext) Unhandled

func (rctx *ReceiveContext) Unhandled()

Unhandled marks the current message as unhandled without raising a panic.

The system may log or route this condition depending on configuration and supervision strategy. Prefer Unhandled when unknown messages are expected.

func (*ReceiveContext) Unstash

func (rctx *ReceiveContext) Unstash()

Unstash dequeues the oldest stashed message and prepends it to the mailbox.

The unstashed message will be processed before any newly arriving messages, preserving the original arrival order relative to other stashed messages.

func (*ReceiveContext) UnstashAll

func (rctx *ReceiveContext) UnstashAll()

UnstashAll moves all stashed messages back to the mailbox in arrival order.

Older stashed messages are delivered before newer ones. Use this after a behavior transition that enables processing of the previously deferred messages.

func (*ReceiveContext) Watch

func (rctx *ReceiveContext) Watch(cid *PID)

Watch subscribes the current actor to termination notifications for the given PID.

When the watched actor terminates, a Terminated message is sent to the watcher. Idempotent: watching an already watched PID has no adverse effect.
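A hedged sketch of watching a child (Supervisor, StartWorker, and NewWorker are placeholders; the exact Terminated type is an assumption based on the Watch docs):

```go
// Watch a spawned child and react when it terminates.
func (s *Supervisor) Receive(rctx *actor.ReceiveContext) {
	switch rctx.Message().(type) {
	case *StartWorker:
		worker := rctx.Spawn("worker", NewWorker())
		rctx.Watch(worker) // get a Terminated notice when it stops
	case *actor.Terminated:
		// A watched actor shut down: respawn it, clean up state, etc.
	}
}
```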

type ReceiveFunc

type ReceiveFunc = func(ctx context.Context, message any) error

ReceiveFunc is a function-based message handling signature.

type RecoveryOption

type RecoveryOption func(*ShutdownHookRecovery)

RecoveryOption defines a functional option for configuring a ShutdownHookRecovery.

These options are used with NewShutdownHookRecovery to customize the number of retries, the delay between retries, and the recovery policy for shutdown hooks. Each option is a function that modifies the ShutdownHookRecovery instance during its construction.

Example usage:

recovery := NewShutdownHookRecovery(
    WithShutdownHookRetry(3, 2*time.Second),
    WithShutdownHookRecoveryStrategy(ShouldRetryAndSkip),
)

func WithShutdownHookRecoveryStrategy

func WithShutdownHookRecoveryStrategy(strategy RecoveryStrategy) RecoveryOption

WithShutdownHookRecoveryStrategy sets the RecoveryStrategy for a ShutdownHookRecovery. Use this option with NewShutdownHookRecovery to specify the error handling policy to apply if a shutdown hook fails after all retries.

Parameters:

  • strategy: The RecoveryStrategy to use (e.g., ShouldFail, ShouldRetryAndSkip).

Example:

recovery := NewShutdownHookRecovery(
    WithShutdownHookRecoveryStrategy(ShouldRetryAndSkip),
)

func WithShutdownHookRetry

func WithShutdownHookRetry(retries int, interval time.Duration) RecoveryOption

WithShutdownHookRetry configures the number of retries and the interval between retries for a ShutdownHookRecovery. Use this option with NewShutdownHookRecovery to specify how many times a shutdown hook should be retried and how long to wait between attempts.

Parameters:

  • retries: The number of retry attempts.
  • interval: The duration to wait between retries.

Example:

recovery := NewShutdownHookRecovery(
    WithShutdownHookRetry(2, time.Second),
)

type RecoveryStrategy

type RecoveryStrategy int

RecoveryStrategy defines the strategy to apply when a ShutdownHook fails during the shutdown process.

This policy determines how the shutdown sequence should proceed if a hook returns an error. It allows fine-grained control over error handling, including whether to halt, retry, skip, or combine these actions.

The available policies are:

  • ShouldFail: Stop execution and report the error.
  • ShouldRetryAndFail: Retry the failed hook, then stop if still unsuccessful.
  • ShouldSkip: Skip the failed hook and continue with the next.
  • ShouldRetryAndSkip: Retry the failed hook, then skip and continue if still unsuccessful.
const (
	// ShouldFail indicates that if a ShutdownHook fails, the shutdown process should immediately stop executing any remaining hooks.
	//
	// The error from the failed hook is reported, and no further shutdown hooks are run.
	// Use this policy when subsequent hooks depend on the success of previous ones, or when a failure should halt the shutdown sequence.
	ShouldFail RecoveryStrategy = iota

	// ShouldRetryAndFail indicates that if a ShutdownHook fails, the system should retry executing the hook.
	//
	// The shutdown process will pause and repeatedly attempt the failed hook until it succeeds or a maximum retry limit is reached.
	// If the hook still fails after all retries, the error is reported and no further hooks are executed.
	// Use this policy when the hook is critical and transient errors may be recoverable.
	ShouldRetryAndFail

	// ShouldSkip indicates that if a ShutdownHook fails, the error should be reported, but the shutdown process should skip the failed hook and continue executing the remaining hooks.
	//
	// Use this policy when hooks are independent and a failure in one should not prevent the execution of others.
	ShouldSkip

	// ShouldRetryAndSkip indicates that if a ShutdownHook fails, the system should retry executing the hook.
	//
	// The shutdown process will pause and repeatedly attempt the failed hook until it succeeds or a maximum retry limit is reached.
	// If the hook still fails after all retries, the error is reported, but the shutdown process continues with the remaining hooks.
	// Use this policy when you want to maximize the chance of successful execution, but do not want a persistent failure to block the shutdown sequence.
	ShouldRetryAndSkip
)

type RequestCall

type RequestCall interface {
	// Then registers a continuation to be invoked when the request completes.
	//
	// The callback is passed either the response message (as proto.Message) and
	// a nil error, or a nil message and a non-nil error.
	//
	// Execution:
	//   - If registered before completion, the continuation is intended to run on
	//     the actor's mailbox thread (see the type-level comment).
	//   - If called after completion, the continuation runs immediately and
	//     synchronously in the caller's goroutine.
	//
	// Only the first call to Then is honored; subsequent calls are ignored.
	Then(func(any, error))

	// Cancel requests cancellation of the pending request.
	//
	// Cancel is safe to call multiple times. It returns nil if the request has
	// already completed or cancellation has already been requested.
	//
	// Cancellation is best-effort and may race with completion. When possible,
	// the cancellation is delivered via the actor's mailbox so that any
	// continuation runs on the actor's processing thread.
	Cancel() error
}

RequestCall represents a handle to a pending asynchronous request started via Request or RequestName.

A RequestCall lets callers:

  • register exactly one completion continuation (Then), and
  • attempt to cancel the in-flight request (Cancel).

Concurrency and execution model

To preserve single-threaded access to an actor's internal state, continuations registered with Then are intended to run on the actor's mailbox (processing) thread when the request completes.

Call Then from within Receive (or otherwise from the actor's message loop) to ensure the continuation executes on the mailbox thread. If Then is registered after the request has already completed, the continuation is invoked synchronously in the caller's goroutine.

Then is one-shot: only the first call registers a continuation; subsequent calls are ignored.

Cancellation

Cancel requests cancellation of the pending request. Cancellation is best-effort: the request may still complete successfully or with an error if it races with completion or if the underlying operation cannot be interrupted.

Cancel is idempotent: it may be called multiple times. If the request is already completed or cancellation has already been requested, Cancel returns nil.

When possible, cancellation is delivered through the actor's mailbox so that any completion continuation runs on the actor's processing thread.
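The one-shot Then registration and idempotent Cancel contract can be modeled with a small standalone type. This is a conceptual model of the semantics described above, not the library's implementation (which additionally routes delivery through the actor's mailbox):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// call models the RequestCall contract: only the first Then registration is
// honored, and Cancel is idempotent, returning nil once the call is settled.
type call struct {
	mu        sync.Mutex
	cont      func(any, error)
	settled   bool
	cancelled bool
}

func (c *call) Then(f func(any, error)) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cont == nil { // subsequent registrations are ignored
		c.cont = f
	}
}

func (c *call) Cancel() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.settled || c.cancelled {
		return nil // already completed or cancellation already requested
	}
	c.cancelled = true
	c.settle(nil, errors.New("cancelled"))
	return nil
}

// settle delivers the outcome to the registered continuation exactly once.
func (c *call) settle(msg any, err error) {
	if !c.settled && c.cont != nil {
		c.settled = true
		c.cont(msg, err)
	}
}

func main() {
	c := &call{}
	c.Then(func(_ any, err error) { fmt.Println("first continuation:", err) })
	c.Then(func(_ any, _ error) { fmt.Println("ignored") }) // second Then: no-op
	c.Cancel()
	c.Cancel() // idempotent: returns nil again
}
```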

type RequestOption

type RequestOption func(*requestConfig)

RequestOption configures a single async request.

Design decision: request-level options let callers refine behavior without changing actor-wide defaults.

func WithReentrancyMode

func WithReentrancyMode(mode reentrancy.Mode) RequestOption

WithReentrancyMode overrides the actor-level reentrancy policy for this request.

This is a per-call override: it affects only the async request being started (via Request/RequestName) and does not mutate the actor's configured default.

Use this to temporarily opt into stricter or looser reentrancy semantics for a single interaction—for example, select reentrancy.StashNonReentrant for a blocking/serialized request—while leaving the actor's overall behavior unchanged.

Note: while any StashNonReentrant request is in flight, user messages are stashed until the last blocking request completes; per-call overrides do not bypass this.

If the actor does not have reentrancy enabled, Request/RequestName still return ErrReentrancyDisabled regardless of this option. If mode is reentrancy.Off, the request is rejected with ErrReentrancyDisabled.

Design decision: per-call overrides allow safe, targeted deviations from the actor default (e.g., stashing a single request while keeping general throughput).

func WithRequestTimeout

func WithRequestTimeout(timeout time.Duration) RequestOption

WithRequestTimeout sets a timeout for a single async request.

If the timeout elapses before a response is observed, the request completes with ErrRequestTimeout. The timeout signal is delivered through the requester's mailbox, so completion timing depends on mailbox processing. It is best-effort and may race with a successful response (whichever completion wins).

A value <= 0 disables the timeout for this request (no automatic expiry).

Notes:

  • This option is per-request; there is no implicit/global default timeout.
  • On completion (success, error, cancellation, or timeout) any registered continuation (Then) is invoked according to RequestCall's execution rules.
  • Cancel and timeout are independent signals; either may "win" depending on timing.

type ResumePassivation

type ResumePassivation struct{}

ResumePassivation is a system-level message used to resume the passivation of an actor. One can send this message to an actor to allow it to be passivated again after it has been paused. This is useful in scenarios where an actor has temporarily paused its passivation to complete critical tasks or handle important messages, and now it can return to its normal passivation behavior. This is a fire-and-forget message, so it does not expect a response.

This will no-op if the actor does not have passivation enabled. If the actor is not created with a custom passivation timeout, it will use the default passivation timeout.

type Routees

type Routees struct {
	// contains filtered or unexported fields
}

Routees contains the set of routee names managed by a router at the time of the GetRoutees request.

Semantics:

  • Names represents the router's best-effort snapshot when the request was handled.
  • A name can be resolved to a local PID/reference via ActorSystem.ActorOf.
  • Since routers are locally scoped, these names are not valid across different actor systems or nodes.

Example (Go):

resp, err := ctx.Ask(routerPID, &actor.GetRoutees{}, 500*time.Millisecond)
if err == nil {
    r := resp.(*actor.Routees)
    for _, name := range r.Names() {
        if pid := ctx.ActorSystem().ActorOf(ctx, name); pid != nil {
            ctx.Tell(pid, &MyMessage{})
        }
    }
}

func NewRoutees

func NewRoutees(names []string) *Routees

NewRoutees creates a new Routees response with the given routee names.

func (*Routees) Names

func (r *Routees) Names() []string

Names returns the list of routee actor names (unique within the router's scope).

type RouterOption

type RouterOption interface {
	// Apply sets the Option value of a config.
	Apply(r *router)
}

RouterOption is the interface that applies a configuration option.

func AsScatterGatherFirst

func AsScatterGatherFirst(within time.Duration) RouterOption

AsScatterGatherFirst configures the router to use the Scatter‑Gather First routing strategy.

Pattern summary:

  • Sends the same request to all currently live routees concurrently.
  • Returns the first successful reply within the time budget (within).
  • Late replies are ignored once the first result is returned or the deadline elapses.
  • Replies arrive asynchronously to the Broadcast sender; the router never blocks.

Behavioral contract (first-response-wins):

  • Every routee may work on the request (duplicate work is expected).
  • The first successful reply is forwarded to the original sender; remaining replies are dropped.
  • If no routee replies before within, the sender receives a StatusFailure.
  • If no live routees exist at send time, the router fails fast rather than waiting out the deadline.

Parameters:

within: Total time budget for obtaining any successful response.
        Must be > 0. Choose based on SLOs or empirical latency.

Tuning guidelines:

  • within ≈ p95/p99 of expected fast response or a business SLO so healthy routees usually win.
  • For large pools, consider sampling a subset (e.g., nearest N) or capping concurrency to avoid storms.
  • Add a small safety margin for serialization and scheduling overheads before declaring a timeout.

Example:

// Create a scatter‑gather router that waits up to 500ms for any reply.
r := newRouter(10, &MyWorker{}, logger, AsScatterGatherFirst(500*time.Millisecond))

payload, _ := anypb.New(&MyRequest{})
ctx.Tell(rPID, &goaktpb.Broadcast{Message: payload})

// Later, handle the first successful reply (or StatusFailure) asynchronously
// in the sender's Receive.

Errors and edge cases:

  • within <= 0 should be validated by the caller before passing.
  • Expect a timeout error after within elapses without any reply.
  • Decide what constitutes “successful reply” (e.g., non‑error response type).

Returns a RouterOption that sets internal state used by the router.

func AsTailChopping

func AsTailChopping(within, interval time.Duration) RouterOption

AsTailChopping configures the router to use the Tail-Chopping routing strategy.

Tail-Chopping prioritizes predictable latency over full concurrency by probing one routee at a time (in random order) and only moving on when the current attempt does not respond fast enough. This avoids thundering herds while still racing to find any healthy worker within a bounded deadline. Replies are forwarded asynchronously back to the Broadcast sender; the router itself never blocks waiting for results.

Sequence (conceptual):

  1. Shuffle live routees to avoid bias.
  2. Send the message to the first routee.
  3. If no response arrives before interval, send to the next routee.
  4. Repeat until one of the following occurs:
       • A routee replies (first success wins and the rest are cancelled).
       • All routees have been attempted.
       • The cumulative time exceeds within (the global deadline).
  5. If the deadline expires without success, the sender receives a StatusFailure.

Parameters:

within   - Total time budget allowed for obtaining any successful response.
           Must be > 0. Starts counting when the first routee receives the message.
interval - Delay between successive attempts. Must be > 0 and < within for multiple routees
           to be attempted. If interval >= within only the first routee is tried.

Recommended tuning guidelines:

  • interval ≈ p95 of expected fast response time so that healthy actors reply before the next probe fires.
  • within ≈ (number_of_routees * interval) or a business SLO bound to cap worst-case latency.
  • Ensure interval << within to allow multiple attempts; otherwise only the first probe runs.

Example:

// Create a tail-chopping router that will try for up to 2s,
// issuing a new attempt every 200ms until a reply is obtained.
r := newRouter(5, &MyWorker{}, logger, AsTailChopping(2*time.Second, 200*time.Millisecond))

Use Scatter-Gather when you need the absolute fastest responder; use Tail-Chopping when you prefer controlled sequential probing with upper-bounded fan-out.

Returns a RouterOption that sets internal state used by the router.

func WithRestartRouteeOnFailure

func WithRestartRouteeOnFailure(maxRetries uint32, timeout time.Duration) RouterOption

WithRestartRouteeOnFailure sets the router's supervision directive for its routees to Restart.

Behavior:

  • The failing routee is restarted (stopped and started anew), resetting its internal state.
  • The failing message is not retried by the router.
  • Subsequent messages may be processed after the restart.
  • Use when local state may be corrupted or requires re-initialization.

Notes:

  • Mutually exclusive with WithResumeRouteeOnFailure and WithStopRouteeOnFailure; the last applied wins.

func WithResumeRouteeOnFailure

func WithResumeRouteeOnFailure() RouterOption

WithResumeRouteeOnFailure sets the router's supervision directive for its routees to Resume.

Behavior:

  • The failing routee keeps its current in-memory state and continues processing the next messages.
  • The failing message is not retried by the router.
  • Best for transient, non-corrupting failures (e.g., timeouts, temporary downstream issues).

Notes:

  • Mutually exclusive with WithRestartRouteeOnFailure and WithStopRouteeOnFailure; the last applied wins.
  • If no directive option is provided, the default is Restart.

func WithRoutingStrategy

func WithRoutingStrategy(strategy RoutingStrategy) RouterOption

WithRoutingStrategy sets the routing strategy to use by the router to forward messages to its routees. By default, FanOutRouting is used. ⚠️ Note: Fan-out, round-robin, and random strategies are fire-and-forget; they never wait for replies. When you need the router to observe replies consider Scatter-Gather First or Tail-Chopping, but remember that routers always operate asynchronously—the outcome is delivered back to the sender as a new message rather than through Ask.

func WithStopRouteeOnFailure

func WithStopRouteeOnFailure() RouterOption

WithStopRouteeOnFailure sets the router's supervision directive for its routees to Stop.

Behavior:

  • The failing routee is terminated and removed from the routing pool.
  • The failing message is not retried by the router.
  • Use when the routee cannot continue safely or should be quarantined.

Notes:

  • Mutually exclusive with WithResumeRouteeOnFailure and WithRestartRouteeOnFailure; the last applied wins.
  • The router will stop sending messages to the stopped routee; it is not automatically replaced by this option.
  • This is the default directive if none is specified.

type RouterOptionFunc

type RouterOptionFunc func(router *router)

RouterOptionFunc is a function type that implements the RouterOption interface.

func (RouterOptionFunc) Apply

func (f RouterOptionFunc) Apply(r *router)

type RoutingStrategy

type RoutingStrategy int

RoutingStrategy defines how a router actor forwards incoming messages to its routees.

Available strategies:

  • RoundRobinRouting: Distributes messages one at a time to each routee in sequence. Useful for balancing uniform, stateless workloads.
  • RandomRouting: Chooses a routee at random for every message. Useful when uneven load patterns are acceptable or desired.
  • FanOutRouting: Broadcasts each message to all active routees concurrently. Useful for pub/sub, cache invalidation, or multi-sink processing.

Examples:

// round-robin messages across 5 workers
rr := newRouter(5, &MyWorker{}, logger, WithRoutingStrategy(RoundRobinRouting))

// pick a random worker out of 10 for each message
rnd := newRouter(10, &MyWorker{}, logger, WithRoutingStrategy(RandomRouting))

// broadcast each event to all 3 consumers
fan := newRouter(3, &EventConsumer{}, logger, WithRoutingStrategy(FanOutRouting))

Note: If a routee stops, it is removed from the internal map and no longer receives messages.

const (
	// RoundRobinRouting sends each incoming message to the next routee in order,
	// cycling back to the first after the last. Provides even distribution.
	RoundRobinRouting RoutingStrategy = iota
	// RandomRouting selects a routee uniformly at random for each message.
	RandomRouting
	// FanOutRouting broadcasts every message to all currently available routees.
	FanOutRouting
)

type ScheduleOption

type ScheduleOption interface {
	// Apply sets the Option value of a config.
	Apply(*scheduleConfig)
}

ScheduleOption defines an interface for applying configuration options to a scheduleConfig instance

func WithReference

func WithReference(referenceID string) ScheduleOption

WithReference sets a custom reference ID for the scheduled message.

This reference ID uniquely identifies the scheduled message and can be used later to manage it, such as canceling, pausing, or resuming the message.

If no reference ID is explicitly set using this option, the scheduler will generate an automatic reference internally. However, omitting a reference may make it impossible to manage the message later, as you'll lack a consistent identifier.

Parameters:

  • referenceID: A user-defined unique identifier for the scheduled message.

Returns:

  • ScheduleOption: An option that can be passed to the scheduler to associate the reference ID with the message.

Note:

  • It's strongly recommended to set a reference ID if you plan to cancel, pause, or resume the message later.

func WithSender

func WithSender(sender *PID) ScheduleOption

WithSender returns a ScheduleOption that explicitly sets the sender PID for a scheduled message.

This is useful when you want to associate the scheduled message with a specific sender (PID).

Parameters:

  • sender: The PID of the actor initiating the schedule.

Returns:

  • ScheduleOption: An option that can be passed to the scheduler.

type ScheduleOptionFunc

type ScheduleOptionFunc func(*scheduleConfig)

ScheduleOptionFunc is a function type used to configure a scheduleConfig instance. It implements the ScheduleOption interface by applying modifications to scheduleConfig.

func (ScheduleOptionFunc) Apply

func (f ScheduleOptionFunc) Apply(c *scheduleConfig)

Apply applies the ScheduleOptionFunc to the given scheduleConfig instance, modifying its fields as defined within the function.

type ShutdownHook

type ShutdownHook interface {
	// Execute runs the shutdown logic for this hook.
	//
	// Parameters:
	//   - ctx:         The context for cancellation and deadlines.
	//   - actorSystem: The ActorSystem being shut down.
	//
	// Returns:
	//   - error: An error if the shutdown logic fails, or nil on success.
	Execute(ctx context.Context, actorSystem ActorSystem) error

	// Recovery returns the ShutdownHookRecovery configuration for this hook.
	//
	// This determines how failures are handled, including retry and recovery strategies.
	//
	// Returns:
	//   - *ShutdownHookRecovery: The recovery configuration for this hook.
	Recovery() *ShutdownHookRecovery
}

ShutdownHook defines the interface for a coordinated shutdown hook.

A ShutdownHook is executed during the shutdown process of an ActorSystem to perform cleanup, resource release, or other termination logic. Implementations should provide the Execute method to define the shutdown behavior, and Recovery to specify error handling and retry strategies.

Example:

type MyShutdownHook struct{}

func (h *MyShutdownHook) Execute(ctx context.Context, actorSystem ActorSystem) error {
    // custom shutdown logic
    return nil
}

func (h *MyShutdownHook) Recovery() *ShutdownHookRecovery {
    return NewShutdownHookRecovery(
        WithShutdownHookRetry(2, time.Second),
        WithShutdownHookRecoveryStrategy(ShouldRetryAndSkip),
    )
}

type ShutdownHookRecovery

type ShutdownHookRecovery struct {
	// contains filtered or unexported fields
}

ShutdownHookRecovery defines the configuration for handling failures during the execution of a ShutdownHook. It specifies the number of retries, the interval between retries, and the recovery strategy to use if a shutdown hook fails.

Use NewShutdownHookRecovery and RecoveryOption functions to construct and configure an instance.

func NewShutdownHookRecovery

func NewShutdownHookRecovery(opts ...RecoveryOption) *ShutdownHookRecovery

NewShutdownHookRecovery creates a new ShutdownHookRecovery with the provided options.

By default, it uses DefaultShutdownRecoveryMaxRetries, DefaultShutdownHookRecoveryRetryInterval, and the ShouldFail strategy. You can override these defaults using RecoveryOption functions.

Example:

recovery := NewShutdownHookRecovery(
    WithShutdownHookRetry(2, time.Second),
    WithShutdownHookRecoveryStrategy(ShouldRetryAndSkip),
)

func (*ShutdownHookRecovery) Retry

func (r *ShutdownHookRecovery) Retry() (int, time.Duration)

Retry returns the configured number of retries and the interval between retries for the ShutdownHookRecovery.

Returns:

  • int: The number of retry attempts.
  • time.Duration: The duration to wait between retries.

func (*ShutdownHookRecovery) Strategy

func (r *ShutdownHookRecovery) Strategy() RecoveryStrategy

Strategy returns the RecoveryStrategy configured for the ShutdownHookRecovery.

Returns:

  • RecoveryStrategy: The strategy to use when a shutdown hook fails.

type SpawnOption

type SpawnOption interface {
	// Apply sets the option value on the provided spawnConfig.
	Apply(config *spawnConfig)
}

SpawnOption defines the interface for configuring actor spawn behavior.

Implementations of this interface can be passed to actor spawning functions to customize aspects such as mailbox, supervisor, passivation, dependencies, etc.

func WithDataCenter

func WithDataCenter(dataCenter *datacenter.DataCenter) SpawnOption

WithDataCenter returns a SpawnOption that sets the datacenter to spawn the actor in.

This option is only used when using the SpawnOn function with a datacenter-aware control plane.

Parameters:

  • dataCenter: The datacenter to spawn the actor in.

Returns:

  • SpawnOption that sets the datacenter in the spawn configuration.

func WithDependencies

func WithDependencies(dependencies ...extension.Dependency) SpawnOption

WithDependencies returns a SpawnOption that injects the given dependencies into the actor during its initialization.

This function allows you to configure an actor with one or more dependencies, such as services, clients, or configuration objects it needs to function. These dependencies will be made available to the actor when it is spawned, enabling better modularity and testability.

Parameters:

  • dependencies: Variadic list of objects implementing the Dependency interface.

Returns:

  • SpawnOption that sets the actor's dependencies in the spawn configuration.

func WithHostAndPort

func WithHostAndPort(host string, port int) SpawnOption

WithHostAndPort returns a SpawnOption that sets the host and port to spawn the actor on.

This option is only used when using the SpawnOn function with a datacenter-aware control plane. It will be used when remoting is enabled and the actor type must be registered on the remote node.

Parameters:

  • host: The host to spawn the actor on.
  • port: The port to spawn the actor on.

Returns:

  • SpawnOption that sets the host and port in the spawn configuration.

func WithLongLived

func WithLongLived() SpawnOption

WithLongLived returns a SpawnOption that ensures the given actor, once created, will persist for the entire lifespan of the running actor system. Unlike short-lived actors that may be restarted or garbage-collected, a long-lived actor remains active until the actor system itself shuts down.

Returns:

  • SpawnOption that disables passivation for the actor.

func WithMailbox

func WithMailbox(mailbox Mailbox) SpawnOption

WithMailbox returns a SpawnOption that sets the mailbox to use when starting the given actor.

Use this option to specify a custom mailbox implementation, such as a priority mailbox. Care should be taken to ensure the mailbox is compatible with the actor's message handling logic.

Parameters:

  • mailbox: The Mailbox implementation to use.

Returns:

  • SpawnOption that sets the mailbox in the spawn configuration.

func WithPassivateAfter deprecated

func WithPassivateAfter(after time.Duration) SpawnOption

WithPassivateAfter returns a SpawnOption that sets a custom duration after which an idle actor will be passivated. Passivation allows the actor system to free up resources by stopping actors that have been inactive for the specified duration. If the actor receives a message before this timeout, the passivation timer is reset.

Deprecated: Use WithPassivationStrategy with TimeBasedStrategy instead.

Parameters:

  • after: Duration of inactivity before passivation.

Returns:

  • SpawnOption that sets the passivation timeout.

func WithPassivationStrategy

func WithPassivationStrategy(strategy passivation.Strategy) SpawnOption

WithPassivationStrategy returns a SpawnOption that sets the passivation strategy to be used when spawning an actor.

This option allows you to define how and when the actor should be passivated, which can help manage resource usage and actor lifecycle in a more controlled manner. The following strategies are available:

  • TimeBasedStrategy: Passivates the actor after a specified duration of inactivity.
  • LongLivedStrategy: Ensures the actor remains active for the entire lifespan of the actor system, effectively preventing passivation.
  • MessagesCountStrategy: Passivates the actor after processing a specified number of messages.

Parameters:

  • strategy: passivation.Strategy to apply.

Returns:

  • SpawnOption that sets the passivation strategy in the spawn configuration.

func WithPlacement

func WithPlacement(strategy SpawnPlacement) SpawnOption

WithPlacement returns a SpawnOption that sets the placement strategy to be used when spawning an actor in cluster mode via the SpawnOn function.

This option determines how the actor system selects a target node for spawning the actor across the cluster. Valid strategies include RoundRobin, Random, and Local.

⚠️ Note: This option only has an effect when used with SpawnOn in a cluster-enabled actor system. If cluster mode is disabled, the placement strategy is ignored and the actor will be spawned locally.

Example:

err := system.SpawnOn(ctx, "analytics-worker", NewWorkerActor(),
    WithPlacement(RoundRobin))

Parameters:

  • strategy: A SpawnPlacement value specifying how to distribute the actor.

Returns:

  • SpawnOption that sets the placement strategy in the spawn configuration.

func WithReentrancy

func WithReentrancy(reentrancy *reentrancy.Reentrancy) SpawnOption

WithReentrancy enables async requests for an actor and sets the default policy.

Request/RequestName are disabled unless reentrancy is configured on the actor. Per-call overrides are available via WithReentrancyMode.

Design decision: reentrancy is opt-in to preserve backward compatibility for existing actors. Enabling it makes Request/RequestName available; per-call overrides via WithReentrancyMode can tighten or relax behavior for a single request.

Production note: prefer reentrancy.AllowAll for throughput and to avoid deadlocks in call cycles (A -> B -> A). Use reentrancy.StashNonReentrant only when strict message ordering is required, and always set Request timeouts to prevent indefinite stashing if a dependency slows or fails.

func WithRelocationDisabled

func WithRelocationDisabled() SpawnOption

WithRelocationDisabled returns a SpawnOption that prevents the actor from being relocated to another node when cluster mode is active and its host node shuts down unexpectedly. By default, actors are relocatable to support system resilience and maintain high availability by automatically redeploying them on healthy nodes.

Use this option when you need strict control over an actor's lifecycle and want to ensure that the actor is not redeployed after a node failure, such as for actors with node-specific state or dependencies that cannot be easily replicated.

Returns:

  • SpawnOption that disables relocation for the actor.

func WithRole

func WithRole(role string) SpawnOption

WithRole records a required node role for actor placement.

In cluster mode, peers advertise roles via ClusterConfig.WithRoles (e.g. "projection", "payments", "api"). When used with SpawnOn in a cluster-enabled system, the actor will only be placed on nodes that advertise the same role. If multiple nodes match, the placement strategy (RoundRobin, Random, etc.) is applied among those nodes. If clustering is disabled, this option is ignored and the actor is spawned locally.

If no node with the required role exists, spawning returns an error. This prevents accidental placement on unsuitable nodes and protects actors that depend on role-specific services or colocation.

Tip: omit WithRole to allow placement on any node (or ensure all nodes advertise the role if you want it universal).

Example:

pid, err := system.SpawnOn(ctx, "payment-saga", NewPaymentSaga(), WithRole("payments"))
if err != nil {
    return err
}

Parameters:

role — label a node must advertise (e.g. "projection", "payments").

Returns:

  • SpawnOption that sets the role in the spawn configuration.

func WithStashing

func WithStashing() SpawnOption

WithStashing returns a SpawnOption that enables stashing and sets the stash buffer for the actor, allowing it to temporarily store incoming messages that cannot be immediately processed.

This is particularly useful in scenarios where the actor must delay handling certain messages—for example, during initialization, while awaiting external resources, or transitioning between states.

By stashing messages, the actor can defer processing until it enters a stable or ready state, at which point the buffered messages can be retrieved and handled in a controlled sequence. This helps maintain a clean and predictable message flow without dropping or prematurely processing input.

⚠️ Note: The stash buffer is *not* a substitute for robust message handling or proper supervision strategies. Misuse may lead to unbounded memory growth if messages are stashed but never unstashed. Always ensure the actor eventually processes or discards stashed messages to avoid leaks or state inconsistencies.

Returns:

  • SpawnOption that enables stashing for the actor.

func WithSupervisor

func WithSupervisor(supervisor *supervisor.Supervisor) SpawnOption

WithSupervisor returns a SpawnOption that sets the supervisor strategy to apply when the actor fails or panics during message processing. The specified supervisor determines how failures are handled, such as restarting, stopping, or resuming the actor.

Parameters:

  • supervisor: Pointer to a Supervisor defining the failure-handling policy.

Returns:

  • SpawnOption that sets the supervisor in the spawn configuration.

type SpawnPlacement

type SpawnPlacement int

SpawnPlacement defines the algorithm used by the actor system to determine where an actor should be spawned in a clustered environment.

This strategy is only relevant when cluster mode is enabled. It affects how actors are distributed across the nodes in the cluster.

const (
	// RoundRobin distributes actors evenly across nodes
	// by cycling through the available nodes in a round-robin manner.
	// This strategy provides balanced load distribution over time.
	// ⚠️ Note: This strategy is subject to the cluster topology at the time of creation. For a stable cluster topology,
	// it ensures an even distribution of actors across all nodes.
	RoundRobin SpawnPlacement = iota

	// Random selects a node at random from the available pool of nodes.
	// This strategy is stateless and can help quickly spread actors across the cluster,
	// but may result in uneven load distribution.
	Random

	// Local forces the actor to be spawned on the local node,
	// regardless of the cluster configuration.
	// Useful when locality is important (e.g., accessing local resources).
	Local

	// LeastLoad selects the node with the least current load to spawn the actor.
	// This strategy aims to optimize resource utilization by placing actors
	// on nodes that are less busy, potentially improving performance and responsiveness.
	// Note: This strategy may require additional overhead when placing actors,
	// as it needs to get nodes load metrics depending on the cluster size.
	LeastLoad
)
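As a rough intuition for the RoundRobin strategy, the following standalone sketch (hypothetical types, not the framework's placement code) cycles an atomic counter over the known node list, which is why the distribution is even only while the topology stays stable:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobinPlacer is a hypothetical sketch of round-robin placement:
// an atomic counter indexes into the current node list, so each spawn
// lands on the next node in rotation.
type roundRobinPlacer struct {
	next  atomic.Uint64
	nodes []string // snapshot of the cluster topology at selection time
}

func (p *roundRobinPlacer) pick() string {
	n := p.next.Add(1) - 1
	return p.nodes[n%uint64(len(p.nodes))]
}

func main() {
	p := &roundRobinPlacer{nodes: []string{"node-a", "node-b", "node-c"}}
	for i := 0; i < 4; i++ {
		fmt.Println(p.pick()) // cycles node-a, node-b, node-c, node-a, ...
	}
}
```

If the node list changes between spawns (members join or leave), the rotation restarts against a different denominator, which is the topology caveat noted on RoundRobin above.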

type StatusFailure

type StatusFailure struct {
	// contains filtered or unexported fields
}

StatusFailure is used to indicate a failure status with an error message

func NewStatusFailure

func NewStatusFailure(err string, message any) *StatusFailure

NewStatusFailure creates a new StatusFailure with the given error message and optional original message.

func (*StatusFailure) Error

func (s *StatusFailure) Error() string

Error returns the error message.

func (*StatusFailure) Message

func (s *StatusFailure) Message() any

Message returns the original message that caused the failure.

type Subscribe

type Subscribe struct {
	// contains filtered or unexported fields
}

Subscribe is used to subscribe to a topic by an actor. The actor will receive an acknowledgement message when the subscription is successful.

func NewSubscribe

func NewSubscribe(topic string) *Subscribe

NewSubscribe creates a new Subscribe message for the given topic.

func (*Subscribe) Topic

func (s *Subscribe) Topic() string

Topic returns the topic to subscribe to.

type SubscribeAck

type SubscribeAck struct {
	// contains filtered or unexported fields
}

SubscribeAck is used to acknowledge a successful subscription to a topic by an actor.

func NewSubscribeAck

func NewSubscribeAck(topic string) *SubscribeAck

NewSubscribeAck creates a new SubscribeAck message for the given topic.

func (*SubscribeAck) Topic

func (s *SubscribeAck) Topic() string

Topic returns the topic that was subscribed to.

type Terminated

type Terminated struct {
	// contains filtered or unexported fields
}

Terminated is a lifecycle notification message sent to all actors that are watching a given actor when it has stopped or been terminated.

This message allows supervising or dependent actors to react to the shutdown of the actor they were observing—for example, by cleaning up resources, restarting the actor, or triggering failover behavior.

func NewTerminated

func NewTerminated(address string) *Terminated

NewTerminated creates a new Terminated message stamped with the current UTC time.

func (*Terminated) Address

func (t *Terminated) Address() string

Address returns the address of the terminated actor.

func (*Terminated) TerminatedAt

func (t *Terminated) TerminatedAt() time.Time

TerminatedAt returns the time the actor was terminated.

type UnboundedFairMailbox

type UnboundedFairMailbox struct {
	// contains filtered or unexported fields
}

UnboundedFairMailbox is an unbounded, multi-producer/single-consumer mailbox that enforces fairness across independent senders by giving each sender its own private sub-queue. Messages coming from the same sender remain FIFO-ordered, while the actor drains sub-queues in round-robin order so that a chatty sender cannot starve quieter peers. This makes the mailbox well-suited for multi-tenant actors, protocol handlers that serve many clients, or any actor that must provide predictable latency guarantees under bursty workloads.

Compared to the default mailbox (a single global FIFO), UnboundedFairMailbox trades a small amount of bookkeeping overhead for stronger isolation. It is conceptually similar to "a mailbox where each sender has its own sub-queue": the distinction is that UnboundedFairMailbox manages the activation lifecycle of those sub-queues automatically, re-queuing only the senders that still have pending work. Choose it whenever fairness is more important than the absolute throughput of a single hot sender; when raw throughput matters more than fairness, prefer the lighter-weight UnboundedMailbox.
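The activation and round-robin scheme described above can be sketched in a simplified, single-threaded form (the real mailbox is concurrent and lock-free; names here are illustrative only):

```go
package main

import "fmt"

// fairMailbox sketches per-sender fairness: each sender owns a FIFO
// sub-queue, and dequeue services active senders in round-robin order.
type fairMailbox struct {
	subs   map[string][]string // per-sender FIFO sub-queues
	active []string            // round-robin queue of senders with pending work
}

func newFairMailbox() *fairMailbox { return &fairMailbox{subs: map[string][]string{}} }

func (m *fairMailbox) enqueue(sender, msg string) {
	if len(m.subs[sender]) == 0 {
		m.active = append(m.active, sender) // first message activates the sender
	}
	m.subs[sender] = append(m.subs[sender], msg)
}

func (m *fairMailbox) dequeue() (string, bool) {
	if len(m.active) == 0 {
		return "", false
	}
	s := m.active[0]
	m.active = m.active[1:]
	msg := m.subs[s][0]
	m.subs[s] = m.subs[s][1:]
	if len(m.subs[s]) > 0 {
		m.active = append(m.active, s) // still has work: back to the tail
	}
	return msg, true
}

func main() {
	m := newFairMailbox()
	m.enqueue("chatty", "c1")
	m.enqueue("chatty", "c2")
	m.enqueue("chatty", "c3")
	m.enqueue("quiet", "q1")
	for msg, ok := m.dequeue(); ok; msg, ok = m.dequeue() {
		fmt.Println(msg) // c1, q1, c2, c3 — chatty cannot starve quiet
	}
}
```

The interleaving shows the fairness property: even though "chatty" enqueued three messages first, "quiet" is served after only one of them.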

func NewUnboundedFairMailbox

func NewUnboundedFairMailbox() *UnboundedFairMailbox

NewUnboundedFairMailbox creates a new UnboundedFairMailbox instance with all bookkeeping structures initialised and ready for concurrent producers. The mailbox is unbounded, so memory consumption grows with the number of messages and active senders. Prefer this constructor when an actor processes requests from many independent peers (local or remote) and must deliver each peer's messages in order without letting any one of them monopolise the processing pipeline.

func (*UnboundedFairMailbox) Dequeue

func (m *UnboundedFairMailbox) Dequeue() (msg *ReceiveContext)

Dequeue fetches one message from the next active sender in round‑robin order.

Semantics

  • Returns nil when there is currently no active sender.
  • If a sender remains non‑empty after a pop, it is re‑enqueued to the tail of the active‑senders queue; otherwise it is marked inactive.

Single consumer - Must be called by exactly one goroutine (the actor’s receiver loop).

func (*UnboundedFairMailbox) Dispose

func (m *UnboundedFairMailbox) Dispose()

Dispose releases resources held by the mailbox. This implementation does not spawn background goroutines, so Dispose is a no‑op; messages already enqueued will remain until dequeued or garbage‑collected.

func (*UnboundedFairMailbox) Enqueue

func (m *UnboundedFairMailbox) Enqueue(msg *ReceiveContext) error

Enqueue pushes a message into the mailbox.

Semantics

  • Per‑sender FIFO: Messages from the same sender are delivered in order.
  • Activation: The first message into an empty sub‑queue marks the sender active and enqueues it into the active‑senders queue for round‑robin service.

Concurrency

  • Safe for concurrent producers (many senders). Each sub‑queue is internally synchronized, and the active‑senders queue is an MPSC structure.

func (*UnboundedFairMailbox) IsEmpty

func (m *UnboundedFairMailbox) IsEmpty() bool

IsEmpty reports whether the mailbox currently has no messages.

This is an O(1) snapshot based on an atomic counter and is best‑effort under concurrency. It is intended for observability and fast checks, not for hard synchronization.

func (*UnboundedFairMailbox) Len

func (m *UnboundedFairMailbox) Len() int64

Len returns an approximate number of messages across all sub‑queues.

The value is maintained as an atomic counter and may be approximate under concurrency. Use for metrics/observability rather than coordination.

type UnboundedMailbox

type UnboundedMailbox struct {
	// contains filtered or unexported fields
}

UnboundedMailbox is a lock-free multi-producer, single-consumer (MPSC) FIFO queue used as the actor mailbox.

It is safe for many producer goroutines to call Enqueue concurrently, while exactly one consumer goroutine calls Dequeue. Ordering is preserved (FIFO) with respect to overall arrival order. Operations are non-blocking and rely on atomic pointer updates; underlying nodes are pooled via sync.Pool to reduce allocations.

The mailbox is unbounded: if producers outpace the consumer, memory usage can grow without limit. Consider higher-level backpressure if needed.

The zero value of UnboundedMailbox is not ready for use; always construct via NewUnboundedMailbox.

Reference: https://concurrencyfreaks.blogspot.com/2014/04/multi-producer-single-consumer-queue.html
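The core of the referenced MPSC algorithm can be condensed into a few lines. This is an illustrative sketch of the technique, not the mailbox's actual code: producers atomically swap the tail pointer and link the previous node, while the single consumer follows next pointers from a stub head node.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// node is a queue element; next is written by producers, read by the consumer.
type node struct {
	next  atomic.Pointer[node]
	value int
}

// mpscQueue is a minimal lock-free multi-producer/single-consumer FIFO.
type mpscQueue struct {
	head *node                // consumer-owned: points at the last dequeued (stub) node
	tail atomic.Pointer[node] // producer-shared: last enqueued node
}

func newMPSC() *mpscQueue {
	stub := &node{}
	q := &mpscQueue{head: stub}
	q.tail.Store(stub)
	return q
}

func (q *mpscQueue) enqueue(v int) {
	n := &node{value: v}
	prev := q.tail.Swap(n) // atomically claim the tail slot
	prev.next.Store(n)     // link the predecessor to the new node
}

func (q *mpscQueue) dequeue() (int, bool) {
	next := q.head.next.Load()
	if next == nil {
		return 0, false // empty, or a producer is mid-link
	}
	q.head = next
	return next.value, true
}

func main() {
	q := newMPSC()
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(v int) { defer wg.Done(); q.enqueue(v) }(i)
	}
	wg.Wait()
	sum := 0
	for v, ok := q.dequeue(); ok; v, ok = q.dequeue() {
		sum += v
	}
	fmt.Println(sum) // 6: all three concurrently enqueued messages delivered
}
```

The `Swap` on the tail is the only point of producer contention, which is what makes Enqueue non-blocking for many goroutines at once.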

func NewUnboundedMailbox

func NewUnboundedMailbox() *UnboundedMailbox

NewUnboundedMailbox returns a new, initialized UnboundedMailbox.

The returned mailbox supports concurrent producers and a single consumer. Always use this constructor; the zero value is not usable.

func (*UnboundedMailbox) Dequeue

func (m *UnboundedMailbox) Dequeue() *ReceiveContext

Dequeue removes and returns the next message at the head of the mailbox.

It returns nil if the mailbox is empty. Dequeue must be called by exactly one consumer goroutine; concurrent calls to Dequeue are not supported. The call is non-blocking and suitable for use inside an actor/event loop.

func (*UnboundedMailbox) Dispose

func (m *UnboundedMailbox) Dispose()

Dispose implements the Mailbox interface.

For UnboundedMailbox this is a no-op provided for interface compliance. It does not free resources beyond what the garbage collector and internal pools already manage, and it does not affect in-flight producers/consumer.

func (*UnboundedMailbox) Enqueue

func (m *UnboundedMailbox) Enqueue(value *ReceiveContext) error

Enqueue appends the given ReceiveContext to the tail of the mailbox.

It is safe to call Enqueue concurrently from multiple goroutines. The call is non-blocking and preserves FIFO ordering. The method currently always returns nil; the error is present to satisfy the Mailbox interface.

func (*UnboundedMailbox) IsEmpty

func (m *UnboundedMailbox) IsEmpty() bool

IsEmpty reports whether the mailbox currently holds no messages.

The result is a snapshot that may become stale immediately in the presence of concurrent producers. It is O(1) and safe to call from the consumer.

func (*UnboundedMailbox) Len

func (m *UnboundedMailbox) Len() int64

Len returns an approximate number of messages currently in the mailbox.

This performs an O(n) traversal and may race with concurrent producers, making the result a point-in-time estimate. Avoid calling Len in hot paths; prefer external accounting if frequent checks are required.

type UnboundedPriorityMailBox

type UnboundedPriorityMailBox struct {
	// contains filtered or unexported fields
}

UnboundedPriorityMailBox is an unbounded priority-queue mailbox. It is implemented as a binary heap using the standard library container/heap package.

func NewUnboundedPriorityMailBox

func NewUnboundedPriorityMailBox(priorityFunc PriorityFunc) *UnboundedPriorityMailBox

NewUnboundedPriorityMailBox creates an instance of UnboundedPriorityMailBox

func (*UnboundedPriorityMailBox) Dequeue

func (q *UnboundedPriorityMailBox) Dequeue() (msg *ReceiveContext)

Dequeue removes and returns the highest-priority message, as determined by the priority function supplied when the mailbox was created.

func (*UnboundedPriorityMailBox) Dispose

func (q *UnboundedPriorityMailBox) Dispose()

Dispose will dispose of this queue and free any blocked goroutines in the Enqueue and/or Dequeue methods.

func (*UnboundedPriorityMailBox) Enqueue

func (q *UnboundedPriorityMailBox) Enqueue(msg *ReceiveContext) error

Enqueue places the given value in the mailbox. The given message must be a priority message; otherwise an error is returned.

func (*UnboundedPriorityMailBox) IsEmpty

func (q *UnboundedPriorityMailBox) IsEmpty() bool

IsEmpty returns true when the mailbox is empty

func (*UnboundedPriorityMailBox) Len

func (q *UnboundedPriorityMailBox) Len() int64

Len returns the mailbox length.

type UnboundedSegmentedMailbox

type UnboundedSegmentedMailbox struct {
	// contains filtered or unexported fields
}

UnboundedSegmentedMailbox is an unbounded, lock‑free MPSC mailbox that stores messages in fixed‑size array segments connected in a singly linked list. It combines the cache locality of ring buffers with the growth characteristics of linked queues.

Concurrency model

  • MPSC: many producers can call Enqueue concurrently; exactly one consumer must call Dequeue.

Characteristics

  • Unbounded: capacity grows by allocating and linking new fixed‑size segments as needed.
  • FIFO: dequeue order matches enqueue order across all producers.
  • Hot‑path efficiency: producers reserve a slot by atomically incrementing a segment write index and store directly into a cache‑friendly array slot; the consumer reads sequentially via a dequeue index.
  • Low GC pressure: segments are pooled; steady‑state traffic typically performs zero allocations per message.
  • Observability: IsEmpty is O(1); Len is an approximate atomic counter (best‑effort under concurrency) and intended for metrics, not strict synchronization.

Use cases (in an actor framework)

  • Fan‑in actors: aggregators, reducers, routers’ routees, topic hubs and stream sinks that receive from many producers concurrently. The segment‑locality reduces cache misses compared to list nodes.
  • Ingestion/telemetry/logging actors: bursts of events followed by quick processing. Segment pooling amortizes allocation spikes and keeps the hot path mostly allocation‑free.
  • Scheduling/dispatch actors: timers, batchers, or background workers that consume quickly and benefit from contiguous array scans when draining.
  • Broker/bridge actors: gateways that translate external messages (NATS, gRPC streams, HTTP push) into local tells where short‑lived spikes are common.

Guidance

  • Prefer this mailbox for high‑throughput, CPU‑bound actors with bursty input and a single fast consumer. If you need strict backpressure or hard limits, choose a bounded mailbox (BoundedMailbox for blocking).
  • This mailbox is unbounded: pair it with upstream throttling, admission control, or supervision strategies to avoid unbounded memory growth when the consumer becomes slow.
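The segmented layout described above can be sketched in a simplified, single-threaded form (the real mailbox adds atomic slot reservation and segment pooling; types and sizes here are illustrative): messages live in fixed-size array segments linked into a list, and a full tail segment grows the queue by linking one more segment.

```go
package main

import "fmt"

const segSize = 4 // illustrative segment capacity

// segment is a fixed-size array of slots linked to the next segment.
type segment struct {
	buf         [segSize]int
	read, write int
	next        *segment
}

// segQueue is a minimal single-threaded segmented FIFO.
type segQueue struct {
	head, tail *segment
}

func newSegQueue() *segQueue {
	s := &segment{}
	return &segQueue{head: s, tail: s}
}

func (q *segQueue) enqueue(v int) {
	if q.tail.write == segSize { // tail full: link a fresh segment
		s := &segment{}
		q.tail.next = s
		q.tail = s
	}
	q.tail.buf[q.tail.write] = v
	q.tail.write++
}

func (q *segQueue) dequeue() (int, bool) {
	for q.head.read == q.head.write {
		if q.head.next == nil {
			return 0, false // empty
		}
		q.head = q.head.next // segment drained: advance and let it be collected
	}
	v := q.head.buf[q.head.read]
	q.head.read++
	return v, true
}

func main() {
	q := newSegQueue()
	for i := 1; i <= 6; i++ { // spans two segments
		q.enqueue(i)
	}
	for v, ok := q.dequeue(); ok; v, ok = q.dequeue() {
		fmt.Print(v, " ") // FIFO preserved across the segment boundary: 1 2 3 4 5 6
	}
	fmt.Println()
}
```

The consumer scans a contiguous array within each segment, which is the cache-locality advantage over per-message linked-list nodes.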

func NewUnboundedSegmentedMailbox

func NewUnboundedSegmentedMailbox() *UnboundedSegmentedMailbox

NewUnboundedSegmentedMailbox creates and initializes a UnboundedSegmentedMailbox.

The mailbox starts with a single, pooled segment and grows by linking new segments as necessary. Choose this mailbox when you need an unbounded, fast MPSC queue with good cache locality and low allocation rates.

func (*UnboundedSegmentedMailbox) Dequeue

func (m *UnboundedSegmentedMailbox) Dequeue() *ReceiveContext

Dequeue removes and returns the next value at the head of the mailbox.

Semantics

  • Returns nil if the mailbox is empty.
  • Amortized O(1) for the single consumer: read from the current segment; when a segment is drained, advance to the next pooled segment.

Single‑consumer requirement

  • Must be called from exactly one goroutine. Multiple consumers are not supported and would violate internal invariants.

func (*UnboundedSegmentedMailbox) Dispose

func (m *UnboundedSegmentedMailbox) Dispose()

Dispose releases resources, if any. This mailbox does not spawn background goroutines; Dispose is currently a no‑op. Messages already enqueued remain in the queue until dequeued or garbage‑collected along with their segments.

func (*UnboundedSegmentedMailbox) Enqueue

func (m *UnboundedSegmentedMailbox) Enqueue(value *ReceiveContext) error

Enqueue places the given value in the mailbox.

Semantics

  • Never blocks; always returns nil.
  • Amortized O(1): usually a single atomic increment + a store into the current segment; when a segment fills, one producer links the next segment.

Concurrency & ordering

  • Safe for concurrent producers; each reserves a unique slot via atomic increment of the tail segment write index.
  • FIFO order is preserved across segment boundaries.

func (*UnboundedSegmentedMailbox) IsEmpty

func (m *UnboundedSegmentedMailbox) IsEmpty() bool

IsEmpty reports whether the mailbox currently has no messages.

It is an O(1) snapshot check. Under concurrency it is best‑effort and may briefly lag producers.

func (*UnboundedSegmentedMailbox) Len

func (m *UnboundedSegmentedMailbox) Len() int64

Len returns a best‑effort snapshot of the mailbox length.

The value is maintained as an atomic counter for observability and may be approximate under concurrency. Do not use it for flow‑control decisions where exactness is required.

type Unsubscribe

type Unsubscribe struct {
	// contains filtered or unexported fields
}

Unsubscribe is used to unsubscribe from a topic by an actor. The actor will receive an acknowledgement message when the unsubscription is successful.

func NewUnsubscribe

func NewUnsubscribe(topic string) *Unsubscribe

NewUnsubscribe creates a new Unsubscribe message for the given topic.

func (*Unsubscribe) Topic

func (u *Unsubscribe) Topic() string

Topic returns the topic to unsubscribe from.

type UnsubscribeAck

type UnsubscribeAck struct {
	// contains filtered or unexported fields
}

UnsubscribeAck is used to acknowledge a successful unsubscription from a topic by an actor.

func NewUnsubscribeAck

func NewUnsubscribeAck(topic string) *UnsubscribeAck

NewUnsubscribeAck creates a new UnsubscribeAck message for the given topic.

func (*UnsubscribeAck) Topic

func (u *UnsubscribeAck) Topic() string

Topic returns the topic that was unsubscribed from.
