Documentation
¶
Overview ¶
Package graph provides a simplified API for building executable workflows, along with the structured error types used by that API.
Index ¶
- Constants
- Variables
- func Collect[T any](seq iter.Seq2[T, error]) ([]T, error)
- func Get[T any](scope ReadOnlyScope, key Key[T]) T
- func GetList[T any](scope ReadOnlyScope, key Key[[]T]) []T
- func GetManaged[T any](ctx context.Context, scope ReadOnlyScope, mv ManagedValueAccessor[T]) T
- func GetMessages(scope ReadOnlyScope) []message.Message
- func Last[T any](seq iter.Seq2[T, error]) (T, error)
- func LastMessage(scope ReadOnlyScope) message.Message
- func LastStructured[T any, O fmt.Stringer](seq iter.Seq2[O, error]) (*T, error)
- func ScopeGet[T any](scope Scope, key Key[T]) T
- func ScopeGetList[T any](scope Scope, key Key[[]T]) []T
- func SetValue[T any](key Key[T], value T) func(*CommandBuilder) *CommandBuilder
- func WithScope(ctx context.Context, scope Scope) context.Context
- type AppendReducer
- type ApprovalDecision
- type ApprovalGuard
- type ApprovalResponse
- type BSPState
- func (s *BSPState) ApplyPendingWrites(pending []checkpoint.PendingWrite)
- func (s *BSPState) CommitBarrier()
- func (s *BSPState) GetCommitted(key string) (any, bool)
- func (s *BSPState) HasPendingWrites() bool
- func (s *BSPState) PendingWrites() []checkpoint.PendingWrite
- func (s *BSPState) ReadView() ReadOnlyScope
- func (s *BSPState) Snapshot() map[string]any
- func (s *BSPState) Write(nodeName string, updates Updates)
- type BuildOption
- type Builder
- func (b *Builder) Build(opts ...BuildOption) (*Graph, error)
- func (b *Builder) InterruptAfter(nodeName string, opts ...InterruptOption) *Builder
- func (b *Builder) InterruptBefore(nodeName string, opts ...InterruptOption) *Builder
- func (b *Builder) Node(name string, fn NodeFunc, targets ...string) *Builder
- func (b *Builder) Start(names ...string) *Builder
- func (b *Builder) Validate(opts ...ValidationOptions) []ValidationError
- func (b *Builder) WithCheckpointer(cp checkpoint.Checkpointer, runID string) *Builder
- func (b *Builder) WithExecutor(exec Executor) *Builder
- func (b *Builder) WithNodeMiddleware(mw ...NodeMiddleware) *Builder
- func (b *Builder) WithRunMiddleware(mw ...RunMiddleware) *Builder
- func (b *Builder) WithStore(store Store) *Builder
- type CheckpointConfig
- type CheckpointStateError
- type Command
- type CommandBuilder
- type DistributedBackend
- type EdgeInfo
- type ExecutionConfig
- type Executor
- type ExecutorConfig
- type ExecutorNode
- type FirstReducer
- type Graph
- func (g *Graph) GetMetrics() *Metrics
- func (g *Graph) GetNodeInfo(name string) (*NodeInfo, error)
- func (g *Graph) GetNodes() []string
- func (g *Graph) GetTopology() *Topology
- func (g *Graph) MermaidFlowchart(direction string) string
- func (g *Graph) Resume(ctx context.Context, runID string, opts ...ResumeOption) iter.Seq2[message.Message, error]
- func (g *Graph) Run(ctx context.Context, input []message.Message, opts ...RunOption) iter.Seq2[message.Message, error]
- type InputMapper
- type InterruptConfig
- type InterruptError
- type InterruptOption
- type Key
- type KeyOption
- type KeyRegistry
- type LastReducer
- type ManagedValue
- type ManagedValueAccessor
- type ManagedValueError
- type ManagedValueOption
- type ManagedValueProvider
- func (p *ManagedValueProvider[T]) Descriptor() checkpoint.ManagedValueDescriptor
- func (p *ManagedValueProvider[T]) Get(ctx context.Context) (T, error)
- func (p *ManagedValueProvider[T]) Invalidate()
- func (p *ManagedValueProvider[T]) Name() string
- func (p *ManagedValueProvider[T]) Rehydrate(ctx context.Context) error
- func (p *ManagedValueProvider[T]) Set(_ context.Context, _ T) error
- type ManagedValueProviderOption
- type ManagedValueRegistry
- type MaxReducer
- type MergeMapReducer
- type Metrics
- type MinReducer
- type Namespace
- type NodeFunc
- type NodeInfo
- type NodeMiddleware
- type OutputMapper
- type PregelExecutor
- func (e *PregelExecutor) Run(ctx context.Context, cfg *ExecutorConfig, input []message.Message, ...) iter.Seq2[message.Message, error]
- func (e *PregelExecutor) WithBackend(backend DistributedBackend) *PregelExecutor
- func (e *PregelExecutor) WithMaxSteps(maxSteps int) *PregelExecutor
- func (e *PregelExecutor) WithMaxWorkers(n int) *PregelExecutor
- type PrependReducer
- type ReadOnlyScope
- type Reducer
- type ReducerFunc
- type RehydrateError
- type ReplaceReducer
- type ResumeOption
- type RetryPolicy
- type RetryPolicyBuilder
- func (b *RetryPolicyBuilder) Build() *RetryPolicy
- func (b *RetryPolicyBuilder) WithConstantBackoff(delay time.Duration) *RetryPolicyBuilder
- func (b *RetryPolicyBuilder) WithExponentialBackoff(delay time.Duration, multiplier float64) *RetryPolicyBuilder
- func (b *RetryPolicyBuilder) WithLinearBackoff(delay time.Duration) *RetryPolicyBuilder
- func (b *RetryPolicyBuilder) WithMaxAttempts(n int) *RetryPolicyBuilder
- func (b *RetryPolicyBuilder) WithMaxDelay(d time.Duration) *RetryPolicyBuilder
- func (b *RetryPolicyBuilder) WithRetryableFunc(fn func(error) bool) *RetryPolicyBuilder
- type RunFunc
- type RunMiddleware
- type RunOption
- type RuntimeMetrics
- func (rm *RuntimeMetrics) AddActive(nodeName string)
- func (rm *RuntimeMetrics) AddCompleted(nodeName string)
- func (rm *RuntimeMetrics) AddFailed(nodeName string)
- func (rm *RuntimeMetrics) AddMessage()
- func (rm *RuntimeMetrics) AddMessages(n int64)
- func (rm *RuntimeMetrics) AddPaused(nodeName string)
- func (rm *RuntimeMetrics) ClearResuming(nodeName string)
- func (rm *RuntimeMetrics) GetSuperstep() int64
- func (rm *RuntimeMetrics) IsResuming(nodeName string) bool
- func (rm *RuntimeMetrics) Reset()
- func (rm *RuntimeMetrics) ResumePaused(nodeName string)
- func (rm *RuntimeMetrics) SetExecutionTime(ns int64)
- func (rm *RuntimeMetrics) SetSuperstep(step int64)
- func (rm *RuntimeMetrics) Snapshot() RuntimeMetricsSnapshot
- type RuntimeMetricsSnapshot
- type Scope
- type SharedOption
- type SkipZeroReducer
- type StateKey
- type StateUpdate
- type StaticManagedValue
- func (m *StaticManagedValue[T]) Descriptor() checkpoint.ManagedValueDescriptor
- func (m *StaticManagedValue[T]) Get(_ context.Context) (T, error)
- func (m *StaticManagedValue[T]) Name() string
- func (m *StaticManagedValue[T]) Rehydrate(ctx context.Context) error
- func (m *StaticManagedValue[T]) Set(_ context.Context, value T) error
- type Store
- type SumReducer
- type Topology
- type Updates
- type ValidationError
- type ValidationErrorType
- type ValidationLevel
- type ValidationOptions
Constants ¶
const (
// DefaultMaxWorkers is the default number of parallel workers for executing
// graph nodes in the Pregel runtime. This controls concurrency of node execution
// within each superstep.
//
// Why 4? Matches typical CPU core count for development machines. Production
// deployments should tune this based on workload characteristics:
// - CPU-bound tasks: set to runtime.NumCPU()
// - I/O-bound tasks (API calls): can be 10-100x higher
// - Memory-constrained: reduce based on per-node memory footprint
DefaultMaxWorkers = 4
// DefaultMaxSteps is the default maximum number of supersteps before the
// graph execution terminates. This prevents infinite loops in cyclic graphs
// or runaway agent behavior.
//
// Why 100? Sufficient for most agent workflows:
// - Simple ReAct agents: 5-20 steps typical
// - Complex multi-agent: 20-50 steps typical
// - Research/exploration: may need 100+ (use WithMaxSteps to override)
// If exceeded, execution stops with ErrMaxIterationsExceeded.
DefaultMaxSteps = 100
// DefaultCheckpointInterval is the default interval for saving checkpoints
// during graph execution. Value of 1 means checkpoint after every superstep.
//
// Why 1? Ensures maximum recoverability at the cost of checkpoint overhead.
// For long-running graphs with expensive checkpointing, increase via
// graph.WithCheckpointInterval() to reduce I/O overhead.
DefaultCheckpointInterval = 1
// DefaultResultChanSize buffers outputs to prevent backpressure when the yield
// consumer is slower than the producer. This provides smoother execution flow
// without blocking nodes. Typical agents produce <10 results/superstep.
//
// Why 100? Sized for ~10 supersteps worth of buffering (10 results/step * 10 steps).
// Large enough to prevent blocking during brief consumer slowdowns, small enough
// to avoid excessive memory usage (~8KB for typical output types).
DefaultResultChanSize = 100
)
const (
// DefaultRetryMaxAttempts is the default maximum number of retry attempts
// for node execution. After this many failures, the error propagates.
//
// Why 3? Industry standard for transient failures (network timeouts, rate limits).
// First attempt + 2 retries = 3 total attempts, sufficient for most transient issues.
DefaultRetryMaxAttempts = 3
// DefaultRetryDelay is the default base delay between retry attempts.
// Combined with exponential backoff (2.0 multiplier), delays are:
// 100ms -> 200ms -> 400ms -> ... (capped at DefaultRetryMaxDelay)
DefaultRetryDelay = 100 * time.Millisecond
// DefaultRetryMaxDelay is the maximum delay between retry attempts.
// Prevents excessive wait times during extended outages.
//
// Why 5s? Balances patience for recovery with user experience.
// Longer delays rarely help—if service is down >5s, manual intervention needed.
DefaultRetryMaxDelay = 5 * time.Second
// DefaultRetryMultiplier is the exponential backoff multiplier.
// Each retry delay = previous delay * multiplier (until MaxDelay).
//
// Why 2.0? Standard exponential backoff. Quickly backs off to reduce
// load on failing services while not being overly aggressive.
DefaultRetryMultiplier = 2.0
)
const ApprovalsKey = "__approvals__"
ApprovalsKey is the reserved state key for storing node approvals.
const END = "__end__"
END is the terminal node constant.
const MessagesKeyName = "messages"
MessagesKeyName is the string name used for the messages state key. Use this constant when initializing state maps in tests or when accessing the raw state map.
Variables ¶
var (
ErrNoEntryPoint = errors.New("graph: no entry point defined")
ErrNodeNotFound = errors.New("graph: node not found")
ErrDuplicateNode = errors.New("graph: duplicate node name")
ErrDuplicateKey = errors.New("graph: duplicate key")
ErrInvalidTarget = errors.New("graph: invalid target node")
)
Sentinel errors for graph validation.
var ErrEmptySequence = errors.New("graph: empty sequence")
ErrEmptySequence is returned when trying to get the last element of an empty sequence.
var ErrNamespaceViolation = fmt.Errorf("graph: namespace violation")
ErrNamespaceViolation is returned when a node attempts to access or update keys outside its namespace.
Functions ¶
func Collect ¶
Collect gathers all values from an iterator sequence into a slice. Returns an error if the sequence produces an error.
Example:
results, err := graph.Collect(g.Run(ctx, input))
if err != nil {
return fmt.Errorf("execution failed: %w", err)
}
func Get ¶
func Get[T any](scope ReadOnlyScope, key Key[T]) T
Get returns the typed value for a key from the scope. If no value exists, returns the reducer's zero value.
func GetList ¶
func GetList[T any](scope ReadOnlyScope, key Key[[]T]) []T
GetList returns the typed slice for a list key from the scope. If no value exists, returns nil.
func GetManaged ¶
func GetManaged[T any](ctx context.Context, scope ReadOnlyScope, mv ManagedValueAccessor[T]) T
GetManaged retrieves a managed value from the scope with type safety. The managed value must have been passed via WithManagedValues.
This follows the same pattern as Get(scope, key) for regular state.
Example:
var configMV = graph.NewManagedValue("config", defaultConfig)
func myNode(ctx context.Context, view graph.ReadOnlyScope) (*graph.Command, error) {
config := graph.GetManaged(ctx, view, configMV)
// use config...
return graph.Set(resultKey, result).End()
}
func GetMessages ¶
func GetMessages(scope ReadOnlyScope) []message.Message
GetMessages retrieves the message history from a ReadOnlyScope. This is a convenience function - you can also use scope.Messages() directly.
func Last ¶
Last returns the last value from an iterator sequence. Returns an error if the sequence produces an error or is empty.
ERROR HANDLING:
- Returns immediately when iterator yields any error (err != nil)
- Use this for blocking/non-streaming execution
Example:
lastMsg, err := graph.Last(g.Run(ctx, input))
if err != nil {
return fmt.Errorf("execution failed: %w", err)
}
func LastMessage ¶
func LastMessage(scope ReadOnlyScope) message.Message
LastMessage returns the last message from the history, or nil if empty.
func LastStructured ¶
LastStructured extracts the last value from an iterator and unmarshals content into the specified type T. The output type O must implement fmt.Stringer. This is useful for extracting structured output from agent execution.
Example with agent:
type MovieReview struct {
Title string `json:"title"`
Rating int `json:"rating"`
Summary string `json:"summary"`
}
agent, _ := agent.NewReAct(model, agent.WithOutputSchema(outputSchema))
review, err := graph.LastStructured[MovieReview](agent.Run(ctx, messages))
if err != nil {
return err
}
fmt.Printf("Rating: %d/5\n", review.Rating)
func ScopeGet ¶
ScopeGet returns the typed value for a key from the scope. This is a convenience function that works with Scope.
func ScopeGetList ¶
ScopeGetList returns the typed list for a slice key from the scope.
func SetValue ¶
func SetValue[T any](key Key[T], value T) func(*CommandBuilder) *CommandBuilder
SetValue returns a type-safe update function for use with With(). The generic parameter T is inferred from Key[T], ensuring type safety.
Example:
cmd.With(graph.SetValue(statusKey, "done")) // ✅ Type-safe
cmd.With(graph.SetValue(statusKey, 42)) // ❌ Compile error
Types ¶
type AppendReducer ¶
type AppendReducer[T any] struct{}
AppendReducer concatenates slices. This is the default reducer for list keys.
func (AppendReducer[T]) Reduce ¶
func (AppendReducer[T]) Reduce(existing, incoming []T) []T
Reduce appends incoming slice to existing slice.
type ApprovalDecision ¶
type ApprovalDecision string
ApprovalDecision represents the approval outcome.
const (
ApprovalApproved ApprovalDecision = "approved"
ApprovalRejected ApprovalDecision = "rejected"
)
ApprovalDecision constants.
type ApprovalGuard ¶
type ApprovalGuard func(ctx context.Context, scope ReadOnlyScope) (needsApproval bool, reason string, err error)
ApprovalGuard determines if approval is needed. Uses ReadOnlyScope (not Scope) since it doesn't need streaming access.
type ApprovalResponse ¶
type ApprovalResponse struct {
Decision ApprovalDecision
Reason string
User string
Timestamp time.Time
Edits Updates
Annotations map[string]any
}
ApprovalResponse represents a human approval decision.
type BSPState ¶
type BSPState struct {
// contains filtered or unexported fields
}
BSPState manages state with proper BSP (Bulk Synchronous Parallel) semantics:
- All reads within a superstep see the same snapshot (from the previous superstep)
- All writes are buffered and only become visible after the barrier commit
- This ensures deterministic parallel execution regardless of scheduling order
Optimizations:
- Copy-on-write: readSnapshot only recreated when writes occur
- Version tracking: avoid unnecessary snapshot copies
- Reducer-based merging: uses registered reducers for state updates
- Atomic version checking: skip locking in ReadView when possible
func NewBSPState ¶
func NewBSPState(initial map[string]any, keyRegistry KeyRegistry) *BSPState
NewBSPState creates a new BSP-compliant state manager.
func (*BSPState) ApplyPendingWrites ¶
func (s *BSPState) ApplyPendingWrites(pending []checkpoint.PendingWrite)
ApplyPendingWrites applies externally provided pending writes to committed state. Used when restoring from a checkpoint with Committed=false. The writes are applied directly to committed state (since the checkpoint was saved after node execution but before barrier commit).
func (*BSPState) CommitBarrier ¶
func (s *BSPState) CommitBarrier()
CommitBarrier commits all buffered writes and creates a new read snapshot. This is called at the end of each superstep (barrier synchronization point). Optimized: only creates new snapshot if there were writes.
func (*BSPState) GetCommitted ¶
GetCommitted returns a value from committed state. Used for extracting final output after execution completes.
func (*BSPState) HasPendingWrites ¶
HasPendingWrites returns true if there are uncommitted writes in the buffer.
func (*BSPState) PendingWrites ¶
func (s *BSPState) PendingWrites() []checkpoint.PendingWrite
PendingWrites returns write buffer contents as checkpoint.PendingWrite slice. Used for two-phase commit checkpointing - captures writes before barrier commit.
func (*BSPState) ReadView ¶
func (s *BSPState) ReadView() ReadOnlyScope
ReadView returns a ReadOnlyScope that reads from the current superstep's snapshot. This scope is safe for concurrent reads - it reads from immutable snapshot. Optimized: uses atomic pointer to cached view, avoiding lock acquisition when snapshot hasn't changed.
type BuildOption ¶
type BuildOption func(*buildConfig)
BuildOption configures a Build call.
func WithStrictValidation ¶
func WithStrictValidation() BuildOption
WithStrictValidation enables strict validation mode. This includes cycle detection and disconnected node detection.
func WithValidation ¶
func WithValidation(opts ValidationOptions) BuildOption
WithValidation sets custom validation options.
func WithoutValidation ¶
func WithoutValidation() BuildOption
WithoutValidation disables validation (use with caution). Only use this for trusted graphs or when validation overhead is unacceptable.
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder is a fluent workflow builder. Create with New(), add nodes, then Build() to get an executable Graph. Input is []Message (conversation history), output is Message (response).
func New ¶
New creates a graph builder with the given state keys. Keys are automatically registered. messagesKey is always included and used as the output key. Duplicate keys will cause Build() to fail.
Because messagesKey is always included, it does not need to be passed explicitly; only additional keys have to be provided.
func (*Builder) Build ¶
func (b *Builder) Build(opts ...BuildOption) (*Graph, error)
Build compiles and validates the builder. Returns an executable Graph or an error.
func (*Builder) InterruptAfter ¶
func (b *Builder) InterruptAfter(nodeName string, opts ...InterruptOption) *Builder
InterruptAfter adds an interrupt after the specified node. When execution completes this node, it will pause and yield an interrupt event.
func (*Builder) InterruptBefore ¶
func (b *Builder) InterruptBefore(nodeName string, opts ...InterruptOption) *Builder
InterruptBefore adds an interrupt before the specified node. When execution reaches this node, it will pause and yield an interrupt event.
func (*Builder) Node ¶
Node adds a node to the graph. Targets are the possible next nodes (use END for terminal).
func (*Builder) Validate ¶
func (b *Builder) Validate(opts ...ValidationOptions) []ValidationError
Validate performs validation on the graph and returns any errors found. This is useful for more detailed error reporting than Build() provides.
func (*Builder) WithCheckpointer ¶
func (b *Builder) WithCheckpointer(cp checkpoint.Checkpointer, runID string) *Builder
WithCheckpointer sets the checkpointer and run ID.
func (*Builder) WithExecutor ¶
WithExecutor sets a custom executor.
func (*Builder) WithNodeMiddleware ¶
func (b *Builder) WithNodeMiddleware(mw ...NodeMiddleware) *Builder
WithNodeMiddleware adds node-level middleware to the builder. Node middleware wraps each node execution and runs for every node. For middleware that should wrap the entire Run/Resume operation, use WithRunMiddleware.
func (*Builder) WithRunMiddleware ¶
func (b *Builder) WithRunMiddleware(mw ...RunMiddleware) *Builder
WithRunMiddleware adds run-level middleware to the builder. Run middleware wraps the entire Run/Resume operation, intercepting:
- Input before execution starts
- Output after execution completes
This is useful for:
- Input validation/guardrails (check user input once at start)
- Output validation/guardrails (check final output once at end)
- Logging/observability at the run level
- Request/response transformation
Middleware is applied in order: first added = outermost wrapper.
type CheckpointConfig ¶
type CheckpointConfig struct {
// Checkpointer handles state persistence.
Checkpointer checkpoint.Checkpointer
// RunID identifies this execution run for checkpointing.
RunID string
}
CheckpointConfig holds checkpointing configuration. This enables state persistence, fault tolerance, and resume capabilities.
type CheckpointStateError ¶
type CheckpointStateError struct {
UnknownKeys []string // Keys in checkpoint that are not registered in the graph
}
CheckpointStateError indicates that a checkpoint contains invalid state.
func (*CheckpointStateError) Error ¶
func (e *CheckpointStateError) Error() string
type Command ¶
type Command struct {
Updates Updates // State changes
Next []string // Next nodes to execute (or END)
}
Command is what a node returns: state updates and next targets.
DESIGN NOTE - High-Level Workflow Control: Command is a graph-layer abstraction that combines state updates (Updates) with routing decisions (Next). It provides workflow control semantics with BSP state management guarantees.
KEY CHARACTERISTICS:
- Specifies both state updates AND routing targets
- Graph-specific semantics (not generic like pregel.Message)
- Always uses Updates (map[string]any) for state changes
- Converted to pregel.Message[Updates] by executor adapter
RELATIONSHIP TO PREGEL LAYER: The executor adapter (pregelVertexAdapter.Run in executor.go) converts Command into pregel.Message[Updates] for BSP execution. The Command.Updates become Message.Data, and Command.Next becomes the Message.To field.
Create with:
- To("next") - just routing
- Set(key, val).To("next") - with updates
- Fail(err) - error
Example - simple routing:
return graph.To("next")
return graph.To(graph.END)
Example - with one update:
return graph.Set(StatusKey, "done").To("next")
Example - with multiple updates:
return graph.Set(Key1, val1).
With(graph.SetValue(Key2, val2)).
To("next")
Example - conditional:
if done {
return graph.To(graph.END)
}
return graph.Set(CountKey, count+1).To("process")
Example - error:
if err != nil {
return graph.Fail(err)
}
type CommandBuilder ¶
type CommandBuilder struct {
// contains filtered or unexported fields
}
CommandBuilder accumulates updates before creating a Command.
func Cmd ¶
func Cmd() *CommandBuilder
Cmd creates an empty CommandBuilder for incremental building. Use this when you need to build commands conditionally.
Example:
cmd := graph.Cmd()
if turn >= maxTurns {
return cmd.To(graph.END)
}
resp, err := model.Generate(ctx, req)
if err != nil {
return graph.Fail(err)
}
cmd.With(graph.SetValue(TurnKey, turn+1))
return graph.Reply(resp.Message).To("next")
func Reply ¶
func Reply(msg message.Message) *CommandBuilder
Reply creates a CommandBuilder that appends a message to the conversation history. This is a convenience function for the most common agent operation: adding a response.
Example:
// Simple reply and end
return graph.Reply(graph.NewAIMessageFromText("Hello!")).End()
// Reply and continue to next node
return graph.Reply(aiMessage).To("tools")
// Reply with additional state updates
return graph.Reply(aiMessage).With(graph.SetValue(turnKey, turn+1)).To("next")
func ReplyAll ¶
func ReplyAll(msgs ...message.Message) *CommandBuilder
ReplyAll creates a CommandBuilder that appends multiple messages to the conversation history. Useful when you need to add multiple messages at once (e.g., tool results).
Example:
toolResults := []graph.Message{result1, result2}
return graph.ReplyAll(toolResults...).To("agent")
func Set ¶
func Set[T any](key Key[T], value T) *CommandBuilder
Set creates a CommandBuilder with a typed value (starter function). The generic parameter T is inferred from Key[T], ensuring type safety.
For list keys with AppendReducer, pass a slice containing the new items. The reducer will merge them with the existing list.
Example:
graph.Set(statusKey, "done").To("next") // ✅ Type-safe
graph.Set(myListKey, []Item{item}).To("next") // ✅ Appends via reducer
func (*CommandBuilder) End ¶
func (b *CommandBuilder) End() (*Command, error)
End is shorthand for To(END).
Example:
return graph.Set(key, val).End()
func (*CommandBuilder) To ¶
func (b *CommandBuilder) To(targets ...string) (*Command, error)
To creates a Command with the accumulated updates and specified targets.
Example:
return graph.Set(key, val).To("next")
func (*CommandBuilder) With ¶
func (b *CommandBuilder) With(fn func(*CommandBuilder) *CommandBuilder) *CommandBuilder
With applies a type-safe update function to the builder. Use with SetValue, AppendValue for compile-time type checking.
Example:
cmd.With(graph.SetValue(statusKey, "done")) // ✅ Type-safe
cmd.With(graph.SetValue(statusKey, 42)) // ❌ Compile error
type DistributedBackend ¶
type DistributedBackend interface {
// Send delivers state updates to target nodes in a distributed execution.
// This abstracts away the underlying message-passing mechanism.
Send(ctx context.Context, updates []StateUpdate) error
// Receive retrieves pending state updates for a specific node.
// Returns nil if no updates are pending.
Receive(ctx context.Context, node string) ([]StateUpdate, error)
// Clear removes all pending updates for a node.
Clear(ctx context.Context, node string) error
// Close releases resources held by the backend.
Close() error
}
DistributedBackend abstracts the underlying distributed execution mechanism. This interface hides Pregel implementation details from graph-layer users, allowing them to work with distributed execution without understanding supersteps, vertices, or message buses.
The graph package provides adapters to convert common backends (Redis, gRPC, etc.) into this interface, keeping the graph API clean and implementation-agnostic.
func NewPregelBackend ¶
func NewPregelBackend(bus pregel.MessageBus[Updates]) DistributedBackend
NewPregelBackend creates a DistributedBackend from a Pregel MessageBus. This adapter allows existing Pregel message buses (Redis, in-memory, etc.) to work with the graph-layer API without exposing Pregel types to users.
type ExecutionConfig ¶
type ExecutionConfig struct {
// Nodes contains all graph nodes indexed by name.
Nodes map[string]ExecutorNode
// EntryPoints are the starting nodes for execution.
EntryPoints []string
// NodeMiddleware wraps node execution.
// This runs for every node during graph execution.
NodeMiddleware []NodeMiddleware
// Store provides state storage.
Store Store
// KeyRegistry holds type-erased reducers for state merging.
KeyRegistry KeyRegistry
}
ExecutionConfig holds node execution configuration. This includes the graph structure (nodes), entry points, node middleware, the state store, and the key registry used for state merging. Types are fixed for Message-based agent workflows.
type Executor ¶
type Executor interface {
Run(ctx context.Context, cfg *ExecutorConfig, input []message.Message, opts ...runOption) iter.Seq2[message.Message, error]
}
Executor runs the graph.
type ExecutorConfig ¶
type ExecutorConfig struct {
// Execution contains node and execution settings.
Execution ExecutionConfig
// Checkpoint contains checkpointing settings.
Checkpoint CheckpointConfig
// Interrupt contains interrupt settings for human-in-the-loop workflows.
Interrupt InterruptConfig
}
ExecutorConfig provides the executor with graph configuration. It composes focused configuration structs for better separation of concerns. Types are fixed for Message-based agent workflows.
type ExecutorNode ¶
ExecutorNode represents a node for the executor. Output type is fixed to Message.
type FirstReducer ¶
type FirstReducer[T comparable] struct{}
FirstReducer keeps the earliest non-zero value.
func (FirstReducer[T]) Reduce ¶
func (FirstReducer[T]) Reduce(existing, incoming T) T
Reduce returns existing if non-zero, otherwise incoming.
type Graph ¶
type Graph struct {
// contains filtered or unexported fields
}
Graph is an executable workflow with immutable structure. Created by calling Build() on a Builder.
func (*Graph) GetMetrics ¶
GetMetrics returns static graph metrics.
func (*Graph) GetNodeInfo ¶
GetNodeInfo returns detailed information about a specific node.
func (*Graph) GetTopology ¶
GetTopology returns a comprehensive view of the compiled graph structure.
func (*Graph) MermaidFlowchart ¶
MermaidFlowchart generates a Mermaid flowchart representation. Direction can be "TD" (top-down), "LR" (left-right), "BT", "RL".
func (*Graph) Resume ¶
func (g *Graph) Resume(ctx context.Context, runID string, opts ...ResumeOption) iter.Seq2[message.Message, error]
Resume continues execution from a checkpoint without providing new input. This is the correct way to resume a paused graph - the checkpoint state is restored without being overwritten by a zero-value input.
Parameters:
- runID: The run ID for checkpointing (required)
- opts: Optional resume options for human-in-the-loop workflows
Example:
// Resume from the latest checkpoint (auto-restore)
for output, err := range compiled.Resume(ctx, runID) {
// process output
}
// Resume from a specific checkpoint
savedCp, _ := checkpointer.Load(ctx, runID)
for output, err := range compiled.Resume(ctx, runID, graph.WithCheckpoint(savedCp)) {
// process output
}
// Resume with human input (human-in-the-loop)
for output, err := range compiled.Resume(ctx, runID,
graph.WithCheckpoint(savedCp),
graph.WithResumeValue("wait_node", answerKey.Name(), "user input"),
) {
// process output
}
type InputMapper ¶
InputMapper maps parent graph state to subgraph input. Used with Subgraph to transform parent state into the input type expected by the child graph. Uses ReadOnlyScope (not Scope) since it's a read-only operation.
type InterruptConfig ¶
type InterruptConfig struct {
// Before maps node names to interrupt configs that trigger before node execution.
Before map[string]*interruptConfig
// After maps node names to interrupt configs that trigger after node execution.
After map[string]*interruptConfig
}
InterruptConfig holds interrupt configuration for human-in-the-loop workflows. Interrupts pause execution to await human approval before or after specific nodes.
type InterruptError ¶
InterruptError signals that execution has paused for approval.
func (*InterruptError) Error ¶
func (e *InterruptError) Error() string
type InterruptOption ¶
type InterruptOption func(*interruptConfig)
InterruptOption configures an interrupt.
func WithApprovalGuard ¶
func WithApprovalGuard(guard ApprovalGuard) InterruptOption
WithApprovalGuard sets a guard function for the interrupt.
func WithFeedbackAnnotation ¶
func WithFeedbackAnnotation(enabled bool) InterruptOption
WithFeedbackAnnotation enables recording approval in message history.
type Key ¶
type Key[T any] struct {
// contains filtered or unexported fields
}
Key defines a typed state channel with an associated reducer. The reducer determines how values are merged during state updates.
func NewCounterKey ¶
NewCounterKey creates a counter key with Sum semantics.
func NewListKey ¶
NewListKey creates a list state key with Append semantics by default. The key stores []T and appends incoming slices.
func NewMapKey ¶
func NewMapKey[K comparable, V any](name string, opts ...KeyOption[map[K]V]) Key[map[K]V]
NewMapKey creates a map key with MergeMap semantics.
func (Key[T]) ReducerFunc ¶
func (k Key[T]) ReducerFunc() ReducerFunc
ReducerFunc returns the type-erased reducer for runtime use.
type KeyOption ¶
KeyOption configures a Key.
func WithReducer ¶
WithReducer sets a custom reducer for the key.
type KeyRegistry ¶
type KeyRegistry map[string]ReducerFunc
KeyRegistry holds type-erased reducers for all registered keys.
func NewKeyRegistry ¶
func NewKeyRegistry() KeyRegistry
NewKeyRegistry creates a new empty KeyRegistry.
func (KeyRegistry) Register ¶
func (r KeyRegistry) Register(name string, reducer ReducerFunc)
Register adds a reducer for a key name.
type LastReducer ¶
type LastReducer[T any] = ReplaceReducer[T]
LastReducer is an alias for ReplaceReducer - keeps the most recent value.
type ManagedValue ¶
type ManagedValue interface {
// Name returns the unique identifier for this managed value.
Name() string
// Descriptor returns metadata stored in checkpoints for validation.
Descriptor() checkpoint.ManagedValueDescriptor
// Rehydrate refreshes the managed value after checkpoint restore. Default is
// no-op. Implementations should be idempotent.
Rehydrate(ctx context.Context) error
}
ManagedValue is the base interface for all managed values. This non-generic interface allows managed values to be passed to WithManagedValues.
Managed values represent ephemeral runtime state that is NOT included in checkpoints. Use managed values for:
- Runtime configuration (API keys, timeouts, feature flags)
- Session state (auth tokens, user sessions)
- Metrics collectors (runtime statistics)
- Resource handles (connections, caches)
- Computed values (derived state)
Unlike regular state keys, managed values:
- Are NOT persisted to checkpoints
- Are lost on process restart
- Must be reinitialized at runtime
- Are perfect for ephemeral/sensitive data
type ManagedValueAccessor ¶
type ManagedValueAccessor[T any] interface { ManagedValue // Get retrieves the current value. Get(ctx context.Context) (T, error) // Set updates the value (for mutable managed values). Set(ctx context.Context, value T) error }
ManagedValueAccessor is the generic interface for type-safe access to managed values.
type ManagedValueError ¶
type ManagedValueError struct {
// MissingValues contains the names of the missing managed values.
MissingValues []string
// IsRequired indicates if these are required (vs checkpoint-related) values.
IsRequired bool
}
ManagedValueError represents errors related to managed values. Use errors.As to extract the MissingValues field for programmatic handling.
func (*ManagedValueError) Error ¶
func (e *ManagedValueError) Error() string
Error implements the error interface.
func (*ManagedValueError) Is ¶
func (e *ManagedValueError) Is(target error) bool
Is enables comparison with sentinel errors.
type ManagedValueOption ¶
type ManagedValueOption func(*managedValueConfig)
ManagedValueOption configures metadata or lifecycle hooks for a managed value.
func WithManagedValueRehydrator ¶
func WithManagedValueRehydrator(fn func(context.Context) error) ManagedValueOption
WithManagedValueRehydrator registers a callback that rebuilds the managed value after checkpoint restores. The callback runs before execution resumes whenever the checkpoint lists this managed value. Use it to re-open network connections or refresh credentials.
func WithManagedValueRequired ¶
func WithManagedValueRequired() ManagedValueOption
WithManagedValueRequired marks the managed value as required when resuming from checkpoints. Missing required values during restore will abort execution.
type ManagedValueProvider ¶
type ManagedValueProvider[T any] struct { // contains filtered or unexported fields }
ManagedValueProvider computes its value dynamically with optional caching. Use this for derived state, expensive computations, or values that need refresh.
func NewManagedValueProvider ¶
func NewManagedValueProvider[T any](name string, provider func(ctx context.Context) (T, error), opts ...ManagedValueProviderOption) *ManagedValueProvider[T]
NewManagedValueProvider creates a managed value that computes its value dynamically. By default, the provider is called on every access. Use WithCacheTTL to enable caching.
Example:
// Always fresh (no caching)
currentTimeMV := graph.NewManagedValueProvider("current_time", func(ctx context.Context) (time.Time, error) {
return time.Now(), nil
})
// With caching (refreshes every 5 seconds)
cachedTimeMV := graph.NewManagedValueProvider("cached_time", func(ctx context.Context) (time.Time, error) {
return time.Now(), nil
}, graph.WithCacheTTL(5*time.Second))
func (*ManagedValueProvider[T]) Descriptor ¶
func (p *ManagedValueProvider[T]) Descriptor() checkpoint.ManagedValueDescriptor
Descriptor returns the checkpoint metadata for this managed value.
func (*ManagedValueProvider[T]) Get ¶
func (p *ManagedValueProvider[T]) Get(ctx context.Context) (T, error)
Get retrieves the value, using cache if enabled and not expired.
func (*ManagedValueProvider[T]) Invalidate ¶
func (p *ManagedValueProvider[T]) Invalidate()
Invalidate clears the cache, forcing a refresh on next Get. This is a no-op if caching is not enabled.
func (*ManagedValueProvider[T]) Name ¶
func (p *ManagedValueProvider[T]) Name() string
Name returns the unique identifier for this managed value.
type ManagedValueProviderOption ¶
type ManagedValueProviderOption func(*managedValueProviderConfig)
ManagedValueProviderOption configures a ManagedValueProvider.
func WithCacheTTL ¶
func WithCacheTTL(ttl time.Duration) ManagedValueProviderOption
WithCacheTTL enables caching with the specified TTL. Without this option, the provider is called on every access.
Example:
// Cache for 5 minutes
mv := graph.NewManagedValueProvider("config", fetchConfig, graph.WithCacheTTL(5*time.Minute))
func WithProviderManagedValueOptions ¶
func WithProviderManagedValueOptions(opts ...ManagedValueOption) ManagedValueProviderOption
WithProviderManagedValueOptions applies generic managed value options when constructing a ManagedValueProvider (e.g., mark as required, register rehydrators).
type ManagedValueRegistry ¶
type ManagedValueRegistry struct {
// contains filtered or unexported fields
}
ManagedValueRegistry holds managed values for a graph execution (internal).
func NewManagedValueRegistry ¶
func NewManagedValueRegistry() *ManagedValueRegistry
NewManagedValueRegistry creates a new registry for managed values.
type MaxReducer ¶
MaxReducer keeps the larger value.
func (MaxReducer[T]) Reduce ¶
func (MaxReducer[T]) Reduce(existing, incoming T) T
Reduce returns the maximum of existing and incoming.
type MergeMapReducer ¶
type MergeMapReducer[K comparable, V any] struct{}
MergeMapReducer unions two maps; later keys overwrite earlier.
func (MergeMapReducer[K, V]) Reduce ¶
func (MergeMapReducer[K, V]) Reduce(existing, incoming map[K]V) map[K]V
Reduce merges incoming map into existing map.
func (MergeMapReducer[K, V]) Zero ¶
func (MergeMapReducer[K, V]) Zero() map[K]V
Zero returns nil (empty map).
type Metrics ¶
type Metrics struct {
TotalNodes int `json:"total_nodes"`
TotalEdges int `json:"total_edges"`
AverageFanOut float64 `json:"average_fan_out"`
MaxFanOut int `json:"max_fan_out"`
AverageFanIn float64 `json:"average_fan_in"`
MaxFanIn int `json:"max_fan_in"`
CyclomaticComplexity int `json:"cyclomatic_complexity"`
NodesByType map[string]int `json:"nodes_by_type"`
}
Metrics provides static graph metrics.
type MinReducer ¶
MinReducer keeps the smaller value.
func (MinReducer[T]) Reduce ¶
func (MinReducer[T]) Reduce(existing, incoming T) T
Reduce returns the minimum of existing and incoming.
type Namespace ¶
type Namespace struct {
// contains filtered or unexported fields
}
Namespace represents a state namespace for isolation.
func NewNamespace ¶
NewNamespace creates a namespace with the given name.
type NodeFunc ¶
NodeFunc is the typed signature for node logic. Output type is fixed to Message for agent workflows. Read state via Scope, optionally stream partial outputs, return a Command.
Example:
func myNode(ctx context.Context, scope graph.Scope) (*graph.Command, error) {
messages := scope.Messages()
scope.Stream(partialMessage) // Stream partial output
return graph.Reply(finalMessage).End()
}
func Compose ¶
Compose combines multiple wrappers around a NodeFunc. Wrappers are applied right-to-left: the last wrapper is applied first and becomes the innermost layer, so the first wrapper is the outermost layer.
Example:
g.Node("fetch", graph.Compose(
fetchNode,
func(fn graph.NodeFunc) graph.NodeFunc { return graph.WithRetry(fn, policy) },
func(fn graph.NodeFunc) graph.NodeFunc { return graph.WithNamespace(fn, ns, false) },
), "next")
This is equivalent to: WithRetry(WithNamespace(fetchNode, ns, false), policy)
func Subgraph ¶
func Subgraph( sub *Graph, inputMapper InputMapper, outputMapper OutputMapper, ) NodeFunc
Subgraph creates a NodeFunc that executes a compiled subgraph. Use with Node() to add a subgraph while maintaining the fluent builder pattern:
child, _ := graph.New(...).Build()
parent.Node("validate", graph.Subgraph(child,
func(ctx context.Context, view graph.ReadOnlyScope) ([]graph.Message, error) {
return graph.GetMessages(view), nil
},
func(ctx context.Context, out graph.Message) (graph.Updates, error) {
return graph.Updates{graph.MessagesKeyName: []graph.Message{out}}, nil
},
), "next")
The InputMapper transforms parent state into subgraph input. The OutputMapper transforms subgraph output into parent state updates.
func WithNamespace ¶
WithNamespace wraps a NodeFunc to filter state access by namespace. The wrapped function only sees keys from the specified namespace. If includeGlobal is true, global (non-namespaced) keys are also visible.
Example:
agentNS := graph.NewNamespace("agent1")
g.Node("agent1", graph.WithNamespace(agentNode, agentNS, false), "next")
func WithRetry ¶
func WithRetry(fn NodeFunc, policy *RetryPolicy) NodeFunc
WithRetry wraps a NodeFunc with retry logic. On failure, retries according to the policy with exponential backoff.
Example:
g.Node("fetch", graph.WithRetry(fetchNode, graph.DefaultRetryPolicy()), "process")
type NodeInfo ¶
type NodeInfo struct {
Name string `json:"name"`
Type string `json:"type"` // "standard", "entry", "terminal"
IncomingEdges int `json:"incoming_edges"`
OutgoingEdges int `json:"outgoing_edges"`
DeclaredTargets []string `json:"declared_targets,omitempty"`
IsEntryPoint bool `json:"is_entry_point"`
HasInterrupt bool `json:"has_interrupt"`
}
NodeInfo contains metadata about a node in the graph.
type NodeMiddleware ¶
NodeMiddleware wraps node execution. This runs for every node during graph execution. Output type is fixed to Message for agent workflows.
func ChainNodeMiddleware ¶
func ChainNodeMiddleware(middleware ...NodeMiddleware) NodeMiddleware
ChainNodeMiddleware combines multiple node middleware into one. Middleware are applied in order, so the first middleware is the outermost layer.
Example:
import graphmw "github.com/hupe1980/agentmesh/pkg/graph/middleware"
combined := graph.ChainNodeMiddleware(
graphmw.LoggingMiddleware(logger),
graphmw.TimingMiddleware(metricsCallback),
graphmw.RecoveryMiddleware(panicHandler),
)
graph.WithNodeMiddleware(combined)
This produces: logging(timing(recovery(node))). Execution flows: logging → timing → recovery → node.
type OutputMapper ¶
OutputMapper maps subgraph output to parent graph state updates. Used with Subgraph to transform child graph output into state updates for the parent.
type PregelExecutor ¶
type PregelExecutor struct {
// contains filtered or unexported fields
}
PregelExecutor executes graphs using the Pregel BSP runtime.
func NewPregelExecutor ¶
func NewPregelExecutor() *PregelExecutor
NewPregelExecutor creates a new Pregel executor with default settings. Uses DefaultMaxWorkers (4) and DefaultMaxSteps (100).
func (*PregelExecutor) Run ¶
func (e *PregelExecutor) Run(ctx context.Context, cfg *ExecutorConfig, input []message.Message, opts ...runOption) iter.Seq2[message.Message, error]
Run executes the graph using the Pregel BSP runtime.
func (*PregelExecutor) WithBackend ¶
func (e *PregelExecutor) WithBackend(backend DistributedBackend) *PregelExecutor
WithBackend sets a custom distributed backend for multi-node execution. The backend abstracts the underlying message-passing mechanism, allowing the graph to run across multiple machines without exposing Pregel concepts.
Use NewPregelBackend() to adapt existing Pregel MessageBus implementations.
func (*PregelExecutor) WithMaxSteps ¶
func (e *PregelExecutor) WithMaxSteps(maxSteps int) *PregelExecutor
WithMaxSteps sets the maximum number of execution steps (iterations).
func (*PregelExecutor) WithMaxWorkers ¶
func (e *PregelExecutor) WithMaxWorkers(n int) *PregelExecutor
WithMaxWorkers sets the maximum number of parallel workers.
type PrependReducer ¶
type PrependReducer[T any] struct{}
PrependReducer inserts incoming slice at the front of existing slice.
func (PrependReducer[T]) Reduce ¶
func (PrependReducer[T]) Reduce(existing, incoming []T) []T
Reduce prepends incoming slice before existing slice.
type ReadOnlyScope ¶
type ReadOnlyScope interface {
// NodeName returns the name of the currently executing node.
// Returns empty string if not in a node execution context.
NodeName() string
// GetValue returns the raw value for a key name.
GetValue(name string) (any, bool)
// Messages returns the current conversation history.
// This is the primary way to access the message list.
Messages() []message.Message
// LastMessage returns the most recent message in the conversation history.
// Returns nil if there are no messages.
LastMessage() message.Message
// ManagedValues returns the managed values registry, or nil if not configured.
ManagedValues() *ManagedValueRegistry
// ToMap returns regular state values as a map for template rendering.
// Only includes checkpointed state values, NOT managed values.
// Managed values (ephemeral runtime state) are excluded for safety.
ToMap() map[string]any
}
ReadOnlyScope provides read-only access to state and execution context. This is the base interface that Scope embeds.
type Reducer ¶
type Reducer[T any] interface { // Zero returns the identity element for T (used when no prior value exists). Zero() T // Reduce merges an incoming value into the existing state. Reduce(existing, incoming T) T }
Reducer specifies how values are combined for a state key. Implementations must be deterministic and ideally commutative for parallel writes.
type ReducerFunc ¶
type ReducerFunc struct {
// ZeroFn returns the zero value for this reducer.
ZeroFn func() any
// ReduceFn merges incoming into existing and returns the result.
ReduceFn func(existing, incoming any) any
}
ReducerFunc is a type-erased reducer function for runtime use. It wraps the generic Reducer[T] for storage in maps and runtime dispatch.
func WrapReducer ¶
func WrapReducer[T any](r Reducer[T]) ReducerFunc
WrapReducer creates a type-erased ReducerFunc from a generic Reducer[T]. This is used at graph build time to store reducers in a registry.
type RehydrateError ¶
type RehydrateError struct {
// Name is the name of the managed value that failed to rehydrate.
Name string
// Cause is the underlying error.
Cause error
}
RehydrateError represents an error that occurred while rehydrating a managed value. Use errors.As to extract the Name and Cause fields for programmatic handling.
func (*RehydrateError) Error ¶
func (e *RehydrateError) Error() string
Error implements the error interface.
func (*RehydrateError) Is ¶
func (e *RehydrateError) Is(target error) bool
Is enables comparison with sentinel errors.
func (*RehydrateError) Unwrap ¶
func (e *RehydrateError) Unwrap() error
Unwrap returns the underlying error for errors.Unwrap.
type ReplaceReducer ¶
type ReplaceReducer[T any] struct{}
ReplaceReducer always returns the incoming value (last-write-wins). This is the default reducer for scalar keys.
func (ReplaceReducer[T]) Reduce ¶
func (ReplaceReducer[T]) Reduce(_, incoming T) T
Reduce returns the incoming value, discarding the existing value.
type ResumeOption ¶
type ResumeOption interface {
// contains filtered or unexported methods
}
ResumeOption is an option that can be passed to Resume().
func WithApproval ¶
func WithApproval(nodeName string, approval *ApprovalResponse) ResumeOption
WithApproval provides an approval response for a node interrupt. This is a Resume-only option - approvals are provided when continuing after an interrupt.
func WithCheckpoint ¶
func WithCheckpoint(cp *checkpoint.Checkpoint) ResumeOption
WithCheckpoint resumes from a specific checkpoint instead of auto-restoring from the latest checkpoint. This is a Resume-only option.
Example:
savedCp, _ := checkpointer.Load(ctx, runID)
compiled.Resume(ctx, runID, graph.WithCheckpoint(savedCp))
func WithResumeValue ¶
func WithResumeValue(nodeName string, key string, value any) ResumeOption
WithResumeValue is a convenience function that sets a state value and auto-approves a node for simple human-input scenarios. This is a Resume-only option.
Example:
compiled.Resume(ctx, runID,
graph.WithResumeValue("wait_for_answer", answerKey.Name(), "Paris"),
)
func WithStateUpdates ¶
func WithStateUpdates(updates map[string]any) ResumeOption
WithStateUpdates applies state updates when resuming. This is a Resume-only option for human-in-the-loop workflows.
Example:
savedCp, _ := checkpointer.Load(ctx, runID)
compiled.Resume(ctx, runID,
graph.WithCheckpoint(savedCp),
graph.WithStateUpdates(map[string]any{"answer": "Paris"}),
graph.WithApproval("wait_node", approval),
)
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int
Delay time.Duration
MaxDelay time.Duration
Multiplier float64
Retryable func(error) bool // Optional: determines if error should trigger retry
}
RetryPolicy configures automatic retry behavior for node execution.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy. Uses DefaultRetryMaxAttempts (3), DefaultRetryDelay (100ms), DefaultRetryMaxDelay (5s), and DefaultRetryMultiplier (2.0).
type RetryPolicyBuilder ¶
type RetryPolicyBuilder struct {
// contains filtered or unexported fields
}
RetryPolicyBuilder provides a fluent API for constructing retry policies.
Example:
policy := graph.NewRetryPolicyBuilder().
WithMaxAttempts(5).
WithExponentialBackoff(time.Second, 2.0).
Build()
func NewRetryPolicyBuilder ¶
func NewRetryPolicyBuilder() *RetryPolicyBuilder
NewRetryPolicyBuilder creates a new retry policy builder with sensible defaults:
- MaxAttempts: DefaultRetryMaxAttempts (3)
- Delay: DefaultRetryDelay (100ms base delay)
- MaxDelay: DefaultRetryMaxDelay (5s)
- Multiplier: DefaultRetryMultiplier (2.0 exponential backoff)
func (*RetryPolicyBuilder) Build ¶
func (b *RetryPolicyBuilder) Build() *RetryPolicy
Build creates the RetryPolicy.
func (*RetryPolicyBuilder) WithConstantBackoff ¶
func (b *RetryPolicyBuilder) WithConstantBackoff(delay time.Duration) *RetryPolicyBuilder
WithConstantBackoff configures constant delay between retries.
Example:
WithConstantBackoff(time.Second) // 1s, 1s, 1s, ...
func (*RetryPolicyBuilder) WithExponentialBackoff ¶
func (b *RetryPolicyBuilder) WithExponentialBackoff(delay time.Duration, multiplier float64) *RetryPolicyBuilder
WithExponentialBackoff configures exponential backoff. Wait time = delay * (multiplier ^ attempt), with attempt counted from 0 for the first retry.
Example:
WithExponentialBackoff(time.Second, 2.0) // 1s, 2s, 4s, 8s, ...
func (*RetryPolicyBuilder) WithLinearBackoff ¶
func (b *RetryPolicyBuilder) WithLinearBackoff(delay time.Duration) *RetryPolicyBuilder
WithLinearBackoff configures linear backoff. Wait time = delay * attempt, with attempt counted from 1 for the first retry.
Example:
WithLinearBackoff(time.Second) // 1s, 2s, 3s, 4s, ...
func (*RetryPolicyBuilder) WithMaxAttempts ¶
func (b *RetryPolicyBuilder) WithMaxAttempts(n int) *RetryPolicyBuilder
WithMaxAttempts sets the maximum number of execution attempts.
func (*RetryPolicyBuilder) WithMaxDelay ¶
func (b *RetryPolicyBuilder) WithMaxDelay(d time.Duration) *RetryPolicyBuilder
WithMaxDelay sets the maximum delay between retries.
func (*RetryPolicyBuilder) WithRetryableFunc ¶
func (b *RetryPolicyBuilder) WithRetryableFunc(fn func(error) bool) *RetryPolicyBuilder
WithRetryableFunc sets a function to determine if an error should trigger retry. If not set, all errors are considered retryable.
type RunFunc ¶
RunFunc is the function signature for graph execution. Input is []message.Message (conversation history), output is Message (response).
type RunMiddleware ¶
RunMiddleware wraps the entire graph execution (Run/Resume). Unlike node middleware which runs for every node, run middleware intercepts the input before execution starts and the final output after. This is useful for:
- Input validation/guardrails (check user input once at start)
- Output validation/guardrails (check final output once at end)
- Logging/observability at the run level
- Request/response transformation
func ChainRunMiddleware ¶
func ChainRunMiddleware(middleware ...RunMiddleware) RunMiddleware
ChainRunMiddleware combines multiple run middleware into one. Middleware are applied in order, so the first middleware is the outermost layer.
Example:
combined := graph.ChainRunMiddleware(
inputValidationMiddleware,
outputValidationMiddleware,
loggingMiddleware,
)
graph.WithRunMiddleware(combined)
This produces: inputValidation(outputValidation(logging(run))). Execution flows: inputValidation → outputValidation → logging → run.
type RunOption ¶
type RunOption interface {
// contains filtered or unexported methods
}
RunOption is an option that can be passed to Run().
func WithInitialValue ¶
WithInitialValue sets an initial state value when starting graph execution. This is a Run-only option.
Example:
compiled.Run(ctx, messages,
graph.WithInitialValue(agent.SessionIDKey, "session-123"),
)
type RuntimeMetrics ¶
type RuntimeMetrics struct {
// CurrentSuperstep tracks the current iteration (superstep) number.
// In Pregel BSP model, each superstep represents one round of computation.
CurrentSuperstep int64
// CompletedNodes lists node names that have finished execution.
CompletedNodes []string
// PausedNodes lists node names that are currently paused (e.g., waiting for human input).
PausedNodes []string
// ResumingNodes lists node names that are being resumed from a paused state.
// These nodes should skip interrupt checks.
ResumingNodes []string
// ActiveNodes lists node names currently being executed.
ActiveNodes []string
// FailedNodes tracks nodes that encountered errors.
FailedNodes []string
// TotalMessages counts total messages sent between nodes.
TotalMessages int64
// ExecutionTimeNs tracks total execution time in nanoseconds.
ExecutionTimeNs int64
// contains filtered or unexported fields
}
RuntimeMetrics tracks execution metrics for a running or completed graph. Thread-safe for concurrent access during execution.
func NewRuntimeMetrics ¶
func NewRuntimeMetrics() *RuntimeMetrics
NewRuntimeMetrics creates a new runtime metrics tracker.
func (*RuntimeMetrics) AddActive ¶
func (rm *RuntimeMetrics) AddActive(nodeName string)
AddActive marks a node as actively executing.
func (*RuntimeMetrics) AddCompleted ¶
func (rm *RuntimeMetrics) AddCompleted(nodeName string)
AddCompleted marks a node as completed.
func (*RuntimeMetrics) AddFailed ¶
func (rm *RuntimeMetrics) AddFailed(nodeName string)
AddFailed marks a node as failed.
func (*RuntimeMetrics) AddMessage ¶
func (rm *RuntimeMetrics) AddMessage()
AddMessage increments the message counter.
func (*RuntimeMetrics) AddMessages ¶
func (rm *RuntimeMetrics) AddMessages(n int64)
AddMessages increments the message counter by n.
func (*RuntimeMetrics) AddPaused ¶
func (rm *RuntimeMetrics) AddPaused(nodeName string)
AddPaused marks a node as paused.
func (*RuntimeMetrics) ClearResuming ¶
func (rm *RuntimeMetrics) ClearResuming(nodeName string)
ClearResuming removes a node from the resuming list (after it executes).
func (*RuntimeMetrics) GetSuperstep ¶
func (rm *RuntimeMetrics) GetSuperstep() int64
GetSuperstep returns the current superstep number.
func (*RuntimeMetrics) IsResuming ¶
func (rm *RuntimeMetrics) IsResuming(nodeName string) bool
IsResuming checks if a node is currently being resumed.
func (*RuntimeMetrics) Reset ¶
func (rm *RuntimeMetrics) Reset()
Reset clears all metrics to initial state.
func (*RuntimeMetrics) ResumePaused ¶
func (rm *RuntimeMetrics) ResumePaused(nodeName string)
ResumePaused removes a node from the paused list and marks it as resuming.
func (*RuntimeMetrics) SetExecutionTime ¶
func (rm *RuntimeMetrics) SetExecutionTime(ns int64)
SetExecutionTime sets the total execution time.
func (*RuntimeMetrics) SetSuperstep ¶
func (rm *RuntimeMetrics) SetSuperstep(step int64)
SetSuperstep updates the current superstep number.
func (*RuntimeMetrics) Snapshot ¶
func (rm *RuntimeMetrics) Snapshot() RuntimeMetricsSnapshot
Snapshot creates a read-only snapshot of the current metrics.
type RuntimeMetricsSnapshot ¶
type RuntimeMetricsSnapshot struct {
CurrentSuperstep int64
CompletedNodes []string
PausedNodes []string
ResumingNodes []string
ActiveNodes []string
FailedNodes []string
TotalMessages int64
ExecutionTimeNs int64
}
RuntimeMetricsSnapshot is a read-only snapshot of runtime metrics.
type Scope ¶
type Scope interface {
ReadOnlyScope
// Stream emits a Message immediately to the graph's output iterator.
// Use for partial/streaming results during node execution.
Stream(value message.Message)
}
Scope provides the execution context for a node. It embeds ReadOnlyScope for state access and adds typed output streaming. Output type is fixed to Message for agent workflows.
func GetScope ¶
GetScope retrieves the Scope from context. Returns nil if scope is not available. This is primarily used by tools that need streaming access.
Example usage in a tool:
func (t *MyTool) Run(ctx context.Context, input string) (string, error) {
if scope := graph.GetScope(ctx); scope != nil {
scope.Stream(graph.NewAIMessageFromText("progress..."))
}
return result, nil
}
type SharedOption ¶
type SharedOption func(*runConfig)
SharedOption implements both RunOption and ResumeOption interfaces. This allows common options (like WithMaxConcurrency) to work with both Run() and Resume() without explicit conversion.
func WithCheckpointInterval ¶
func WithCheckpointInterval(interval int) SharedOption
WithCheckpointInterval sets how often checkpoints are saved. This option works with both Run and Resume.
func WithFailOnCheckpointError ¶
func WithFailOnCheckpointError(fail bool) SharedOption
WithFailOnCheckpointError configures checkpoint error handling. This option works with both Run and Resume.
func WithManagedValues ¶
func WithManagedValues(values ...ManagedValue) SharedOption
WithManagedValues attaches ephemeral runtime values to the graph execution. This option works with both Run and Resume.
Example:
compiled.Run(ctx, input, graph.WithManagedValues(apiKeyMV))
compiled.Resume(ctx, runID, graph.WithManagedValues(apiKeyMV))
func WithMaxConcurrency ¶
func WithMaxConcurrency(n int) SharedOption
WithMaxConcurrency sets the maximum number of nodes that can execute in parallel. This option works with both Run and Resume.
func WithMaxIterations ¶
func WithMaxIterations(n int) SharedOption
WithMaxIterations sets the maximum number of supersteps before stopping. This option works with both Run and Resume.
type SkipZeroReducer ¶
type SkipZeroReducer[T comparable, R Reducer[T]] struct { Inner R }
SkipZeroReducer wraps a reducer to skip zero-value inputs. This preserves the existing value when the incoming value is the zero value.
func NewSkipZeroReducer ¶
func NewSkipZeroReducer[T comparable, R Reducer[T]](inner R) SkipZeroReducer[T, R]
NewSkipZeroReducer creates a SkipZeroReducer wrapper around a reducer.
func (SkipZeroReducer[T, R]) Reduce ¶
func (r SkipZeroReducer[T, R]) Reduce(existing, incoming T) T
Reduce skips the incoming value if it's the zero value.
func (SkipZeroReducer[T, R]) Zero ¶
func (r SkipZeroReducer[T, R]) Zero() T
Zero delegates to the inner reducer.
type StateKey ¶
type StateKey interface {
// Name returns the unique name of this key.
Name() string
// ReducerFunc returns the type-erased reducer for this key.
ReducerFunc() ReducerFunc
// contains filtered or unexported methods
}
StateKey is the interface that all state keys must implement.
type StateUpdate ¶
type StateUpdate struct {
From string // Source node
To string // Target node
Data Updates // State changes
}
StateUpdate represents a state change being sent between nodes. This is the graph-layer equivalent of a Pregel message.
type StaticManagedValue ¶
type StaticManagedValue[T any] struct { // contains filtered or unexported fields }
StaticManagedValue provides thread-safe storage for ephemeral values. Values are stored in memory and NOT checkpointed.
func NewManagedValue ¶
func NewManagedValue[T any](name string, value T, opts ...ManagedValueOption) *StaticManagedValue[T]
NewManagedValue creates a managed value with a static value.
Example:
configMV := graph.NewManagedValue("runtime_config", &Config{APIKey: "sk_live_..."})
timeoutMV := graph.NewManagedValue("timeout", 30*time.Second)
func (*StaticManagedValue[T]) Descriptor ¶
func (m *StaticManagedValue[T]) Descriptor() checkpoint.ManagedValueDescriptor
Descriptor returns the checkpoint metadata for this managed value.
func (*StaticManagedValue[T]) Get ¶
func (m *StaticManagedValue[T]) Get(_ context.Context) (T, error)
Get retrieves the current value.
func (*StaticManagedValue[T]) Name ¶
func (m *StaticManagedValue[T]) Name() string
Name returns the unique identifier for this managed value.
type Store ¶
type Store interface {
Get(ctx context.Context, key string) (any, error)
Set(ctx context.Context, key string, value any) error
Delete(ctx context.Context, key string) error
}
Store is the interface for state persistence.
type SumReducer ¶
SumReducer adds numeric values.
func (SumReducer[T]) Reduce ¶
func (SumReducer[T]) Reduce(existing, incoming T) T
Reduce adds the incoming value to the existing value.
type Topology ¶
type Topology struct {
Nodes []NodeInfo `json:"nodes"`
Edges []EdgeInfo `json:"edges"`
EntryPoints []string `json:"entry_points"`
ExitPoints []string `json:"exit_points"` // Nodes that can route to END
}
Topology provides a complete view of the graph structure.
type ValidationError ¶
type ValidationError struct {
Type ValidationErrorType
Node string
Message string
}
ValidationError represents a graph validation error.
func (ValidationError) Error ¶
func (e ValidationError) Error() string
Error implements the error interface.
type ValidationErrorType ¶
type ValidationErrorType string
ValidationErrorType classifies validation errors.
const ( ErrorTypeCycle ValidationErrorType = "CYCLE" ErrorTypeDisconnected ValidationErrorType = "DISCONNECTED" ErrorTypeDuplicateKey ValidationErrorType = "DUPLICATE_KEY" ErrorTypeInvalidEntryNode ValidationErrorType = "INVALID_ENTRY_NODE" ErrorTypeInvalidEndNode ValidationErrorType = "INVALID_END_NODE" ErrorTypeMissingNode ValidationErrorType = "MISSING_NODE" ErrorTypeInvalidBranch ValidationErrorType = "INVALID_BRANCH" ErrorTypeInvalidEdge ValidationErrorType = "INVALID_EDGE" ErrorTypeDuplicateNode ValidationErrorType = "DUPLICATE_NODE" )
Validation error type constants.
type ValidationLevel ¶
type ValidationLevel int
ValidationLevel determines the strictness of validation.
const ( // ValidationLevelNone skips all validation. ValidationLevelNone ValidationLevel = iota // ValidationLevelBasic performs basic structural validation. ValidationLevelBasic // ValidationLevelStrict performs comprehensive validation including cycle detection. ValidationLevelStrict )
type ValidationOptions ¶
type ValidationOptions struct {
Level ValidationLevel
AllowCycles bool
AllowDisconnectedNodes bool
}
ValidationOptions configures graph validation behavior.
func DefaultValidationOptions ¶
func DefaultValidationOptions() ValidationOptions
DefaultValidationOptions returns the default validation configuration.
func StrictValidationOptions ¶
func StrictValidationOptions() ValidationOptions
StrictValidationOptions returns strict validation configuration.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| middleware | Package middleware provides reusable middleware for graph node execution. |