Documentation
¶
Overview ¶
Package workgraph provides some low-level utilities for coordinating a number of workers that are all collaborating to produce different parts of some overall result, with dynamically-discovered dependencies between those workers.
Workers and requests form a bipartite graph. Every request has exactly one worker responsible for resolving it, and every worker is waiting for zero or one requests to be resolved. If worker A attempts to wait for a result that will be produced by worker B which is waiting for another result that A is responsible for then all requests in that chain immediately fail to avoid deadlock.
This is a "nuts-and-bolts" abstraction intended to be used as an implementation detail of a higher-level system, and is not intended to be treated as a cross-cutting concern that appears in another library's exported API. Use idomatic Go features like blocking calls or channels to represent relationships between concurrent work in larger scopes.
Index ¶
- func NewRequest[T any](responsibleWorker *Worker) (Resolver[T], Promise[T])
- func OnceFunc[T any](f func(*Worker) (T, error)) func(*Worker) (T, error)
- func WithNewAsyncWorker(f func(*Worker), delegatedResults ...ResolverContainer)
- type AnyResolver
- type ErrSelfDependency
- type ErrUnresolved
- type Once
- type Promise
- type RequestID
- type Resolver
- func (r Resolver[T]) ContainedResolvers() iter.Seq[AnyResolver]
- func (r Resolver[T]) Report(resolvingWorker *Worker, val T, err error)
- func (r Resolver[T]) ReportError(resolvingWorker *Worker, err error)
- func (r Resolver[T]) ReportSuccess(resolvingWorker *Worker, val T)
- func (r Resolver[T]) RequestID() RequestID
- type ResolverContainer
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewRequest ¶
NewRequest begins a new request and returns both its resolver and its promise.
The given worker is initially responsible for resolving the request.
func OnceFunc ¶
OnceFunc returns a function that, when called for the first time, will run f using a newly-created Worker, and then that and all subsequent calls will return whatever that function returns.
This is a convenience wrapper for Once in situations where the underlying RequestID is unimportant and so just a function pointer is sufficient, and where it's helpful to capture the function to run inside the result so it can be called from many different locations.
func WithNewAsyncWorker ¶
func WithNewAsyncWorker(f func(*Worker), delegatedResults ...ResolverContainer)
WithNewSyncWorker is a helper wrapper around NewWorker for the common case of associating a new worker with a new goroutine.
The Worker passed to the given function is immediately responsible for any resolvers given as additional arguments, as if those had been passed to NewWorker.
Types ¶
type AnyResolver ¶
type AnyResolver interface {
// contains filtered or unexported methods
}
AnyResolver is an interface implemented by all instantiations of the generic type Resolver, regardless of their result type.
This is used along with ResolverContainer to delegate resolvers from one worker to another, where it doesn't matter what value type each resolver has.
type ErrSelfDependency ¶
type ErrSelfDependency struct {
// RequestIDs are the identifiers of the requests included in the
// dependency cycle. Callers may use this in conjunction with their own
// records of the meaning of each request ID to return a higher-level error
// that describes the set of requested operations that together caused the
// problem.
RequestIDs []RequestID
}
ErrSelfDependency is returned by Promise.Await if a direct or indirect self-dependency is created in the worker-and-request graph by this or some other call to Promise.Await.
All Await calls blocking on any result in the detected dependency cycle immediately fail with this error.
func (ErrSelfDependency) Error ¶
func (err ErrSelfDependency) Error() string
type ErrUnresolved ¶
type ErrUnresolved struct {
// RequestID is the request that was unresolved. This is always the ID of
// the request whose [Promise] the Await method was called on.
RequestID RequestID
}
ErrUnresolved is returned by Promise.Await if the Worker responsible for resolving the request is garbage-collected before the request is resolved.
This suggests a bug in the implementation of the responsible worker, since it should ensure that all requests it is responsible for are either resolved or delegated to another worker before its Worker object goes out of scope.
func (ErrUnresolved) Error ¶
func (err ErrUnresolved) Error() string
type Once ¶
type Once[T any] struct { // contains filtered or unexported fields }
Once is similar in principle to the standard library's sync.Once, but implemented in terms of the workgraph concepts so that it can provide similar guarantees such as detecting when a Once execution ends up indirectly depending on its own result.
func (*Once[T]) Do ¶
Do calls the function f if and only if Do is being called for the first time on this instance of Once.
Given a specific instance of Once, only the first call with invoke the given f, even if f has a different value on each call.
forWorker is the handle for the worker that the result is requested on behalf of. The given function is called with its own separate Worker that is responsible for providing the return value. If f directly or indirectly causes another call to Do on the same Once then all affected calls will fail with ErrSelfDependency.
type Promise ¶
type Promise[T any] struct { // contains filtered or unexported fields }
Promise is a handle through which many different workers can wait for the result of a request to become available.
type RequestID ¶
type RequestID struct {
// contains filtered or unexported fields
}
RequestID represents an opaque but comparable unique identifier for a request, whose resolver may or may not still be live.
RequestID values are used in some error types returned by this package when reporting situations that could cause deadlock. Callers can therefore maintain a lookup table from RequestID to some higher-level representation of the meaning of a request to allow including more relevant context in externally-facing error results.
Use Resolver.RequestID to find the identity of the request that a particular resolver is associated with.
var NoRequest RequestID
NoRequest is the zero value of RequestID, representing the absence of any request.
No actual request has this request ID; this value can be used, for example, when a caller asks for the request ID for a request that hasn't actually started yet, such as with Once.RequestID.
func (RequestID) Equal ¶
Equal returns true if other is the same RequestID as the receiver.
This is equivalent to using the "==" operator to compare two values, but is implemented here to work better with libraries like Google's "go-cmp" which try to perform deep comparison when no Equal method is present.
type Resolver ¶
type Resolver[T any] struct { // contains filtered or unexported fields }
A Resolver is used by the Worker that is responsible for resolving a request to report its result, thereby unblocking any other workers that are waiting for its resolution.
func (Resolver[T]) ContainedResolvers ¶
func (r Resolver[T]) ContainedResolvers() iter.Seq[AnyResolver]
ContainedResolvers implements ResolverContainer, reporting the reciever itself as the only resolver in the container.
func (Resolver[T]) Report ¶
Report resolves the request with both a result value and an error, both of which will be returned from any Promise.Await calls for the associated request.
func (Resolver[T]) ReportError ¶
ReportSuccess is a helper for Resolver.Report which automatically sets the value part of the result to the zero value of T, suggesting an error result without any useful accompanying error.
func (Resolver[T]) ReportSuccess ¶
ReportSuccess is a helper for Resolver.Report which automatically sets the error to nil, suggesting a successful result.
type ResolverContainer ¶
type ResolverContainer interface {
ContainedResolvers() iter.Seq[AnyResolver]
}
ResolverContainer is implemented by types that in some sense "contain" Resolver objects, allowing the responsibility for all of those results to be passed in aggregate to a new task when calling NewWorker.
*Resolver itself implements this interface, so callers with no need for any higher-level aggregation can pass individual Resolver values directly instead of implementing this interface themselves.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
A Worker represents a specific linear codepath that will ultimately resolve zero or more results.
It's ultimately up to the caller to decide what exactly "linear codepath" means. The simplest mental model is for each Worker to belong to one goroutine and for that worker to go out of scope once the goroutine exits, with no other goroutine interacting with it.
However, the only hard constraint is that each worker can only be waiting on zero or one results at a time, and so two goroutines can potentially share a single worker as long as they somehow arrange for at most one of them to interact with the worker at a time.
A Worker object must be kept live (i.e. not garbage collected) until it has either provided or delegated all of the results that it's responsible for, or else those results will all fail with an error. This is a best-effort mechanism to reclaim other workers that could otherwise be blocked indefinitely, but should not be relied on in any "happy path" because the Go garbage collection details are intentionally underspecified to allow for future improvements.
func NewWorker ¶
func NewWorker(delegatedResolvers ...ResolverContainer) *Worker
NewWorker allocates a new Worker, optionally transferring responsibility for resolving some requests.
Callers are responsible for ensuring that a caller passing resolvers to this function was previously considered to be responsible for those resolvers. Although there are no immediate checks that the caller was already responsible for the given requests (the relationship between codepaths and workers is the caller's concern), incorrect use of this can potentially be detected later if the previous responsible worker subsequently attempts to resolve the request that was delegated.