worker

package
v0.0.0-...-86dd8ac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2026 License: AGPL-3.0 Imports: 78 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetPodAddr

func GetPodAddr() (string, error)

GetPodAddr gets the IP from the POD_IP env var. Returns an error if it fails to retrieve an IP.

func GetProcCurrentCPUMillicores

func GetProcCurrentCPUMillicores(cpuTime float64, prevCPUTime float64, systemCPUTime float64, prevSystemCPUTime float64) float64

func GetSystemCPU

func GetSystemCPU() (float64, error)

func IsCRIURestoreError

func IsCRIURestoreError(err error) bool

func NewBackendRepositoryClient

func NewBackendRepositoryClient(ctx context.Context, config types.AppConfig, token string) (pb.BackendRepositoryServiceClient, error)

NewBackendRepositoryClient creates a new backend repository client

func NewContainerRepositoryClient

func NewContainerRepositoryClient(ctx context.Context, config types.AppConfig, token string) (pb.ContainerRepositoryServiceClient, error)

NewContainerRepositoryClient creates a new container repository client

func NewWorkerRepositoryClient

func NewWorkerRepositoryClient(ctx context.Context, config types.AppConfig, token string) (pb.WorkerRepositoryServiceClient, error)

NewWorkerRepositoryClient creates a new worker repository client

Types

type AssignedGpuDevices

type AssignedGpuDevices struct {
}

type CRIUManager

type CRIUManager interface {
	Available() bool
	CreateCheckpoint(ctx context.Context, runtime runtime.Runtime, checkpointId string, request *types.ContainerRequest) (string, error)
	RestoreCheckpoint(ctx context.Context, runtime runtime.Runtime, opts *RestoreOpts) (int, error)
}

func InitializeCRIUManager

func InitializeCRIUManager(ctx context.Context, config types.CRIUConfig) (CRIUManager, error)

InitializeCRIUManager initializes a new CRIU manager that can be used to checkpoint and restore containers. Depending on the mode, it uses either Cedana or NVIDIA CUDA checkpoint under the hood.

func InitializeNvidiaCRIU

func InitializeNvidiaCRIU(ctx context.Context, config types.CRIUConfig) (CRIUManager, error)

type CedanaCRIUManager

type CedanaCRIUManager struct {
	// contains filtered or unexported fields
}

func InitializeCedanaCRIU

func InitializeCedanaCRIU(
	ctx context.Context,
	c config.Config,
) (*CedanaCRIUManager, error)

func (*CedanaCRIUManager) Available

func (c *CedanaCRIUManager) Available() bool

func (*CedanaCRIUManager) CreateCheckpoint

func (c *CedanaCRIUManager) CreateCheckpoint(ctx context.Context, rt runtime.Runtime, checkpointId string, request *types.ContainerRequest) (string, error)

func (*CedanaCRIUManager) RestoreCheckpoint

func (c *CedanaCRIUManager) RestoreCheckpoint(ctx context.Context, rt runtime.Runtime, opts *RestoreOpts) (int, error)

type ContainerInstance

type ContainerInstance struct {
	Id                         string
	StubId                     string
	BundlePath                 string
	Overlay                    *common.ContainerOverlay
	Spec                       *specs.Spec
	Err                        error
	ExitCode                   int
	Port                       int
	OutputWriter               *common.OutputWriter
	LogBuffer                  *common.LogBuffer
	Request                    *types.ContainerRequest
	StopReason                 types.StopContainerReason
	SandboxProcessManager      *goproc.GoProcClient
	SandboxProcessManagerReady bool
	ContainerIp                string
	Runtime                    runtime.Runtime
	OOMWatcher                 runtime.OOMWatcher
}

type ContainerLogMessage

type ContainerLogMessage struct {
	Level   string  `json:"level"`
	Message string  `json:"message"`
	TaskID  *string `json:"task_id"`
}

type ContainerLogger

type ContainerLogger struct {
	// contains filtered or unexported fields
}

func (*ContainerLogger) CaptureLogs

func (r *ContainerLogger) CaptureLogs(request *types.ContainerRequest, logChan chan common.LogRecord) error

func (*ContainerLogger) Log

func (r *ContainerLogger) Log(containerId, stubId string, format string, args ...any) error

func (*ContainerLogger) Read

func (r *ContainerLogger) Read(containerId string, buffer []byte) (int64, error)

type ContainerMountManager

type ContainerMountManager struct {
	// contains filtered or unexported fields
}

func NewContainerMountManager

func NewContainerMountManager(config types.AppConfig) *ContainerMountManager

func (*ContainerMountManager) RemoveContainerMounts

func (c *ContainerMountManager) RemoveContainerMounts(containerId string)

RemoveContainerMounts removes all mounts for a container

func (*ContainerMountManager) SetupContainerMounts

func (c *ContainerMountManager) SetupContainerMounts(ctx context.Context, request *types.ContainerRequest, outputLogger *slog.Logger) error

SetupContainerMounts initializes any external storage for a container

type ContainerNetworkManager

type ContainerNetworkManager struct {
	// contains filtered or unexported fields
}

func NewContainerNetworkManager

func NewContainerNetworkManager(ctx context.Context, workerId string, workerRepoClient pb.WorkerRepositoryServiceClient, containerRepoClient pb.ContainerRepositoryServiceClient, config types.AppConfig, containerInstances *common.SafeMap[*ContainerInstance]) (*ContainerNetworkManager, error)

func (*ContainerNetworkManager) ExposePort

func (m *ContainerNetworkManager) ExposePort(containerId string, hostPort, containerPort int) error

func (*ContainerNetworkManager) Setup

func (m *ContainerNetworkManager) Setup(containerId string, spec *specs.Spec, request *types.ContainerRequest) error

func (*ContainerNetworkManager) TearDown

func (m *ContainerNetworkManager) TearDown(containerId string) error

func (*ContainerNetworkManager) UpdateNetworkPermissions

func (m *ContainerNetworkManager) UpdateNetworkPermissions(containerId string, request *types.ContainerRequest) error

type ContainerNvidiaManager

type ContainerNvidiaManager struct {
	// contains filtered or unexported fields
}

func (*ContainerNvidiaManager) AssignGPUDevices

func (c *ContainerNvidiaManager) AssignGPUDevices(containerId string, gpuCount uint32) ([]int, error)

func (*ContainerNvidiaManager) GetContainerGPUDevices

func (c *ContainerNvidiaManager) GetContainerGPUDevices(containerId string) []int

func (*ContainerNvidiaManager) InjectEnvVars

func (c *ContainerNvidiaManager) InjectEnvVars(env []string) []string

func (*ContainerNvidiaManager) InjectMounts

func (c *ContainerNvidiaManager) InjectMounts(mounts []specs.Mount) []specs.Mount

func (*ContainerNvidiaManager) UnassignGPUDevices

func (c *ContainerNvidiaManager) UnassignGPUDevices(containerId string)

type ContainerOptions

type ContainerOptions struct {
	BundlePath   string
	HostBindPort int
	BindPorts    []int
	InitialSpec  *specs.Spec
}

type ContainerResources

type ContainerResources interface {
	GetCPU(request *types.ContainerRequest) *specs.LinuxCPU
	GetMemory(request *types.ContainerRequest) *specs.LinuxMemory
}

type ContainerRuntimeServer

type ContainerRuntimeServer struct {
	pb.UnimplementedContainerServiceServer
	// contains filtered or unexported fields
}

ContainerRuntimeServer is a runtime-agnostic container server that works with any OCI runtime

func NewContainerRuntimeServer

func NewContainerRuntimeServer(opts *ContainerRuntimeServerOpts) (*ContainerRuntimeServer, error)

NewContainerRuntimeServer creates a new runtime-agnostic container server

func (*ContainerRuntimeServer) ContainerArchive

ContainerArchive archives a container's filesystem

func (*ContainerRuntimeServer) ContainerCheckpoint

ContainerCheckpoint creates a checkpoint of a running container

func (*ContainerRuntimeServer) ContainerExec

ContainerExec executes a command inside a running container

func (*ContainerRuntimeServer) ContainerKill

ContainerKill kills and removes a container using the worker's configured runtime

func (*ContainerRuntimeServer) ContainerSandboxExec

func (*ContainerRuntimeServer) ContainerSandboxKill

func (*ContainerRuntimeServer) ContainerSandboxListFiles

func (*ContainerRuntimeServer) ContainerSandboxStatFile

func (*ContainerRuntimeServer) ContainerSandboxStatus

func (*ContainerRuntimeServer) ContainerSandboxStderr

func (*ContainerRuntimeServer) ContainerSandboxStdout

func (*ContainerRuntimeServer) ContainerStatus

ContainerStatus returns the status of a container

func (*ContainerRuntimeServer) ContainerStreamLogs

ContainerStreamLogs streams container logs

func (*ContainerRuntimeServer) ContainerSyncWorkspace

ContainerSyncWorkspace syncs workspace files

func (*ContainerRuntimeServer) Start

func (s *ContainerRuntimeServer) Start() error

func (*ContainerRuntimeServer) Stop

func (s *ContainerRuntimeServer) Stop() error

type ContainerRuntimeServerOpts

type ContainerRuntimeServerOpts struct {
	PodAddr                 string
	Runtime                 runtime.Runtime // The runtime configured for this worker pool
	ContainerInstances      *common.SafeMap[*ContainerInstance]
	ImageClient             *ImageClient
	ContainerRepoClient     pb.ContainerRepositoryServiceClient
	ContainerNetworkManager *ContainerNetworkManager
	CreateCheckpoint        func(ctx context.Context, opts *CreateCheckpointOpts) error
}

type CreateCheckpointOpts

type CreateCheckpointOpts struct {
	Request           *types.ContainerRequest
	CheckpointId      string
	ContainerIp       string
	OutputLogger      *slog.Logger
	CheckpointPIDChan chan int
	WaitForSignal     bool
}

type ErrCRIURestoreFailed

type ErrCRIURestoreFailed struct {
	Stderr string
}

func (*ErrCRIURestoreFailed) Error

func (e *ErrCRIURestoreFailed) Error() string

type FileCacheManager

type FileCacheManager struct {
	// contains filtered or unexported fields
}

func NewFileCacheManager

func NewFileCacheManager(config types.AppConfig, client *blobcache.BlobCacheClient) *FileCacheManager

func (*FileCacheManager) CacheAvailable

func (cm *FileCacheManager) CacheAvailable() bool

CacheAvailable checks if the file cache is available

func (*FileCacheManager) CacheFilesInPath

func (cm *FileCacheManager) CacheFilesInPath(sourcePath string)

CacheFilesInPath caches files from a specified source path

func (*FileCacheManager) EnableVolumeCaching

func (cm *FileCacheManager) EnableVolumeCaching(workspaceName string, volumeCacheMap map[string]string, spec *specs.Spec) error

func (*FileCacheManager) GetClient

func (cm *FileCacheManager) GetClient() *blobcache.BlobCacheClient

GetClient returns the blobcache client instance.

type FileLock

type FileLock struct {
	// contains filtered or unexported fields
}

func NewFileLock

func NewFileLock(path string) *FileLock

func (*FileLock) Acquire

func (fl *FileLock) Acquire() error

func (*FileLock) Release

func (fl *FileLock) Release() error

type GPUInfoClient

type GPUInfoClient interface {
	AvailableGPUDevices() ([]int, error)
	GetGPUMemoryUsage(deviceIndex int) (GPUMemoryUsageStats, error)
}

type GPUInfoStat

type GPUInfoStat struct {
	MemoryUsed  uint64
	MemoryTotal uint64
}

type GPUManager

type GPUManager interface {
	AssignGPUDevices(containerId string, gpuCount uint32) ([]int, error)
	GetContainerGPUDevices(containerId string) []int
	UnassignGPUDevices(containerId string)
	InjectEnvVars(env []string) []string
	InjectMounts(mounts []specs.Mount) []specs.Mount
}

func NewContainerNvidiaManager

func NewContainerNvidiaManager(gpuCount uint32) GPUManager

type GPUMemoryUsageStats

type GPUMemoryUsageStats struct {
	UsedCapacity  int64
	TotalCapacity int64
}

type GvisorResources

type GvisorResources struct {
	*StandardResources
}

func NewGvisorResources

func NewGvisorResources() *GvisorResources

func (*GvisorResources) GetCPU

func (g *GvisorResources) GetCPU(request *types.ContainerRequest) *specs.LinuxCPU

func (*GvisorResources) GetMemory

func (g *GvisorResources) GetMemory(request *types.ContainerRequest) *specs.LinuxMemory

type ImageClient

type ImageClient struct {
	// contains filtered or unexported fields
}

func NewImageClient

func NewImageClient(config types.AppConfig, workerId string, workerRepoClient pb.WorkerRepositoryServiceClient, fileCacheManager *FileCacheManager) (*ImageClient, error)

func (*ImageClient) Archive

func (c *ImageClient) Archive(ctx context.Context, bundlePath *PathInfo, imageId string, progressChan chan int) error

Archive generates and uploads an archived version of the image for distribution.

func (*ImageClient) BuildAndArchiveImage

func (c *ImageClient) BuildAndArchiveImage(ctx context.Context, outputLogger *slog.Logger, request *types.ContainerRequest) error

func (*ImageClient) Cleanup

func (c *ImageClient) Cleanup() error

func (*ImageClient) GetCLIPImageMetadata

func (c *ImageClient) GetCLIPImageMetadata(imageId string) (*clipCommon.ImageMetadata, bool)

GetCLIPImageMetadata extracts CLIP image metadata from the archive

func (*ImageClient) GetSourceImageRef

func (c *ImageClient) GetSourceImageRef(imageId string) (string, bool)

GetSourceImageRef retrieves the cached source image reference for a v2 image

func (*ImageClient) PullAndArchiveImage

func (c *ImageClient) PullAndArchiveImage(ctx context.Context, outputLogger *slog.Logger, request *types.ContainerRequest) error

func (*ImageClient) PullLazy

func (c *ImageClient) PullLazy(ctx context.Context, request *types.ContainerRequest, outputLogger *slog.Logger) (time.Duration, error)

type NvidiaCRIUManager

type NvidiaCRIUManager struct {
	// contains filtered or unexported fields
}

func (*NvidiaCRIUManager) Available

func (c *NvidiaCRIUManager) Available() bool

func (*NvidiaCRIUManager) CreateCheckpoint

func (c *NvidiaCRIUManager) CreateCheckpoint(ctx context.Context, rt runtime.Runtime, checkpointId string, request *types.ContainerRequest) (string, error)

func (*NvidiaCRIUManager) RestoreCheckpoint

func (c *NvidiaCRIUManager) RestoreCheckpoint(ctx context.Context, rt runtime.Runtime, opts *RestoreOpts) (int, error)

type NvidiaInfoClient

type NvidiaInfoClient struct{}

func (*NvidiaInfoClient) AvailableGPUDevices

func (c *NvidiaInfoClient) AvailableGPUDevices() ([]int, error)

func (*NvidiaInfoClient) GetGPUMemoryUsage

func (c *NvidiaInfoClient) GetGPUMemoryUsage(deviceIndex int) (GPUMemoryUsageStats, error)

GetGPUMemoryUsage retrieves the memory usage of a specific NVIDIA GPU. It returns the total and used memory in bytes.

type PathInfo

type PathInfo struct {
	Path string
	// contains filtered or unexported fields
}

func NewPathInfo

func NewPathInfo(path string) *PathInfo

func (*PathInfo) GetSize

func (p *PathInfo) GetSize() float64

type ProcUtil

type ProcUtil struct {
	procfs.Proc
}

func NewProcUtil

func NewProcUtil(pid int) (*ProcUtil, error)

type ProcessMonitor

type ProcessMonitor struct {
	// contains filtered or unexported fields
}

func NewProcessMonitor

func NewProcessMonitor(pid int, devices []specs.LinuxDeviceCgroup, gpuDeviceIds []int) *ProcessMonitor

func (*ProcessMonitor) GetStatistics

func (m *ProcessMonitor) GetStatistics() (*ProcessStats, error)

type ProcessStats

type ProcessStats struct {
	CPU    uint64 // in millicores
	Memory process.MemoryInfoStat
	IO     process.IOCountersStat
	NetIO  net.IOCountersStat
	GPU    GPUInfoStat
}

type RestoreOpts

type RestoreOpts struct {
	// contains filtered or unexported fields
}

type RuncResources

type RuncResources struct {
	*StandardResources
}

func NewRuncResources

func NewRuncResources() *RuncResources

type StagedFile

type StagedFile struct {
	Path    string
	Content string
}

type StandardResources

type StandardResources struct {
	// contains filtered or unexported fields
}

func NewStandardResources

func NewStandardResources() *StandardResources

func (*StandardResources) GetCPU

func (r *StandardResources) GetCPU(request *types.ContainerRequest) *specs.LinuxCPU

func (*StandardResources) GetMemory

func (r *StandardResources) GetMemory(request *types.ContainerRequest) *specs.LinuxMemory

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker() (*Worker, error)

func (*Worker) IsCRIUAvailable

func (s *Worker) IsCRIUAvailable(gpuCount uint32) bool

func (*Worker) Run

func (s *Worker) Run() error

func (*Worker) RunContainer

func (s *Worker) RunContainer(ctx context.Context, request *types.ContainerRequest) error

RunContainer spawns a single container and streams its output to stdout/stderr.

type WorkerUsageMetrics

type WorkerUsageMetrics struct {
	// contains filtered or unexported fields
}

func NewWorkerUsageMetrics

func NewWorkerUsageMetrics(
	ctx context.Context,
	workerId string,
	config types.MonitoringConfig,
	gpuType string,
) (*WorkerUsageMetrics, error)

func (*WorkerUsageMetrics) EmitContainerUsage

func (wm *WorkerUsageMetrics) EmitContainerUsage(ctx context.Context, request *types.ContainerRequest)

EmitContainerUsage periodically sends metrics to track container duration.

type WorkspaceStorageManager

type WorkspaceStorageManager struct {
	// contains filtered or unexported fields
}

func NewWorkspaceStorageManager

func NewWorkspaceStorageManager(ctx context.Context, config types.StorageConfig, poolConfig types.WorkerPoolConfig, containerInstances *common.SafeMap[*ContainerInstance], cacheClient *blobcache.BlobCacheClient) (*WorkspaceStorageManager, error)

func (*WorkspaceStorageManager) Cleanup

func (sm *WorkspaceStorageManager) Cleanup() error

func (*WorkspaceStorageManager) Create

func (sm *WorkspaceStorageManager) Create(workspaceName string, storage storage.Storage)

func (*WorkspaceStorageManager) Mount

func (sm *WorkspaceStorageManager) Mount(workspaceName string, workspaceStorage *types.WorkspaceStorage) (storage.Storage, error)

func (*WorkspaceStorageManager) Unmount

func (sm *WorkspaceStorageManager) Unmount(workspaceName string) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL