graph

package
v0.0.0-...-7871f83 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2025 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

Package graph provides structured errors for the graph package.

Package graph provides a simplified API for building executable workflows.

Index

Constants

View Source
const (
	// DefaultMaxWorkers is the default number of parallel workers for executing
	// graph nodes in the Pregel runtime. This controls concurrency of node execution
	// within each superstep.
	//
	// Why 4? Matches typical CPU core count for development machines. Production
	// deployments should tune this based on workload characteristics:
	//   - CPU-bound tasks: set to runtime.NumCPU()
	//   - I/O-bound tasks (API calls): can be 10-100x higher
	//   - Memory-constrained: reduce based on per-node memory footprint
	DefaultMaxWorkers = 4

	// DefaultMaxSteps is the default maximum number of supersteps before the
	// graph execution terminates. This prevents infinite loops in cyclic graphs
	// or runaway agent behavior.
	//
	// Why 100? Sufficient for most agent workflows:
	//   - Simple ReAct agents: 5-20 steps typical
	//   - Complex multi-agent: 20-50 steps typical
	//   - Research/exploration: may need 100+ (use WithMaxSteps to override)
	// If exceeded, execution stops with ErrMaxIterationsExceeded.
	DefaultMaxSteps = 100

	// DefaultCheckpointInterval is the default interval for saving checkpoints
	// during graph execution. Value of 1 means checkpoint after every superstep.
	//
	// Why 1? Ensures maximum recoverability at the cost of checkpoint overhead.
	// For long-running graphs with expensive checkpointing, increase via
	// graph.WithCheckpointInterval() to reduce I/O overhead.
	DefaultCheckpointInterval = 1

	// DefaultResultChanSize buffers outputs to prevent backpressure when the yield
	// consumer is slower than the producer. This provides smoother execution flow
	// without blocking nodes. Typical agents produce <10 results/superstep.
	//
	// Why 100? Sized for ~10 supersteps worth of buffering (10 results/step * 10 steps).
	// Large enough to prevent blocking during brief consumer slowdowns, small enough
	// to avoid excessive memory usage (~8KB for typical output types).
	DefaultResultChanSize = 100
)
View Source
const (
	// DefaultRetryMaxAttempts is the default maximum number of retry attempts
	// for node execution. After this many failures, the error propagates.
	//
	// Why 3? Industry standard for transient failures (network timeouts, rate limits).
	// First attempt + 2 retries = 3 total attempts, sufficient for most transient issues.
	DefaultRetryMaxAttempts = 3

	// DefaultRetryDelay is the default base delay between retry attempts.
	// Combined with exponential backoff (2.0 multiplier), delays are:
	// 100ms -> 200ms -> 400ms -> ... (capped at DefaultRetryMaxDelay)
	DefaultRetryDelay = 100 * time.Millisecond

	// DefaultRetryMaxDelay is the maximum delay between retry attempts.
	// Prevents excessive wait times during extended outages.
	//
	// Why 5s? Balances patience for recovery with user experience.
	// Longer delays rarely help—if service is down >5s, manual intervention needed.
	DefaultRetryMaxDelay = 5 * time.Second

	// DefaultRetryMultiplier is the exponential backoff multiplier.
	// Each retry delay = previous delay * multiplier (until MaxDelay).
	//
	// Why 2.0? Standard exponential backoff. Quickly backs off to reduce
	// load on failing services while not being overly aggressive.
	DefaultRetryMultiplier = 2.0
)
View Source
const ApprovalsKey = "__approvals__"

ApprovalsKey is the reserved state key for storing node approvals.

View Source
const END = "__end__"

END is the terminal node constant.

View Source
const MessagesKeyName = "messages"

MessagesKeyName is the string name used for the messages state key. Use this constant when initializing state maps in tests or when accessing the raw state map.

Variables

View Source
var (
	ErrNoEntryPoint  = errors.New("graph: no entry point defined")
	ErrNodeNotFound  = errors.New("graph: node not found")
	ErrDuplicateNode = errors.New("graph: duplicate node name")
	ErrDuplicateKey  = errors.New("graph: duplicate key")
	ErrInvalidTarget = errors.New("graph: invalid target node")
)

Sentinel errors for graph validation.

View Source
var ErrEmptySequence = errors.New("graph: empty sequence")

ErrEmptySequence is returned when trying to get the last element of an empty sequence.

View Source
var ErrNamespaceViolation = fmt.Errorf("graph: namespace violation")

ErrNamespaceViolation is returned when a node attempts to access or update keys outside its namespace.

Functions

func Collect

func Collect[T any](seq iter.Seq2[T, error]) ([]T, error)

Collect gathers all values from an iterator sequence into a slice. Returns an error if the sequence produces an error.

Example:

results, err := graph.Collect(g.Run(ctx, input))
if err != nil {
    return fmt.Errorf("execution failed: %w", err)
}

func Get

func Get[T any](scope ReadOnlyScope, key Key[T]) T

Get returns the typed value for a key from the scope. If no value exists, returns the reducer's zero value.

func GetList

func GetList[T any](scope ReadOnlyScope, key Key[[]T]) []T

GetList returns the typed slice for a list key from the scope. If no value exists, returns nil.

func GetManaged

func GetManaged[T any](ctx context.Context, scope ReadOnlyScope, mv ManagedValueAccessor[T]) T

GetManaged retrieves a managed value from the view with type safety. The managed value must have been passed via WithManagedValues.

This follows the same pattern as Get(view, key) for regular state.

Example:

var configMV = graph.NewManagedValue("config", defaultConfig)

func myNode(ctx context.Context, view graph.ReadOnlyScope) (*graph.Command, error) {
    config := graph.GetManaged(ctx, view, configMV)
    // use config...
    return graph.Set(resultKey, result).End()
}

func GetMessages

func GetMessages(scope ReadOnlyScope) []message.Message

GetMessages retrieves the message history from a ReadOnlyScope. This is a convenience function - you can also use scope.Messages() directly.

func Last

func Last[T any](seq iter.Seq2[T, error]) (T, error)

Last returns the last value from an iterator sequence. Returns an error if the sequence produces an error or is empty.

ERROR HANDLING:

  • Returns immediately when iterator yields any error (err != nil)
  • Use this for blocking/non-streaming execution

Example:

lastMsg, err := graph.Last(g.Run(ctx, input))
if err != nil {
    return fmt.Errorf("execution failed: %w", err)
}

func LastMessage

func LastMessage(scope ReadOnlyScope) message.Message

LastMessage returns the last message from the history, or nil if empty.

func LastStructured

func LastStructured[T any, O fmt.Stringer](seq iter.Seq2[O, error]) (*T, error)

LastStructured extracts the last value from an iterator and unmarshals content into the specified type T. The output type O must implement fmt.Stringer. This is useful for extracting structured output from agent execution.

Example with agent:

type MovieReview struct {
    Title   string `json:"title"`
    Rating  int    `json:"rating"`
    Summary string `json:"summary"`
}

agent, _ := agent.NewReAct(model, agent.WithOutputSchema(outputSchema))
review, err := graph.LastStructured[MovieReview](agent.Run(ctx, messages))
if err != nil {
    return err
}
fmt.Printf("Rating: %d/5\n", review.Rating)

func ScopeGet

func ScopeGet[T any](scope Scope, key Key[T]) T

ScopeGet returns the typed value for a key from the scope. This is a convenience function that works with Scope.

func ScopeGetList

func ScopeGetList[T any](scope Scope, key Key[[]T]) []T

ScopeGetList returns the typed list for a slice key from the scope.

func SetValue

func SetValue[T any](key Key[T], value T) func(*CommandBuilder) *CommandBuilder

SetValue returns a type-safe update function for use with With(). The generic parameter T is inferred from Key[T], ensuring type safety.

Example:

cmd.With(graph.SetValue(statusKey, "done"))   // ✅ Type-safe
cmd.With(graph.SetValue(statusKey, 42))       // ❌ Compile error

func WithScope

func WithScope(ctx context.Context, scope Scope) context.Context

WithScope attaches a Scope to the context. This allows tools to access streaming capabilities.

Types

type AppendReducer

type AppendReducer[T any] struct{}

AppendReducer concatenates slices. This is the default reducer for list keys.

func (AppendReducer[T]) Reduce

func (AppendReducer[T]) Reduce(existing, incoming []T) []T

Reduce appends incoming slice to existing slice.

func (AppendReducer[T]) Zero

func (AppendReducer[T]) Zero() []T

Zero returns nil (empty slice).

type ApprovalDecision

type ApprovalDecision string

ApprovalDecision represents the approval outcome.

const (
	ApprovalApproved ApprovalDecision = "approved"
	ApprovalRejected ApprovalDecision = "rejected"
)

ApprovalDecision constants.

type ApprovalGuard

type ApprovalGuard func(ctx context.Context, scope ReadOnlyScope) (needsApproval bool, reason string, err error)

ApprovalGuard determines if approval is needed. Uses ReadOnlyScope (not Scope) since it doesn't need streaming access.

type ApprovalResponse

type ApprovalResponse struct {
	Decision    ApprovalDecision
	Reason      string
	User        string
	Timestamp   time.Time
	Edits       Updates
	Annotations map[string]any
}

ApprovalResponse represents a human approval decision.

type BSPState

type BSPState struct {
	// contains filtered or unexported fields
}

BSPState manages state with proper BSP semantics: - All reads within a superstep see the same snapshot (from previous superstep) - All writes are buffered and only become visible after barrier commit - This ensures deterministic parallel execution regardless of scheduling order

Optimizations: - Copy-on-write: readSnapshot only recreated when writes occur - Version tracking: avoid unnecessary snapshot copies - Reducer-based merging: uses registered reducers for state updates - Atomic version checking: skip locking in ReadView when possible

func NewBSPState

func NewBSPState(initial map[string]any, keyRegistry KeyRegistry) *BSPState

NewBSPState creates a new BSP-compliant state manager.

func (*BSPState) ApplyPendingWrites

func (s *BSPState) ApplyPendingWrites(pending []checkpoint.PendingWrite)

ApplyPendingWrites applies externally provided pending writes to committed state. Used when restoring from a checkpoint with Committed=false. The writes are applied directly to committed state (since the checkpoint was saved after node execution but before barrier commit).

func (*BSPState) CommitBarrier

func (s *BSPState) CommitBarrier()

CommitBarrier commits all buffered writes and creates a new read snapshot. This is called at the end of each superstep (barrier synchronization point). Optimized: only creates new snapshot if there were writes.

func (*BSPState) GetCommitted

func (s *BSPState) GetCommitted(key string) (any, bool)

GetCommitted returns a value from committed state. Used for extracting final output after execution completes.

func (*BSPState) HasPendingWrites

func (s *BSPState) HasPendingWrites() bool

HasPendingWrites returns true if there are uncommitted writes in the buffer.

func (*BSPState) PendingWrites

func (s *BSPState) PendingWrites() []checkpoint.PendingWrite

PendingWrites returns write buffer contents as checkpoint.PendingWrite slice. Used for two-phase commit checkpointing - captures writes before barrier commit.

func (*BSPState) ReadView

func (s *BSPState) ReadView() ReadOnlyScope

ReadView returns a ReadOnlyScope that reads from the current superstep's snapshot. This scope is safe for concurrent reads - it reads from immutable snapshot. Optimized: uses atomic pointer to cached view, avoiding lock acquisition when snapshot hasn't changed.

func (*BSPState) Snapshot

func (s *BSPState) Snapshot() map[string]any

Snapshot returns a copy of the committed state. Used for checkpointing and final result extraction.

func (*BSPState) Write

func (s *BSPState) Write(nodeName string, updates Updates)

Write buffers a state update for the current superstep. The update will only be visible after CommitBarrier is called. Thread-safe for concurrent writes from parallel nodes.

type BuildOption

type BuildOption func(*buildConfig)

BuildOption configures a Build call.

func WithStrictValidation

func WithStrictValidation() BuildOption

WithStrictValidation enables strict validation mode. This includes cycle detection and disconnected node detection.

func WithValidation

func WithValidation(opts ValidationOptions) BuildOption

WithValidation sets custom validation options.

func WithoutValidation

func WithoutValidation() BuildOption

WithoutValidation disables validation (use with caution). Only use this for trusted graphs or when validation overhead is unacceptable.

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder is a fluent workflow builder. Create with New(), add nodes, then Build() to get an executable Graph. Input is []Message (conversation history), output is Message (response).

func New

func New(keys ...StateKey) *Builder

New creates a graph builder with the given state keys. Keys are automatically registered. messagesKey is always included and used as the output key. Duplicate keys will cause Build() to fail.

By default, messagesKey is included. Additional keys can be provided.

func (*Builder) Build

func (b *Builder) Build(opts ...BuildOption) (*Graph, error)

Build compiles and validates the builder. Returns an executable Graph or an error.

func (*Builder) InterruptAfter

func (b *Builder) InterruptAfter(nodeName string, opts ...InterruptOption) *Builder

InterruptAfter adds an interrupt after the specified node. When execution completes this node, it will pause and yield an interrupt event.

func (*Builder) InterruptBefore

func (b *Builder) InterruptBefore(nodeName string, opts ...InterruptOption) *Builder

InterruptBefore adds an interrupt before the specified node. When execution reaches this node, it will pause and yield an interrupt event.

func (*Builder) Node

func (b *Builder) Node(name string, fn NodeFunc, targets ...string) *Builder

Node adds a node to the graph. Targets are the possible next nodes (use END for terminal).

func (*Builder) Start

func (b *Builder) Start(names ...string) *Builder

Start sets the entry point node(s). Multiple entry points run in parallel.

func (*Builder) Validate

func (b *Builder) Validate(opts ...ValidationOptions) []ValidationError

Validate performs validation on the graph and returns any errors found. This is useful for more detailed error reporting than Build() provides.

func (*Builder) WithCheckpointer

func (b *Builder) WithCheckpointer(cp checkpoint.Checkpointer, runID string) *Builder

WithCheckpointer sets the checkpointer and run ID.

func (*Builder) WithExecutor

func (b *Builder) WithExecutor(exec Executor) *Builder

WithExecutor sets a custom executor.

func (*Builder) WithNodeMiddleware

func (b *Builder) WithNodeMiddleware(mw ...NodeMiddleware) *Builder

WithNodeMiddleware adds node-level middleware to the builder. Node middleware wraps each node execution and runs for every node. For middleware that should wrap the entire Run/Resume operation, use WithRunMiddleware.

func (*Builder) WithRunMiddleware

func (b *Builder) WithRunMiddleware(mw ...RunMiddleware) *Builder

WithRunMiddleware adds run-level middleware to the builder. Run middleware wraps the entire Run/Resume operation, intercepting:

  • Input before execution starts
  • Output after execution completes

This is useful for:

  • Input validation/guardrails (check user input once at start)
  • Output validation/guardrails (check final output once at end)
  • Logging/observability at the run level
  • Request/response transformation

Middleware is applied in order: first added = outermost wrapper.

func (*Builder) WithStore

func (b *Builder) WithStore(store Store) *Builder

WithStore sets a custom state store.

type CheckpointConfig

type CheckpointConfig struct {
	// Checkpointer handles state persistence.
	Checkpointer checkpoint.Checkpointer

	// RunID identifies this execution run for checkpointing.
	RunID string
}

CheckpointConfig holds checkpointing configuration. This enables state persistence, fault tolerance, and resume capabilities.

type CheckpointStateError

type CheckpointStateError struct {
	UnknownKeys []string // Keys in checkpoint that are not registered in the graph
}

CheckpointStateError indicates that a checkpoint contains invalid state.

func (*CheckpointStateError) Error

func (e *CheckpointStateError) Error() string

type Command

type Command struct {
	Updates Updates  // State changes
	Next    []string // Next nodes to execute (or END)
}

Command is what a node returns: state updates and next targets.

DESIGN NOTE - High-Level Workflow Control: Command is a graph-layer abstraction that combines state updates (Updates) with routing decisions (Next). It provides workflow control semantics with BSP state management guarantees.

KEY CHARACTERISTICS:

  • Specifies both state updates AND routing targets
  • Graph-specific semantics (not generic like pregel.Message)
  • Always uses Updates (map[string]any) for state changes
  • Converted to pregel.Message[Updates] by executor adapter

RELATIONSHIP TO PREGEL LAYER: The executor adapter (pregelVertexAdapter.Run in executor.go) converts Command into pregel.Message[Updates] for BSP execution. The Command.Updates become Message.Data, and Command.Next becomes the Message.To field.

Create with:

  • To("next") - just routing
  • Set(key, val).To("next") - with updates
  • Fail(err) - error

Example - simple routing:

return graph.To("next")
return graph.To(graph.END)

Example - with one update:

return graph.Set(StatusKey, "done").To("next")

Example - with multiple updates:

return graph.Set(Key1, val1).
    With(graph.SetValue(Key2, val2)).
    To("next")

Example - conditional:

if done {
    return graph.To(graph.END)
}
return graph.Set(CountKey, count+1).To("process")

Example - error:

if err != nil {
    return graph.Fail(err)
}

func Fail

func Fail(err error) (*Command, error)

Fail returns an error without a command.

Example:

if err != nil {
    return graph.Fail(err)
}

func To

func To(targets ...string) (*Command, error)

To creates a Command that routes to the specified targets without updates.

Example:

return graph.To("next")
return graph.To(graph.END)
return graph.To("a", "b")  // parallel

type CommandBuilder

type CommandBuilder struct {
	// contains filtered or unexported fields
}

CommandBuilder accumulates updates before creating a Command.

func Cmd

func Cmd() *CommandBuilder

Cmd creates an empty CommandBuilder for incremental building. Use this when you need to build commands conditionally.

Example:

cmd := graph.Cmd()
if turn >= maxTurns {
    return cmd.To(graph.END)
}

resp, err := model.Generate(ctx, req)
if err != nil {
    return graph.Fail(err)
}

cmd.With(graph.SetValue(TurnKey, turn+1))
return graph.Reply(resp.Message).To("next")

func Reply

func Reply(msg message.Message) *CommandBuilder

Reply creates a CommandBuilder that appends a message to the conversation history. This is a convenience function for the most common agent operation: adding a response.

Example:

// Simple reply and end
return graph.Reply(graph.NewAIMessageFromText("Hello!")).End()

// Reply and continue to next node
return graph.Reply(aiMessage).To("tools")

// Reply with additional state updates
return graph.Reply(aiMessage).With(graph.SetValue(turnKey, turn+1)).To("next")

func ReplyAll

func ReplyAll(msgs ...message.Message) *CommandBuilder

ReplyAll creates a CommandBuilder that appends multiple messages to the conversation history. Useful when you need to add multiple messages at once (e.g., tool results).

Example:

toolResults := []graph.Message{result1, result2}
return graph.ReplyAll(toolResults...).To("agent")

func Set

func Set[T any](key Key[T], value T) *CommandBuilder

Set creates a CommandBuilder with a typed value (starter function). The generic parameter T is inferred from Key[T], ensuring type safety.

For list keys with AppendReducer, pass a slice containing the new items. The reducer will merge them with the existing list.

Example:

graph.Set(statusKey, "done").To("next")           // ✅ Type-safe
graph.Set(myListKey, []Item{item}).To("next")     // ✅ Appends via reducer

func (*CommandBuilder) End

func (b *CommandBuilder) End() (*Command, error)

End is shorthand for To(END).

Example:

return graph.Set(key, val).End()

func (*CommandBuilder) To

func (b *CommandBuilder) To(targets ...string) (*Command, error)

To creates a Command with the accumulated updates and specified targets.

Example:

return graph.Set(key, val).To("next")

func (*CommandBuilder) With

With applies a type-safe update function to the builder. Use with SetValue, AppendValue for compile-time type checking.

Example:

cmd.With(graph.SetValue(statusKey, "done"))   // ✅ Type-safe
cmd.With(graph.SetValue(statusKey, 42))       // ❌ Compile error

type DistributedBackend

type DistributedBackend interface {
	// Send delivers state updates to target nodes in a distributed execution.
	// This abstracts away the underlying message-passing mechanism.
	Send(ctx context.Context, updates []StateUpdate) error

	// Receive retrieves pending state updates for a specific node.
	// Returns nil if no updates are pending.
	Receive(ctx context.Context, node string) ([]StateUpdate, error)

	// Clear removes all pending updates for a node.
	Clear(ctx context.Context, node string) error

	// Close releases resources held by the backend.
	Close() error
}

DistributedBackend abstracts the underlying distributed execution mechanism. This interface hides Pregel implementation details from graph-layer users, allowing them to work with distributed execution without understanding supersteps, vertices, or message buses.

The graph package provides adapters to convert common backends (Redis, gRPC, etc.) into this interface, keeping the graph API clean and implementation-agnostic.

func NewPregelBackend

func NewPregelBackend(bus pregel.MessageBus[Updates]) DistributedBackend

NewPregelBackend creates a DistributedBackend from a Pregel MessageBus. This adapter allows existing Pregel message buses (Redis, in-memory, etc.) to work with the graph-layer API without exposing Pregel types to users.

type EdgeInfo

type EdgeInfo struct {
	From string `json:"from"`
	To   string `json:"to"`
}

EdgeInfo contains metadata about an edge in the graph.

type ExecutionConfig

type ExecutionConfig struct {
	// Nodes contains all graph nodes indexed by name.
	Nodes map[string]ExecutorNode

	// EntryPoints are the starting nodes for execution.
	EntryPoints []string

	// NodeMiddleware wraps node execution.
	// This runs for every node during graph execution.
	NodeMiddleware []NodeMiddleware

	// Store provides state storage.
	Store Store

	// KeyRegistry holds type-erased reducers for state merging.
	KeyRegistry KeyRegistry
}

ExecutionConfig holds node execution configuration. This includes the graph structure, entry points, middleware, and output settings. Types are fixed for Message-based agent workflows.

type Executor

type Executor interface {
	Run(ctx context.Context, cfg *ExecutorConfig, input []message.Message, opts ...runOption) iter.Seq2[message.Message, error]
}

Executor runs the graph.

type ExecutorConfig

type ExecutorConfig struct {
	// Execution contains node and execution settings.
	Execution ExecutionConfig

	// Checkpoint contains checkpointing settings.
	Checkpoint CheckpointConfig

	// Interrupt contains interrupt settings for human-in-the-loop workflows.
	Interrupt InterruptConfig
}

ExecutorConfig provides the executor with graph configuration. It composes focused configuration structs for better separation of concerns. Types are fixed for Message-based agent workflows.

type ExecutorNode

type ExecutorNode struct {
	Name    string
	Fn      NodeFunc
	Targets []string
}

ExecutorNode represents a node for the executor. Output type is fixed to Message.

type FirstReducer

type FirstReducer[T comparable] struct{}

FirstReducer keeps the earliest non-zero value.

func (FirstReducer[T]) Reduce

func (FirstReducer[T]) Reduce(existing, incoming T) T

Reduce returns existing if non-zero, otherwise incoming.

func (FirstReducer[T]) Zero

func (FirstReducer[T]) Zero() T

Zero returns the zero value of T.

type Graph

type Graph struct {
	// contains filtered or unexported fields
}

Graph is an executable workflow with immutable structure. Created by calling Build() on a Builder.

func (*Graph) GetMetrics

func (g *Graph) GetMetrics() *Metrics

GetMetrics returns static graph metrics.

func (*Graph) GetNodeInfo

func (g *Graph) GetNodeInfo(name string) (*NodeInfo, error)

GetNodeInfo returns detailed information about a specific node.

func (*Graph) GetNodes

func (g *Graph) GetNodes() []string

GetNodes returns a sorted list of all node names in the compiled graph.

func (*Graph) GetTopology

func (g *Graph) GetTopology() *Topology

GetTopology returns a comprehensive view of the compiled graph structure.

func (*Graph) MermaidFlowchart

func (g *Graph) MermaidFlowchart(direction string) string

MermaidFlowchart generates a Mermaid flowchart representation. Direction can be "TD" (top-down), "LR" (left-right), "BT", "RL".

func (*Graph) Resume

func (g *Graph) Resume(ctx context.Context, runID string, opts ...ResumeOption) iter.Seq2[message.Message, error]

Resume continues execution from a checkpoint without providing new input. This is the correct way to resume a paused graph - the checkpoint state is restored without being overwritten by a zero-value input.

Parameters:

  • runID: The run ID for checkpointing (required)
  • opts: Optional resume options for human-in-the-loop workflows

Example:

// Resume from the latest checkpoint (auto-restore)
for output, err := range compiled.Resume(ctx, runID) {
    // process output
}

// Resume from a specific checkpoint
savedCp, _ := checkpointer.Load(ctx, runID)
for output, err := range compiled.Resume(ctx, runID, graph.WithCheckpoint(savedCp)) {
    // process output
}

// Resume with human input (human-in-the-loop)
for output, err := range compiled.Resume(ctx, runID,
    graph.WithCheckpoint(savedCp),
    graph.WithResumeValue("wait_node", answerKey.Name(), "user input"),
) {
    // process output
}

func (*Graph) Run

func (g *Graph) Run(ctx context.Context, input []message.Message, opts ...RunOption) iter.Seq2[message.Message, error]

Run executes the compiled graph with input. For resuming from a checkpoint without providing new input, use [Resume] instead.

type InputMapper

type InputMapper func(ctx context.Context, scope ReadOnlyScope) ([]message.Message, error)

InputMapper maps parent graph state to subgraph input. Used with Subgraph to transform parent state into the input type expected by the child graph. Uses ReadOnlyScope (not Scope) since it's a read-only operation.

type InterruptConfig

type InterruptConfig struct {
	// Before maps node names to interrupt configs that trigger before node execution.
	Before map[string]*interruptConfig

	// After maps node names to interrupt configs that trigger after node execution.
	After map[string]*interruptConfig
}

InterruptConfig holds interrupt configuration for human-in-the-loop workflows. Interrupts pause execution to await human approval before or after specific nodes.

type InterruptError

type InterruptError struct {
	NodeName string
	Before   bool
}

InterruptError signals that execution has paused for approval.

func (*InterruptError) Error

func (e *InterruptError) Error() string

type InterruptOption

type InterruptOption func(*interruptConfig)

InterruptOption configures an interrupt.

func WithApprovalGuard

func WithApprovalGuard(guard ApprovalGuard) InterruptOption

WithApprovalGuard sets a guard function for the interrupt.

func WithFeedbackAnnotation

func WithFeedbackAnnotation(enabled bool) InterruptOption

WithFeedbackAnnotation enables recording approval in message history.

type Key

type Key[T any] struct {
	// contains filtered or unexported fields
}

Key defines a typed state channel with an associated reducer. The reducer determines how values are merged during state updates.

func NewCounterKey

func NewCounterKey(name string, opts ...KeyOption[int]) Key[int]

NewCounterKey creates a counter key with Sum semantics.

func NewKey

func NewKey[T any](name string, opts ...KeyOption[T]) Key[T]

NewKey creates a state key with Replace (overwrite) semantics by default.

func NewListKey

func NewListKey[T any](name string, opts ...KeyOption[[]T]) Key[[]T]

NewListKey creates a list state key with Append semantics by default. The key stores []T and appends incoming slices.

func NewMapKey

func NewMapKey[K comparable, V any](name string, opts ...KeyOption[map[K]V]) Key[map[K]V]

NewMapKey creates a map key with MergeMap semantics.

func (Key[T]) Name

func (k Key[T]) Name() string

Name returns the key name.

func (Key[T]) ReducerFunc

func (k Key[T]) ReducerFunc() ReducerFunc

ReducerFunc returns the type-erased reducer for runtime use.

func (Key[T]) Zero

func (k Key[T]) Zero() T

Zero returns the zero value for this key's type.

type KeyOption

type KeyOption[T any] func(*Key[T])

KeyOption configures a Key.

func WithReducer

func WithReducer[T any](r Reducer[T]) KeyOption[T]

WithReducer sets a custom reducer for the key.

type KeyRegistry

type KeyRegistry map[string]ReducerFunc

KeyRegistry holds type-erased reducers for all registered keys.

func NewKeyRegistry

func NewKeyRegistry() KeyRegistry

NewKeyRegistry creates a new empty KeyRegistry.

func (KeyRegistry) Register

func (r KeyRegistry) Register(name string, reducer ReducerFunc)

Register adds a reducer for a key name.

type LastReducer

type LastReducer[T any] = ReplaceReducer[T]

LastReducer is an alias for ReplaceReducer - keeps the most recent value.

type ManagedValue

type ManagedValue interface {
	// Name returns the unique identifier for this managed value.
	Name() string

	// Descriptor returns metadata stored in checkpoints for validation.
	Descriptor() checkpoint.ManagedValueDescriptor

	// Rehydrate refreshes the managed value after checkpoint restore. Default is
	// no-op. Implementations should be idempotent.
	Rehydrate(ctx context.Context) error
}

ManagedValue is the base interface for all managed values. This non-generic interface allows managed values to be passed to WithManagedValues.

Managed values represent ephemeral runtime state that is NOT included in checkpoints. Use managed values for:

  • Runtime configuration (API keys, timeouts, feature flags)
  • Session state (auth tokens, user sessions)
  • Metrics collectors (runtime statistics)
  • Resource handles (connections, caches)
  • Computed values (derived state)

Unlike regular state keys, managed values:

  • Are NOT persisted to checkpoints
  • Are lost on process restart
  • Must be reinitialized at runtime
  • Are perfect for ephemeral/sensitive data

type ManagedValueAccessor

type ManagedValueAccessor[T any] interface {
	ManagedValue

	// Get retrieves the current value.
	Get(ctx context.Context) (T, error)

	// Set updates the value (for mutable managed values).
	Set(ctx context.Context, value T) error
}

ManagedValueAccessor is the generic interface for type-safe access to managed values.

type ManagedValueError

type ManagedValueError struct {
	// MissingValues contains the names of the missing managed values.
	MissingValues []string
	// IsRequired indicates if these are required (vs checkpoint-related) values.
	IsRequired bool
}

ManagedValueError represents errors related to managed values. Use errors.As to extract the MissingValues field for programmatic handling.

func (*ManagedValueError) Error

func (e *ManagedValueError) Error() string

Error implements the error interface.

func (*ManagedValueError) Is

func (e *ManagedValueError) Is(target error) bool

Is enables comparison with sentinel errors.

type ManagedValueOption

type ManagedValueOption func(*managedValueConfig)

ManagedValueOption configures metadata or lifecycle hooks for a managed value.

func WithManagedValueRehydrator

func WithManagedValueRehydrator(fn func(context.Context) error) ManagedValueOption

WithManagedValueRehydrator registers a callback that rebuilds the managed value after checkpoint restores. The callback runs before execution resumes whenever the checkpoint lists this managed value. Use it to re-open network connections or refresh credentials.

func WithManagedValueRequired

func WithManagedValueRequired() ManagedValueOption

WithManagedValueRequired marks the managed value as required when resuming from checkpoints. Missing required values during restore will abort execution.

type ManagedValueProvider

type ManagedValueProvider[T any] struct {
	// contains filtered or unexported fields
}

ManagedValueProvider computes its value dynamically with optional caching. Use this for derived state, expensive computations, or values that need refresh.

func NewManagedValueProvider

func NewManagedValueProvider[T any](name string, provider func(ctx context.Context) (T, error), opts ...ManagedValueProviderOption) *ManagedValueProvider[T]

NewManagedValueProvider creates a managed value that computes its value dynamically. By default, the provider is called on every access. Use WithCacheTTL to enable caching.

Example:

// Always fresh (no caching)
currentTimeMV := graph.NewManagedValueProvider("current_time", func(ctx context.Context) (time.Time, error) {
    return time.Now(), nil
})

// With caching (refreshes every 5 seconds)
cachedTimeMV := graph.NewManagedValueProvider("cached_time", func(ctx context.Context) (time.Time, error) {
    return time.Now(), nil
}, graph.WithCacheTTL(5*time.Second))

func (*ManagedValueProvider[T]) Descriptor

Descriptor returns the checkpoint metadata for this managed value.

func (*ManagedValueProvider[T]) Get

func (p *ManagedValueProvider[T]) Get(ctx context.Context) (T, error)

Get retrieves the value, using cache if enabled and not expired.

func (*ManagedValueProvider[T]) Invalidate

func (p *ManagedValueProvider[T]) Invalidate()

Invalidate clears the cache, forcing a refresh on next Get. This is a no-op if caching is not enabled.

func (*ManagedValueProvider[T]) Name

func (p *ManagedValueProvider[T]) Name() string

Name returns the unique identifier for this managed value.

func (*ManagedValueProvider[T]) Rehydrate

func (p *ManagedValueProvider[T]) Rehydrate(ctx context.Context) error

Rehydrate executes the optional rehydration callback for provider values.

func (*ManagedValueProvider[T]) Set

func (p *ManagedValueProvider[T]) Set(_ context.Context, _ T) error

Set is a no-op for provider values (they are computed).

type ManagedValueProviderOption

type ManagedValueProviderOption func(*managedValueProviderConfig)

ManagedValueProviderOption configures a ManagedValueProvider.

func WithCacheTTL

func WithCacheTTL(ttl time.Duration) ManagedValueProviderOption

WithCacheTTL enables caching with the specified TTL. Without this option, the provider is called on every access.

Example:

// Cache for 5 minutes
mv := graph.NewManagedValueProvider("config", fetchConfig, graph.WithCacheTTL(5*time.Minute))

func WithProviderManagedValueOptions

func WithProviderManagedValueOptions(opts ...ManagedValueOption) ManagedValueProviderOption

WithProviderManagedValueOptions applies generic managed value options when constructing a ManagedValueProvider (e.g., mark as required, register rehydrators).

type ManagedValueRegistry

type ManagedValueRegistry struct {
	// contains filtered or unexported fields
}

ManagedValueRegistry holds managed values for a graph execution (internal).

func NewManagedValueRegistry

func NewManagedValueRegistry() *ManagedValueRegistry

NewManagedValueRegistry creates a new registry for managed values.

type MaxReducer

type MaxReducer[T cmp.Ordered] struct{}

MaxReducer keeps the larger value.

func (MaxReducer[T]) Reduce

func (MaxReducer[T]) Reduce(existing, incoming T) T

Reduce returns the maximum of existing and incoming.

func (MaxReducer[T]) Zero

func (MaxReducer[T]) Zero() T

Zero returns the zero value of T.

type MergeMapReducer

type MergeMapReducer[K comparable, V any] struct{}

MergeMapReducer unions two maps; later keys overwrite earlier.

func (MergeMapReducer[K, V]) Reduce

func (MergeMapReducer[K, V]) Reduce(existing, incoming map[K]V) map[K]V

Reduce merges incoming map into existing map.

func (MergeMapReducer[K, V]) Zero

func (MergeMapReducer[K, V]) Zero() map[K]V

Zero returns nil (empty map).

type Metrics

type Metrics struct {
	TotalNodes           int            `json:"total_nodes"`
	TotalEdges           int            `json:"total_edges"`
	AverageFanOut        float64        `json:"average_fan_out"`
	MaxFanOut            int            `json:"max_fan_out"`
	AverageFanIn         float64        `json:"average_fan_in"`
	MaxFanIn             int            `json:"max_fan_in"`
	CyclomaticComplexity int            `json:"cyclomatic_complexity"`
	NodesByType          map[string]int `json:"nodes_by_type"`
}

Metrics provides static graph metrics.

type MinReducer

type MinReducer[T cmp.Ordered] struct{}

MinReducer keeps the smaller value.

func (MinReducer[T]) Reduce

func (MinReducer[T]) Reduce(existing, incoming T) T

Reduce returns the minimum of existing and incoming.

func (MinReducer[T]) Zero

func (MinReducer[T]) Zero() T

Zero returns the zero value of T.

type Namespace

type Namespace struct {
	// contains filtered or unexported fields
}

Namespace represents a state namespace for isolation.

func NewNamespace

func NewNamespace(name string) Namespace

NewNamespace creates a namespace with the given name.

func (Namespace) Name

func (ns Namespace) Name() string

Name returns the namespace name.

func (Namespace) Prefix

func (ns Namespace) Prefix(key string) string

Prefix returns a key with the namespace prefix.

type NodeFunc

type NodeFunc func(ctx context.Context, scope Scope) (*Command, error)

NodeFunc is the typed signature for node logic. Output type is fixed to Message for agent workflows. Read state via Scope, optionally stream partial outputs, return a Command.

Example:

func myNode(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
    messages := scope.Messages()
    scope.Stream(partialMessage)  // Stream partial output
    return graph.Reply(finalMessage).End()
}

func Compose

func Compose(fn NodeFunc, wrappers ...func(NodeFunc) NodeFunc) NodeFunc

Compose combines multiple wrappers around a NodeFunc. Wrappers are applied right-to-left (last wrapper runs first).

Example:

g.Node("fetch", graph.Compose(
    fetchNode,
    func(fn graph.NodeFunc) graph.NodeFunc { return graph.WithRetry(fn, policy) },
    func(fn graph.NodeFunc) graph.NodeFunc { return graph.WithNamespace(fn, ns, false) },
), "next")

This is equivalent to: WithRetry(WithNamespace(fetchNode, ns, false), policy)

func Subgraph

func Subgraph(
	sub *Graph,
	inputMapper InputMapper,
	outputMapper OutputMapper,
) NodeFunc

Subgraph creates a NodeFunc that executes a compiled subgraph. Use with Node() to add a subgraph while maintaining the fluent builder pattern:

child, _ := graph.New(...).Build()

parent.Node("validate", graph.Subgraph(child,
    func(ctx context.Context, view graph.ReadOnlyScope) ([]graph.Message, error) {
        return graph.GetMessages(view), nil
    },
    func(ctx context.Context, out graph.Message) (graph.Updates, error) {
        return graph.Updates{graph.MessagesKeyName: []graph.Message{out}}, nil
    },
), "next")

The InputMapper transforms parent state into subgraph input. The OutputMapper transforms subgraph output into parent state updates.

func WithNamespace

func WithNamespace(fn NodeFunc, ns Namespace, includeGlobal bool) NodeFunc

WithNamespace wraps a NodeFunc to filter state access by namespace. The wrapped function only sees keys from the specified namespace. If includeGlobal is true, global (non-namespaced) keys are also visible.

Example:

agentNS := graph.NewNamespace("agent1")
g.Node("agent1", graph.WithNamespace(agentNode, agentNS, false), "next")

func WithRetry

func WithRetry(fn NodeFunc, policy *RetryPolicy) NodeFunc

WithRetry wraps a NodeFunc with retry logic. On failure, retries according to the policy with exponential backoff.

Example:

g.Node("fetch", graph.WithRetry(fetchNode, graph.DefaultRetryPolicy()), "process")

type NodeInfo

type NodeInfo struct {
	Name            string   `json:"name"`
	Type            string   `json:"type"` // "standard", "entry", "terminal"
	IncomingEdges   int      `json:"incoming_edges"`
	OutgoingEdges   int      `json:"outgoing_edges"`
	DeclaredTargets []string `json:"declared_targets,omitempty"`
	IsEntryPoint    bool     `json:"is_entry_point"`
	HasInterrupt    bool     `json:"has_interrupt"`
}

NodeInfo contains metadata about a node in the graph.

type NodeMiddleware

type NodeMiddleware func(next NodeFunc) NodeFunc

NodeMiddleware wraps node execution. This runs for every node during graph execution. Output type is fixed to Message for agent workflows.

func ChainNodeMiddleware

func ChainNodeMiddleware(middleware ...NodeMiddleware) NodeMiddleware

ChainNodeMiddleware combines multiple node middleware into one. Middleware are applied in order, so the first middleware is the outermost layer.

Example:

import graphmw "github.com/hupe1980/agentmesh/pkg/graph/middleware"

combined := graph.ChainNodeMiddleware(
    graphmw.LoggingMiddleware(logger),
    graphmw.TimingMiddleware(metricsCallback),
    graphmw.RecoveryMiddleware(panicHandler),
)
graph.WithNodeMiddleware(combined)

This produces: logging(timing(recovery(node))) Execution flows: logging → timing → recovery → node

type OutputMapper

type OutputMapper func(ctx context.Context, output message.Message) (Updates, error)

OutputMapper maps subgraph output to parent graph state updates. Used with Subgraph to transform child graph output into state updates for the parent.

type PregelExecutor

type PregelExecutor struct {
	// contains filtered or unexported fields
}

PregelExecutor executes graphs using the Pregel BSP runtime.

func NewPregelExecutor

func NewPregelExecutor() *PregelExecutor

NewPregelExecutor creates a new Pregel executor with default settings. Uses DefaultMaxWorkers (4) and DefaultMaxSteps (100).

func (*PregelExecutor) Run

func (e *PregelExecutor) Run(ctx context.Context, cfg *ExecutorConfig, input []message.Message, opts ...runOption) iter.Seq2[message.Message, error]

Run executes the graph using the Pregel BSP runtime.

func (*PregelExecutor) WithBackend

func (e *PregelExecutor) WithBackend(backend DistributedBackend) *PregelExecutor

WithBackend sets a custom distributed backend for multi-node execution. The backend abstracts the underlying message-passing mechanism, allowing the graph to run across multiple machines without exposing Pregel concepts.

Use NewPregelBackend() to adapt existing Pregel MessageBus implementations.

func (*PregelExecutor) WithMaxSteps

func (e *PregelExecutor) WithMaxSteps(maxSteps int) *PregelExecutor

WithMaxSteps sets the maximum number of execution steps (iterations).

func (*PregelExecutor) WithMaxWorkers

func (e *PregelExecutor) WithMaxWorkers(n int) *PregelExecutor

WithMaxWorkers sets the maximum number of parallel workers.

type PrependReducer

type PrependReducer[T any] struct{}

PrependReducer inserts incoming slice at the front of existing slice.

func (PrependReducer[T]) Reduce

func (PrependReducer[T]) Reduce(existing, incoming []T) []T

Reduce prepends incoming slice before existing slice.

func (PrependReducer[T]) Zero

func (PrependReducer[T]) Zero() []T

Zero returns nil (empty slice).

type ReadOnlyScope

type ReadOnlyScope interface {
	// NodeName returns the name of the currently executing node.
	// Returns empty string if not in a node execution context.
	NodeName() string

	// GetValue returns the raw value for a key name.
	GetValue(name string) (any, bool)

	// Messages returns the current conversation history.
	// This is the primary way to access the message list.
	Messages() []message.Message

	// LastMessage returns the most recent message in the conversation history.
	// Returns nil if there are no messages.
	LastMessage() message.Message

	// ManagedValues returns the managed values registry, or nil if not configured.
	ManagedValues() *ManagedValueRegistry

	// ToMap returns regular state values as a map for template rendering.
	// Only includes checkpointed state values, NOT managed values.
	// Managed values (ephemeral runtime state) are excluded for safety.
	ToMap() map[string]any
}

ReadOnlyScope provides read-only access to state and execution context. This is the base interface that Scope embeds.

type Reducer

type Reducer[T any] interface {
	// Zero returns the identity element for T (used when no prior value exists).
	Zero() T

	// Reduce merges an incoming value into the existing state.
	Reduce(existing, incoming T) T
}

Reducer specifies how values are combined for a state key. Implementations must be deterministic and ideally commutative for parallel writes.

type ReducerFunc

type ReducerFunc struct {
	// ZeroFn returns the zero value for this reducer.
	ZeroFn func() any

	// ReduceFn merges incoming into existing and returns the result.
	ReduceFn func(existing, incoming any) any
}

ReducerFunc is a type-erased reducer function for runtime use. It wraps the generic Reducer[T] for storage in maps and runtime dispatch.

func WrapReducer

func WrapReducer[T any](r Reducer[T]) ReducerFunc

WrapReducer creates a type-erased ReducerFunc from a generic Reducer[T]. This is used at graph build time to store reducers in a registry.

type RehydrateError

type RehydrateError struct {
	// Name is the name of the managed value that failed to rehydrate.
	Name string
	// Cause is the underlying error.
	Cause error
}

RehydrateError represents an error that occurred while rehydrating a managed value. Use errors.As to extract the Name and Cause fields for programmatic handling.

func (*RehydrateError) Error

func (e *RehydrateError) Error() string

Error implements the error interface.

func (*RehydrateError) Is

func (e *RehydrateError) Is(target error) bool

Is enables comparison with sentinel errors.

func (*RehydrateError) Unwrap

func (e *RehydrateError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap.

type ReplaceReducer

type ReplaceReducer[T any] struct{}

ReplaceReducer always returns the incoming value (last-write-wins). This is the default reducer for scalar keys.

func (ReplaceReducer[T]) Reduce

func (ReplaceReducer[T]) Reduce(_, incoming T) T

Reduce returns the incoming value, discarding the existing value.

func (ReplaceReducer[T]) Zero

func (ReplaceReducer[T]) Zero() T

Zero returns the zero value of T.

type ResumeOption

type ResumeOption interface {
	// contains filtered or unexported methods
}

ResumeOption is an option that can be passed to Resume().

func WithApproval

func WithApproval(nodeName string, approval *ApprovalResponse) ResumeOption

WithApproval provides an approval response for a node interrupt. This is a Resume-only option - approvals are provided when continuing after an interrupt.

func WithCheckpoint

func WithCheckpoint(cp *checkpoint.Checkpoint) ResumeOption

WithCheckpoint resumes from a specific checkpoint instead of auto-restoring from the latest checkpoint. This is a Resume-only option.

Example:

savedCp, _ := checkpointer.Load(ctx, runID)
compiled.Resume(ctx, runID, graph.WithCheckpoint(savedCp))

func WithResumeValue

func WithResumeValue(nodeName string, key string, value any) ResumeOption

WithResumeValue is a convenience function that sets a state value and auto-approves a node for simple human-input scenarios. This is a Resume-only option.

Example:

compiled.Resume(ctx, runID,
    graph.WithResumeValue("wait_for_answer", answerKey.Name(), "Paris"),
)

func WithStateUpdates

func WithStateUpdates(updates map[string]any) ResumeOption

WithStateUpdates applies state updates when resuming. This is a Resume-only option for human-in-the-loop workflows.

Example:

savedCp, _ := checkpointer.Load(ctx, runID)
compiled.Resume(ctx, savedCp, runID,
    graph.WithStateUpdates(map[string]any{"answer": "Paris"}),
    graph.WithApproval("wait_node", approval),
)

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts int
	Delay       time.Duration
	MaxDelay    time.Duration
	Multiplier  float64
	Retryable   func(error) bool // Optional: determines if error should trigger retry
}

RetryPolicy configures automatic retry behavior for node execution.

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy. Uses DefaultRetryMaxAttempts (3), DefaultRetryDelay (100ms), DefaultRetryMaxDelay (5s), and DefaultRetryMultiplier (2.0).

type RetryPolicyBuilder

type RetryPolicyBuilder struct {
	// contains filtered or unexported fields
}

RetryPolicyBuilder provides a fluent API for constructing retry policies.

Example:

policy := graph.NewRetryPolicyBuilder().
    WithMaxAttempts(5).
    WithExponentialBackoff(time.Second, 2.0).
    Build()

func NewRetryPolicyBuilder

func NewRetryPolicyBuilder() *RetryPolicyBuilder

NewRetryPolicyBuilder creates a new retry policy builder with sensible defaults:

  • MaxAttempts: DefaultRetryMaxAttempts (3)
  • Delay: DefaultRetryDelay (100ms base delay)
  • MaxDelay: DefaultRetryMaxDelay (5s)
  • Multiplier: DefaultRetryMultiplier (2.0 exponential backoff)

func (*RetryPolicyBuilder) Build

func (b *RetryPolicyBuilder) Build() *RetryPolicy

Build creates the RetryPolicy.

func (*RetryPolicyBuilder) WithConstantBackoff

func (b *RetryPolicyBuilder) WithConstantBackoff(delay time.Duration) *RetryPolicyBuilder

WithConstantBackoff configures constant delay between retries.

Example:

WithConstantBackoff(time.Second) // 1s, 1s, 1s, ...

func (*RetryPolicyBuilder) WithExponentialBackoff

func (b *RetryPolicyBuilder) WithExponentialBackoff(delay time.Duration, multiplier float64) *RetryPolicyBuilder

WithExponentialBackoff configures exponential backoff. Wait time = delay * (multiplier ^ attempt).

Example:

WithExponentialBackoff(time.Second, 2.0) // 1s, 2s, 4s, 8s, ...

func (*RetryPolicyBuilder) WithLinearBackoff

func (b *RetryPolicyBuilder) WithLinearBackoff(delay time.Duration) *RetryPolicyBuilder

WithLinearBackoff configures linear backoff. Wait time = delay * attempt.

Example:

WithLinearBackoff(time.Second) // 1s, 2s, 3s, 4s, ...

func (*RetryPolicyBuilder) WithMaxAttempts

func (b *RetryPolicyBuilder) WithMaxAttempts(n int) *RetryPolicyBuilder

WithMaxAttempts sets the maximum number of execution attempts.

func (*RetryPolicyBuilder) WithMaxDelay

func (b *RetryPolicyBuilder) WithMaxDelay(d time.Duration) *RetryPolicyBuilder

WithMaxDelay sets the maximum delay between retries.

func (*RetryPolicyBuilder) WithRetryableFunc

func (b *RetryPolicyBuilder) WithRetryableFunc(fn func(error) bool) *RetryPolicyBuilder

WithRetryableFunc sets a function to determine if an error should trigger retry. If not set, all errors are considered retryable.

type RunFunc

type RunFunc func(ctx context.Context, input []message.Message) iter.Seq2[message.Message, error]

RunFunc is the function signature for graph execution. Input is []message.Message (conversation history), output is Message (response).

type RunMiddleware

type RunMiddleware func(next RunFunc) RunFunc

RunMiddleware wraps the entire graph execution (Run/Resume). Unlike node middleware which runs for every node, run middleware intercepts the input before execution starts and the final output after. This is useful for:

  • Input validation/guardrails (check user input once at start)
  • Output validation/guardrails (check final output once at end)
  • Logging/observability at the run level
  • Request/response transformation

func ChainRunMiddleware

func ChainRunMiddleware(middleware ...RunMiddleware) RunMiddleware

ChainRunMiddleware combines multiple run middleware into one. Middleware are applied in order, so the first middleware is the outermost layer.

Example:

combined := graph.ChainRunMiddleware(
    inputValidationMiddleware,
    outputValidationMiddleware,
    loggingMiddleware,
)
graph.WithRunMiddleware(combined)

This produces: inputValidation(outputValidation(logging(run))) Execution flows: inputValidation → outputValidation → logging → run

type RunOption

type RunOption interface {
	// contains filtered or unexported methods
}

RunOption is an option that can be passed to Run().

func WithInitialValue

func WithInitialValue[T any](key Key[T], value T) RunOption

WithInitialValue sets an initial state value when starting graph execution. This is a Run-only option.

Example:

compiled.Run(ctx, messages,
    graph.WithInitialValue(agent.SessionIDKey, "session-123"),
)

func WithRunID

func WithRunID(id string) RunOption

WithRunID sets the run ID for checkpointing when starting a new execution. This is a Run-only option.

Example:

compiled.Run(ctx, input, graph.WithRunID("workflow-123"))

type RuntimeMetrics

type RuntimeMetrics struct {

	// CurrentSuperstep tracks the current iteration (superstep) number.
	// In Pregel BSP model, each superstep represents one round of computation.
	CurrentSuperstep int64

	// CompletedNodes lists node names that have finished execution.
	CompletedNodes []string

	// PausedNodes lists node names that are currently paused (e.g., waiting for human input).
	PausedNodes []string

	// ResumingNodes lists node names that are being resumed from a paused state.
	// These nodes should skip interrupt checks.
	ResumingNodes []string

	// ActiveNodes lists node names currently being executed.
	ActiveNodes []string

	// FailedNodes tracks nodes that encountered errors.
	FailedNodes []string

	// TotalMessages counts total messages sent between nodes.
	TotalMessages int64

	// ExecutionTimeNs tracks total execution time in nanoseconds.
	ExecutionTimeNs int64
	// contains filtered or unexported fields
}

RuntimeMetrics tracks execution metrics for a running or completed graph. Thread-safe for concurrent access during execution.

func NewRuntimeMetrics

func NewRuntimeMetrics() *RuntimeMetrics

NewRuntimeMetrics creates a new runtime metrics tracker.

func (*RuntimeMetrics) AddActive

func (rm *RuntimeMetrics) AddActive(nodeName string)

AddActive marks a node as actively executing.

func (*RuntimeMetrics) AddCompleted

func (rm *RuntimeMetrics) AddCompleted(nodeName string)

AddCompleted marks a node as completed.

func (*RuntimeMetrics) AddFailed

func (rm *RuntimeMetrics) AddFailed(nodeName string)

AddFailed marks a node as failed.

func (*RuntimeMetrics) AddMessage

func (rm *RuntimeMetrics) AddMessage()

AddMessage increments the message counter.

func (*RuntimeMetrics) AddMessages

func (rm *RuntimeMetrics) AddMessages(n int64)

AddMessages increments the message counter by n.

func (*RuntimeMetrics) AddPaused

func (rm *RuntimeMetrics) AddPaused(nodeName string)

AddPaused marks a node as paused.

func (*RuntimeMetrics) ClearResuming

func (rm *RuntimeMetrics) ClearResuming(nodeName string)

ClearResuming removes a node from the resuming list (after it executes).

func (*RuntimeMetrics) GetSuperstep

func (rm *RuntimeMetrics) GetSuperstep() int64

GetSuperstep returns the current superstep number.

func (*RuntimeMetrics) IsResuming

func (rm *RuntimeMetrics) IsResuming(nodeName string) bool

IsResuming checks if a node is currently being resumed.

func (*RuntimeMetrics) Reset

func (rm *RuntimeMetrics) Reset()

Reset clears all metrics to initial state.

func (*RuntimeMetrics) ResumePaused

func (rm *RuntimeMetrics) ResumePaused(nodeName string)

ResumePaused removes a node from the paused list and marks it as resuming.

func (*RuntimeMetrics) SetExecutionTime

func (rm *RuntimeMetrics) SetExecutionTime(ns int64)

SetExecutionTime sets the total execution time.

func (*RuntimeMetrics) SetSuperstep

func (rm *RuntimeMetrics) SetSuperstep(step int64)

SetSuperstep updates the current superstep number.

func (*RuntimeMetrics) Snapshot

func (rm *RuntimeMetrics) Snapshot() RuntimeMetricsSnapshot

Snapshot creates a read-only snapshot of the current metrics.

type RuntimeMetricsSnapshot

type RuntimeMetricsSnapshot struct {
	CurrentSuperstep int64
	CompletedNodes   []string
	PausedNodes      []string
	ResumingNodes    []string
	ActiveNodes      []string
	FailedNodes      []string
	TotalMessages    int64
	ExecutionTimeNs  int64
}

RuntimeMetricsSnapshot is a read-only snapshot of runtime metrics.

type Scope

type Scope interface {
	ReadOnlyScope

	// Stream emits a Message immediately to the graph's output iterator.
	// Use for partial/streaming results during node execution.
	Stream(value message.Message)
}

Scope provides the execution context for a node. It embeds ReadOnlyScope for state access and adds typed output streaming. Output type is fixed to Message for agent workflows.

func GetScope

func GetScope(ctx context.Context) Scope

GetScope retrieves the Scope from context. Returns nil if scope is not available. This is primarily used by tools that need streaming access.

Example usage in a tool:

func (t *MyTool) Run(ctx context.Context, input string) (string, error) {
    if scope := graph.GetScope(ctx); scope != nil {
        scope.Stream(graph.NewAIMessageFromText("progress..."))
    }
    return result, nil
}

type SharedOption

type SharedOption func(*runConfig)

SharedOption implements both RunOption and ResumeOption interfaces. This allows common options (like WithMaxConcurrency) to work with both Run() and Resume() without explicit conversion.

func WithCheckpointInterval

func WithCheckpointInterval(interval int) SharedOption

WithCheckpointInterval sets how often checkpoints are saved. This option works with both Run and Resume.

func WithFailOnCheckpointError

func WithFailOnCheckpointError(fail bool) SharedOption

WithFailOnCheckpointError configures checkpoint error handling. This option works with both Run and Resume.

func WithManagedValues

func WithManagedValues(values ...ManagedValue) SharedOption

WithManagedValues attaches ephemeral runtime values to the graph execution. This option works with both Run and Resume.

Example:

compiled.Run(ctx, input, graph.WithManagedValues(apiKeyMV))
compiled.Resume(ctx, runID, graph.WithManagedValues(apiKeyMV))

func WithMaxConcurrency

func WithMaxConcurrency(n int) SharedOption

WithMaxConcurrency sets the maximum number of nodes that can execute in parallel. This option works with both Run and Resume.

func WithMaxIterations

func WithMaxIterations(n int) SharedOption

WithMaxIterations sets the maximum number of supersteps before stopping. This option works with both Run and Resume.

type SkipZeroReducer

type SkipZeroReducer[T comparable, R Reducer[T]] struct {
	Inner R
}

SkipZeroReducer wraps a reducer to skip zero-value inputs. This preserves the existing value when the incoming value is the zero value.

func NewSkipZeroReducer

func NewSkipZeroReducer[T comparable, R Reducer[T]](inner R) SkipZeroReducer[T, R]

NewSkipZeroReducer creates a SkipZeroReducer wrapper around a reducer.

func (SkipZeroReducer[T, R]) Reduce

func (r SkipZeroReducer[T, R]) Reduce(existing, incoming T) T

Reduce skips the incoming value if it's the zero value.

func (SkipZeroReducer[T, R]) Zero

func (r SkipZeroReducer[T, R]) Zero() T

Zero delegates to the inner reducer.

type StateKey

type StateKey interface {
	// Name returns the unique name of this key.
	Name() string

	// ReducerFunc returns the type-erased reducer for this key.
	ReducerFunc() ReducerFunc
	// contains filtered or unexported methods
}

StateKey is the interface that all state keys must implement.

type StateUpdate

type StateUpdate struct {
	From string  // Source node
	To   string  // Target node
	Data Updates // State changes
}

StateUpdate represents a state change being sent between nodes. This is the graph-layer equivalent of a Pregel message.

type StaticManagedValue

type StaticManagedValue[T any] struct {
	// contains filtered or unexported fields
}

StaticManagedValue provides thread-safe storage for ephemeral values. Values are stored in memory and NOT checkpointed.

func NewManagedValue

func NewManagedValue[T any](name string, value T, opts ...ManagedValueOption) *StaticManagedValue[T]

NewManagedValue creates a managed value with a static value.

Example:

configMV := graph.NewManagedValue("runtime_config", &Config{APIKey: "sk_live_..."})
timeoutMV := graph.NewManagedValue("timeout", 30*time.Second)

func (*StaticManagedValue[T]) Descriptor

Descriptor returns the checkpoint metadata for this managed value.

func (*StaticManagedValue[T]) Get

func (m *StaticManagedValue[T]) Get(_ context.Context) (T, error)

Get retrieves the current value.

func (*StaticManagedValue[T]) Name

func (m *StaticManagedValue[T]) Name() string

Name returns the unique identifier for this managed value.

func (*StaticManagedValue[T]) Rehydrate

func (m *StaticManagedValue[T]) Rehydrate(ctx context.Context) error

Rehydrate executes the optional rehydration callback registered via options.

func (*StaticManagedValue[T]) Set

func (m *StaticManagedValue[T]) Set(_ context.Context, value T) error

Set updates the stored value.

type Store

type Store interface {
	Get(ctx context.Context, key string) (any, error)
	Set(ctx context.Context, key string, value any) error
	Delete(ctx context.Context, key string) error
}

Store interface for state persistence.

type SumReducer

type SumReducer[T ~int | ~int64 | ~float64] struct{}

SumReducer adds numeric values.

func (SumReducer[T]) Reduce

func (SumReducer[T]) Reduce(existing, incoming T) T

Reduce adds the incoming value to the existing value.

func (SumReducer[T]) Zero

func (SumReducer[T]) Zero() T

Zero returns 0.

type Topology

type Topology struct {
	Nodes       []NodeInfo `json:"nodes"`
	Edges       []EdgeInfo `json:"edges"`
	EntryPoints []string   `json:"entry_points"`
	ExitPoints  []string   `json:"exit_points"` // Nodes that can route to END
}

Topology provides a complete view of the graph structure.

type Updates

type Updates map[string]any

Updates is a map of state changes.

type ValidationError

type ValidationError struct {
	Type    ValidationErrorType
	Node    string
	Message string
}

ValidationError represents a graph validation error.

func (ValidationError) Error

func (e ValidationError) Error() string

Error implements the error interface.

type ValidationErrorType

type ValidationErrorType string

ValidationErrorType classifies validation errors.

const (
	ErrorTypeCycle            ValidationErrorType = "CYCLE"
	ErrorTypeDisconnected     ValidationErrorType = "DISCONNECTED"
	ErrorTypeDuplicateKey     ValidationErrorType = "DUPLICATE_KEY"
	ErrorTypeInvalidEntryNode ValidationErrorType = "INVALID_ENTRY_NODE"
	ErrorTypeInvalidEndNode   ValidationErrorType = "INVALID_END_NODE"
	ErrorTypeMissingNode      ValidationErrorType = "MISSING_NODE"
	ErrorTypeInvalidBranch    ValidationErrorType = "INVALID_BRANCH"
	ErrorTypeInvalidEdge      ValidationErrorType = "INVALID_EDGE"
	ErrorTypeDuplicateNode    ValidationErrorType = "DUPLICATE_NODE"
)

Validation error type constants.

type ValidationLevel

type ValidationLevel int

ValidationLevel determines the strictness of validation.

const (
	// ValidationLevelNone skips all validation.
	ValidationLevelNone ValidationLevel = iota
	// ValidationLevelBasic performs basic structural validation.
	ValidationLevelBasic
	// ValidationLevelStrict performs comprehensive validation including cycle detection.
	ValidationLevelStrict
)

type ValidationOptions

type ValidationOptions struct {
	Level                  ValidationLevel
	AllowCycles            bool
	AllowDisconnectedNodes bool
}

ValidationOptions configures graph validation behavior.

func DefaultValidationOptions

func DefaultValidationOptions() ValidationOptions

DefaultValidationOptions returns the default validation configuration.

func StrictValidationOptions

func StrictValidationOptions() ValidationOptions

StrictValidationOptions returns strict validation configuration.

Directories

Path Synopsis
Package middleware provides reusable middleware for graph node execution.
Package middleware provides reusable middleware for graph node execution.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL