workgraph

package
v0.0.0-...-b3453ef
Published: Jun 9, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package workgraph provides some low-level utilities for coordinating a number of workers that are all collaborating to produce different parts of some overall result, with dynamically-discovered dependencies between those workers.

Workers and requests form a bipartite graph. Every request has exactly one worker responsible for resolving it, and every worker is waiting for zero or one requests to be resolved. If worker A attempts to wait for a result that will be produced by worker B, while B is itself waiting for another result that A is responsible for, then all requests in that chain fail immediately to avoid deadlock.
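The deadlock check described above can be pictured as a walk along alternating "responsible for" and "waiting for" edges. The following is a minimal sketch of that idea only, not the package's implementation; workerID, requestID, graph, and findCycle are hypothetical names invented for this illustration.

```go
package main

import "fmt"

// workerID and requestID are hypothetical stand-ins used only in this
// sketch; they are not the package's Worker or RequestID types.
type workerID string
type requestID string

// graph records the two kinds of edge in the bipartite graph.
type graph struct {
	responsible map[requestID]workerID // each request has exactly one responsible worker
	awaiting    map[workerID]requestID // each worker awaits at most one request
}

// findCycle reports the requests that would form a self-dependency if worker
// w started waiting for request r, or nil if the wait looks safe.
func (g *graph) findCycle(w workerID, r requestID) []requestID {
	chain := []requestID{r}
	cur := r
	// A chain without repetition cannot be longer than the number of requests.
	for steps := 0; steps < len(g.responsible); steps++ {
		owner, ok := g.responsible[cur]
		if !ok {
			return nil // unknown request: nothing to follow
		}
		if owner == w {
			return chain // the chain leads back to the waiting worker
		}
		next, waiting := g.awaiting[owner]
		if !waiting {
			return nil // the owner is not blocked, so it can make progress
		}
		cur = next
		chain = append(chain, cur)
	}
	return nil
}

func main() {
	g := &graph{
		responsible: map[requestID]workerID{"reqA": "A", "reqB": "B"},
		awaiting:    map[workerID]requestID{"B": "reqA"},
	}
	// Worker A is about to wait for reqB, but B already waits for reqA.
	fmt.Println(g.findCycle("A", "reqB"))
	// Worker C waiting for reqA is fine because A is not blocked.
	fmt.Println(g.findCycle("C", "reqA"))
}
```

Because each worker waits for at most one request, the walk never branches, which keeps the check linear in the length of the chain.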

This is a "nuts-and-bolts" abstraction intended to be used as an implementation detail of a higher-level system, and is not intended to be treated as a cross-cutting concern that appears in another library's exported API. Use idiomatic Go features like blocking calls or channels to represent relationships between concurrent work in larger scopes.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewRequest

func NewRequest[T any](responsibleWorker *Worker) (Resolver[T], Promise[T])

NewRequest begins a new request and returns both its resolver and its promise.

The given worker is initially responsible for resolving the request.

func OnceFunc

func OnceFunc[T any](f func(*Worker) (T, error)) func(*Worker) (T, error)

OnceFunc returns a function that, when called for the first time, runs f using a newly-created Worker. That call and all subsequent calls then return whatever f returned.

This is a convenience wrapper for Once in situations where the underlying RequestID is unimportant and so just a function pointer is sufficient, and where it's helpful to capture the function to run inside the result so it can be called from many different locations.
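The run-once-then-replay behavior is similar in spirit to the standard library's sync.OnceValues. The sketch below uses only the standard library to show that memoization shape; it does not use this package, takes no Worker argument, and does not detect self-dependencies. newCountingLoader is a hypothetical helper for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// newCountingLoader returns a memoized loader along with a counter of how
// many times the underlying function actually ran. Both names are
// hypothetical and exist only for this sketch.
func newCountingLoader() (load func() (string, error), runs *int) {
	runs = new(int)
	load = sync.OnceValues(func() (string, error) {
		*runs++
		return "config", nil
	})
	return load, runs
}

func main() {
	load, runs := newCountingLoader()
	for i := 0; i < 3; i++ {
		v, err := load() // only the first call runs the wrapped function
		fmt.Println(v, err)
	}
	fmt.Println("underlying function ran", *runs, "time(s)")
}
```

The difference that matters for this package is what happens when the wrapped function ends up calling itself: the documentation for Once describes such calls failing with ErrSelfDependency, which the standard library helper makes no attempt to do.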

func WithNewAsyncWorker

func WithNewAsyncWorker(f func(*Worker), delegatedResults ...ResolverContainer)

WithNewAsyncWorker is a helper wrapper around NewWorker for the common case of associating a new worker with a new goroutine.

The Worker passed to the given function is immediately responsible for any resolvers given as additional arguments, as if those had been passed to NewWorker.

Types

type AnyResolver

type AnyResolver interface {
	// contains filtered or unexported methods
}

AnyResolver is an interface implemented by all instantiations of the generic type Resolver, regardless of their result type.

This is used along with ResolverContainer to delegate resolvers from one worker to another, where it doesn't matter what value type each resolver has.

type ErrSelfDependency

type ErrSelfDependency struct {
	// RequestIDs are the identifiers of the requests included in the
	// dependency cycle. Callers may use this in conjunction with their own
	// records of the meaning of each request ID to return a higher-level error
	// that describes the set of requested operations that together caused the
	// problem.
	RequestIDs []RequestID
}

ErrSelfDependency is returned by Promise.Await if a direct or indirect self-dependency is created in the worker-and-request graph by this or some other call to Promise.Await.

All Await calls blocking on any result in the detected dependency cycle immediately fail with this error.

func (ErrSelfDependency) Error

func (err ErrSelfDependency) Error() string

type ErrUnresolved

type ErrUnresolved struct {
	// RequestID is the request that was unresolved. This is always the ID of
	// the request whose [Promise] the Await method was called on.
	RequestID RequestID
}

ErrUnresolved is returned by Promise.Await if the Worker responsible for resolving the request is garbage-collected before the request is resolved.

This suggests a bug in the implementation of the responsible worker, since it should ensure that all requests it is responsible for are either resolved or delegated to another worker before its Worker object goes out of scope.

func (ErrUnresolved) Error

func (err ErrUnresolved) Error() string

type Once

type Once[T any] struct {
	// contains filtered or unexported fields
}

Once is similar in principle to the standard library's sync.Once, but implemented in terms of the workgraph concepts so that it can provide similar guarantees such as detecting when a Once execution ends up indirectly depending on its own result.

func (*Once[T]) Do

func (o *Once[T]) Do(forWorker *Worker, f func(*Worker) (T, error)) (T, error)

Do calls the function f if and only if Do is being called for the first time on this instance of Once.

Given a specific instance of Once, only the first call will invoke the given f, even if f has a different value on each call.

forWorker is the handle for the worker that the result is requested on behalf of. The given function is called with its own separate Worker that is responsible for providing the return value. If f directly or indirectly causes another call to Do on the same Once then all affected calls will fail with ErrSelfDependency.

func (*Once[T]) RequestID

func (o *Once[T]) RequestID() RequestID

RequestID returns the identifier of the internal request that represents the completion of all calls to Once.Do on this object.

If Once.Do has not yet been called, this returns NoRequest to indicate that no request has started yet.

type Promise

type Promise[T any] struct {
	// contains filtered or unexported fields
}

Promise is a handle through which many different workers can wait for the result of a request to become available.

func (Promise[T]) Await

func (rc Promise[T]) Await(requestingWorker *Worker) (T, error)

Await blocks until the associated request has been resolved, or until a problem forces it to resolve with a usage error to avoid deadlocking.

type RequestID

type RequestID struct {
	// contains filtered or unexported fields
}

RequestID represents an opaque but comparable unique identifier for a request, whose resolver may or may not still be live.

RequestID values are used in some error types returned by this package when reporting situations that could cause deadlock. Callers can therefore maintain a lookup table from RequestID to some higher-level representation of the meaning of a request to allow including more relevant context in externally-facing error results.
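A lookup table of the kind suggested above might look like the following sketch. It is an illustration under assumptions: requestID is a hypothetical comparable stand-in for the package's RequestID, and describeCycle is an invented helper, not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// requestID is a hypothetical comparable stand-in for the package's
// RequestID, used so that this sketch is self-contained.
type requestID struct{ n int }

// describeCycle translates the request IDs reported in a dependency-cycle
// error into the caller's own vocabulary using a lookup table.
func describeCycle(ids []requestID, names map[requestID]string) string {
	parts := make([]string, 0, len(ids))
	for _, id := range ids {
		name, ok := names[id]
		if !ok {
			name = "<unknown request>"
		}
		parts = append(parts, name)
	}
	return "self-dependency between " + strings.Join(parts, ", ")
}

func main() {
	// The caller records what each request means at the time it is created.
	names := map[requestID]string{
		{n: 1}: "load config",
		{n: 2}: "resolve module",
	}
	cycle := []requestID{{n: 1}, {n: 2}}
	fmt.Println(describeCycle(cycle, names))
}
```

Using the identifier itself as the map key relies only on the type being comparable, which is the property the documentation states for RequestID.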

Use Resolver.RequestID to find the identity of the request that a particular resolver is associated with.

var NoRequest RequestID

NoRequest is the zero value of RequestID, representing the absence of any request.

No actual request has this request ID; this value can be used, for example, when a caller asks for the request ID for a request that hasn't actually started yet, such as with Once.RequestID.

func (RequestID) Equal

func (rid RequestID) Equal(other RequestID) bool

Equal returns true if other is the same RequestID as the receiver.

This is equivalent to using the "==" operator to compare two values, but is implemented here to work better with libraries like Google's "go-cmp" which try to perform deep comparison when no Equal method is present.

func (RequestID) GoString

func (rid RequestID) GoString() string

func (RequestID) String

func (rid RequestID) String() string

String returns a human-oriented string representation of the request ID.

This is intended for debug messages only. Do not use the result as a unique key for a RequestID; this type is comparable so it can act as its own unique key.

type Resolver

type Resolver[T any] struct {
	// contains filtered or unexported fields
}

A Resolver is used by the Worker that is responsible for resolving a request to report its result, thereby unblocking any other workers that are waiting for its resolution.

func (Resolver[T]) ContainedResolvers

func (r Resolver[T]) ContainedResolvers() iter.Seq[AnyResolver]

ContainedResolvers implements ResolverContainer, reporting the receiver itself as the only resolver in the container.

func (Resolver[T]) Report

func (r Resolver[T]) Report(resolvingWorker *Worker, val T, err error)

Report resolves the request with both a result value and an error, both of which will be returned from any Promise.Await calls for the associated request.
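The relationship between one report and many waiters can be sketched with a channel that is closed once the result is stored. This is a simplified illustration, not the package's implementation: request, newRequest, report, and await are hypothetical names, and the sketch omits the Worker arguments and all deadlock detection.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// request is a hypothetical stand-in for the state shared by a resolver and
// its promise in this sketch.
type request[T any] struct {
	once sync.Once
	done chan struct{}
	val  T
	err  error
}

func newRequest[T any]() *request[T] {
	return &request[T]{done: make(chan struct{})}
}

// report stores the value and error and unblocks every waiter. In this
// sketch only the first report takes effect.
func (r *request[T]) report(val T, err error) {
	r.once.Do(func() {
		r.val, r.err = val, err
		close(r.done)
	})
}

// await blocks until report has been called, then returns the stored pair.
func (r *request[T]) await() (T, error) {
	<-r.done
	return r.val, r.err
}

func main() {
	req := newRequest[int]()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err := req.await() // every waiter sees the same value and error
			fmt.Println(v, err)
		}()
	}
	req.report(42, errors.New("partial result"))
	wg.Wait()
}
```

Closing the channel after the fields are written gives every waiter a consistent view of both the value and the error, which mirrors the documented behavior that both parts are returned from each Await call.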

func (Resolver[T]) ReportError

func (r Resolver[T]) ReportError(resolvingWorker *Worker, err error)

ReportError is a helper for Resolver.Report which automatically sets the value part of the result to the zero value of T, suggesting an error result without any useful accompanying value.

func (Resolver[T]) ReportSuccess

func (r Resolver[T]) ReportSuccess(resolvingWorker *Worker, val T)

ReportSuccess is a helper for Resolver.Report which automatically sets the error to nil, suggesting a successful result.

func (Resolver[T]) RequestID

func (r Resolver[T]) RequestID() RequestID

RequestID returns a unique identifier for the request that this resolver belongs to.

This can be compared with RequestID values in errors returned by this library in situations that would otherwise cause a deadlock.

type ResolverContainer

type ResolverContainer interface {
	ContainedResolvers() iter.Seq[AnyResolver]
}

ResolverContainer is implemented by types that in some sense "contain" Resolver objects, allowing the responsibility for all of those results to be passed in aggregate to a new task when calling NewWorker.

Resolver itself implements this interface, so callers with no need for any higher-level aggregation can pass individual Resolver values directly instead of implementing this interface themselves.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

A Worker represents a specific linear codepath that will ultimately resolve zero or more results.

It's ultimately up to the caller to decide what exactly "linear codepath" means. The simplest mental model is for each Worker to belong to one goroutine and for that worker to go out of scope once the goroutine exits, with no other goroutine interacting with it.

However, the only hard constraint is that each worker can only be waiting on zero or one results at a time, and so two goroutines can potentially share a single worker as long as they somehow arrange for at most one of them to interact with the worker at a time.

A Worker object must be kept live (i.e. not garbage collected) until it has either provided or delegated all of the results that it's responsible for, or else those results will all fail with an error. This is a best-effort mechanism to reclaim other workers that could otherwise be blocked indefinitely, but should not be relied on in any "happy path" because the Go garbage collection details are intentionally underspecified to allow for future improvements.

func NewWorker

func NewWorker(delegatedResolvers ...ResolverContainer) *Worker

NewWorker allocates a new Worker, optionally transferring responsibility for resolving some requests.

Callers are responsible for ensuring that any resolvers passed to this function were previously the responsibility of the codepath making the call. Although there are no immediate checks that the caller was already responsible for the given requests (the relationship between codepaths and workers is the caller's concern), incorrect use can potentially be detected later if the previously responsible worker subsequently attempts to resolve a request that was delegated.
