Documentation ¶
Overview ¶
Package worker handles interactions with SDK-side workers: it represents the worker services, communicates with those services, and manages SDK environments.
Index ¶
- type B
- func (b *B) Cleanup(wk *W)
- func (b *B) DataOrTimerDone()
- func (b *B) Init()
- func (b *B) LogValue() slog.Value
- func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{}
- func (b *B) Progress(ctx context.Context, wk *W) (*fnpb.ProcessBundleProgressResponse, error)
- func (b *B) Respond(resp *fnpb.InstructionResponse)
- func (b *B) Split(ctx context.Context, wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error)
- type SideInputKey
- type W
- func (wk *W) Connected() bool
- func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error
- func (wk *W) Data(data fnpb.BeamFnData_DataServer) error
- func (wk *W) Endpoint() string
- func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error)
- func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error)
- func (wk *W) LogValue() slog.Value
- func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error
- func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.MonitoringInfosMetadataResponse
- func (wk *W) NextInst() string
- func (wk *W) Serve()
- func (wk *W) State(state fnpb.BeamFnState_StateServer) error
- func (wk *W) Stop()
- func (wk *W) Stopped() bool
- func (wk *W) String() string
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type B ¶
type B struct {
InstID string // ID for the instruction processing this bundle.
PBDID string // ID for the ProcessBundleDescriptor
// InputTransformID is where data is being sent to in the SDK.
InputTransformID string
Input []*engine.Block // Data and Timers for this bundle.
EstimatedInputElements int
HasTimers []string
// IterableSideInputData is a map from transformID + inputID, to window, to data.
IterableSideInputData map[SideInputKey]map[typex.Window][][]byte
// MultiMapSideInputData is a map from transformID + inputID, to window, to data key, to data values.
MultiMapSideInputData map[SideInputKey]map[typex.Window]map[string][][]byte
// OutputCount is the number of data or timer outputs this bundle has.
// We need to see this many closed data channels before the bundle is complete.
OutputCount int
// DataWait is how we determine if a bundle is finished, by waiting for each of
// a Bundle's DataSinks to produce their last output.
// After this point we can "commit" the bundle's output for downstream use.
DataWait chan struct{}
OutputData engine.TentativeData
Resp chan *fnpb.ProcessBundleResponse
BundleErr error
SinkToPCollection map[string]string
// contains filtered or unexported fields
}
B represents an extant ProcessBundle instruction sent to an SDK worker. It is generally manipulated by another package to interact with a worker.
func (*B) DataOrTimerDone ¶
func (b *B) DataOrTimerDone()
DataOrTimerDone indicates a final element has been received from a Data or Timer output.
func (*B) Init ¶
func (b *B) Init()
Init initializes the bundle's internal state for waiting on all data and for relaying a response back.
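The relationship between OutputCount, DataWait, and DataOrTimerDone can be pictured with a small standalone sketch. It does not reproduce the package's unexported fields: the bundle type, newBundle, and outputDone are hypothetical names, and the use of a sync.WaitGroup is an assumption based only on the "data waitgroup" wording in the ProcessOn documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// bundle is a hypothetical stand-in for B, keeping only what the sketch needs.
type bundle struct {
	outputCount int
	dataWait    chan struct{} // closed once every output has finished
	wg          sync.WaitGroup
}

// newBundle plays the role of Init: it arms the counter and starts a
// goroutine that closes dataWait after the last output reports in.
func newBundle(outputs int) *bundle {
	b := &bundle{outputCount: outputs, dataWait: make(chan struct{})}
	b.wg.Add(outputs)
	go func() {
		b.wg.Wait()
		close(b.dataWait)
	}()
	return b
}

// outputDone plays the role of DataOrTimerDone: one data or timer output
// has seen its final element.
func (b *bundle) outputDone() { b.wg.Done() }

func main() {
	b := newBundle(2)
	b.outputDone()
	b.outputDone()
	<-b.dataWait // unblocks only after both outputs are done
	fmt.Println("all outputs closed; bundle output can be committed")
}
```

In this sketch a caller never polls a counter; it simply receives from dataWait, which matches the "wait for every sink's last output, then commit" description on the DataWait field.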
func (*B) ProcessOn ¶
func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{}
ProcessOn executes the given bundle on the given W. The returned channel is closed once all expected data is returned.
Assumes the bundle is initialized (all maps are non-nil, the data waitgroup is set, and the response channel is initialized). Assumes the bundle descriptor is already registered with the W.
While this method mostly manipulates a W, putting it on a B avoids mixing the worker's public gRPC APIs up with local calls.
func (*B) Progress ¶
func (b *B) Progress(ctx context.Context, wk *W) (*fnpb.ProcessBundleProgressResponse, error)
Progress sends a progress request for the given bundle to the passed-in worker, blocking on the response.
func (*B) Respond ¶
func (b *B) Respond(resp *fnpb.InstructionResponse)
type SideInputKey ¶
type SideInputKey struct {
TransformID, Local string
}
SideInputKey is for data lookups for a given bundle.
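Because SideInputKey is a struct of two strings, it is comparable and can be used directly as a map key, which is how the IterableSideInputData field on B is shaped. A minimal sketch of such a lookup follows; the lowercase types, the window stand-in for typex.Window, and the lookup helper are all hypothetical.

```go
package main

import "fmt"

// sideInputKey mirrors the shape of SideInputKey: a comparable struct,
// so it can be used directly as a map key.
type sideInputKey struct {
	TransformID, Local string
}

// window is a hypothetical stand-in for typex.Window.
type window string

// iterableData mirrors the shape of IterableSideInputData:
// transform + input, to window, to encoded elements.
type iterableData map[sideInputKey]map[window][][]byte

// lookup returns the encoded elements for a side input in a window,
// or nil when either level of the map has no entry.
func lookup(d iterableData, transformID, local string, w window) [][]byte {
	return d[sideInputKey{TransformID: transformID, Local: local}][w]
}

func main() {
	d := iterableData{
		{TransformID: "t1", Local: "i0"}: {
			"global": {[]byte("a"), []byte("b")},
		},
	}
	fmt.Println(len(lookup(d, "t1", "i0", "global")))
	fmt.Println(lookup(d, "t2", "i0", "global") == nil)
}
```

Indexing a nil inner map is safe in Go, so the helper needs no explicit existence checks for the missing-key case.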
type W ¶
type W struct {
fnpb.UnimplementedBeamFnControlServer
fnpb.UnimplementedBeamFnDataServer
fnpb.UnimplementedBeamFnStateServer
fnpb.UnimplementedBeamFnLoggingServer
fnpb.UnimplementedProvisionServiceServer
ID, Env string
JobKey, ArtifactEndpoint string
EnvPb *pipepb.Environment
PipelineOptions *structpb.Struct
InstReqs chan *fnpb.InstructionRequest
DataReqs chan *fnpb.Elements
Descriptors map[string]*fnpb.ProcessBundleDescriptor // Stages keyed by PBDID
// contains filtered or unexported fields
}
A W manages worker environments, sending them work that they're able to execute, and manages the server-side handlers for FnAPI RPCs.
func (*W) Control ¶
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error
Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
Requests come from the runner, and are sent to the client in the SDK.
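Coordinating requests and responses through unique instruction IDs can be sketched as a table of pending channels keyed by ID. This is an illustration of the general technique under stated assumptions, not the package's implementation: router, response, expect, and deliver are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a hypothetical stand-in for an instruction response.
type response struct {
	InstID  string
	Payload string
}

// router matches responses to the callers waiting on them by instruction ID.
type router struct {
	mu      sync.Mutex
	pending map[string]chan response
}

func newRouter() *router {
	return &router{pending: make(map[string]chan response)}
}

// expect registers interest in instID; call it before sending the request.
func (r *router) expect(instID string) <-chan response {
	ch := make(chan response, 1) // buffered so deliver never blocks
	r.mu.Lock()
	r.pending[instID] = ch
	r.mu.Unlock()
	return ch
}

// deliver hands resp to whoever is waiting on its ID and reports
// whether anyone was. Each ID is delivered at most once.
func (r *router) deliver(resp response) bool {
	r.mu.Lock()
	ch, ok := r.pending[resp.InstID]
	delete(r.pending, resp.InstID)
	r.mu.Unlock()
	if ok {
		ch <- resp
	}
	return ok
}

func main() {
	r := newRouter()
	a, b := r.expect("inst001"), r.expect("inst002")
	// Responses may arrive in any order; the ID routes each one.
	r.deliver(response{InstID: "inst002", Payload: "second"})
	r.deliver(response{InstID: "inst001", Payload: "first"})
	fmt.Println((<-a).Payload, (<-b).Payload)
}
```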
func (*W) Data ¶
func (wk *W) Data(data fnpb.BeamFnData_DataServer) error
Data relays elements and timer bytes to SDKs and back again, coordinated via ProcessBundle instructionIDs, and receiving input transforms.
Data is multiplexed on a single stream for all active bundles on a worker.
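Multiplexing on a single stream means each incoming entry must carry enough identity to be routed to its bundle. A minimal sketch of that demultiplexing step follows; the element and streamKey types and the demux function are hypothetical and only loosely modelled on the idea of entries tagged with an instruction ID and a transform ID.

```go
package main

import "fmt"

// element is a hypothetical stand-in for one data entry on the shared stream.
type element struct {
	InstID      string // which bundle the bytes belong to
	TransformID string // which transform produced or consumes them
	Data        []byte
	IsLast      bool // marks the end of this bundle and transform's output
}

// streamKey identifies one logical stream within the shared connection.
type streamKey struct{ InstID, TransformID string }

// demux groups a mixed batch by bundle and transform, and records which
// logical streams have been closed by a final element.
func demux(batch []element) (map[streamKey][][]byte, map[streamKey]bool) {
	data := make(map[streamKey][][]byte)
	closed := make(map[streamKey]bool)
	for _, e := range batch {
		k := streamKey{e.InstID, e.TransformID}
		if len(e.Data) > 0 {
			data[k] = append(data[k], e.Data)
		}
		if e.IsLast {
			closed[k] = true
		}
	}
	return data, closed
}

func main() {
	data, closed := demux([]element{
		{InstID: "inst001", TransformID: "sink", Data: []byte("a")},
		{InstID: "inst002", TransformID: "sink", Data: []byte("b")},
		{InstID: "inst001", TransformID: "sink", IsLast: true},
	})
	k := streamKey{"inst001", "sink"}
	fmt.Println(len(data[k]), closed[k])
}
```

In this sketch, a closed entry is the point at which a bundle would count one of its outputs as finished.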
func (*W) GetProcessBundleDescriptor ¶
func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error)
func (*W) GetProvisionInfo ¶
func (wk *W) GetProvisionInfo(_ context.Context, _ *fnpb.GetProvisionInfoRequest) (*fnpb.GetProvisionInfoResponse, error)
func (*W) Logging ¶
func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error
Logging relates SDK worker messages back to the job that spawned them. Messages are received from the SDK,
func (*W) MonitoringMetadata ¶
func (wk *W) MonitoringMetadata(ctx context.Context, unknownIDs []string) *fnpb.MonitoringInfosMetadataResponse
MonitoringMetadata is a convenience method to request the metadata for monitoring shortIDs.