readers

package
v0.0.0-...-b3910b2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 13, 2025 License: GPL-2.0 Imports: 33 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AuthConfig

type AuthConfig struct {
	Type          string            // "bearer", "basic", "apikey", "oauth2", "custom"
	Token         string            // Bearer token or API key
	Username      string            // For basic auth
	Password      string            // For basic auth
	HeaderName    string            // Custom header name for API key
	HeaderValue   string            // Custom header value
	QueryParam    string            // Query parameter name for API key
	CustomHeaders map[string]string // Additional custom headers
}

AuthConfig defines authentication configuration

type CSVReader

type CSVReader struct {
	// contains filtered or unexported fields
}

CSVReader implements core.DataSource for CSV files. It supports header detection, delimiter configuration, type inference, and statistics.

func NewCSVReader

func NewCSVReader(r io.ReadCloser, options ...ReaderOptionCSV) (*CSVReader, error)

NewCSVReader creates a CSVReader with default or overridden options. Accepts functional options for configuration. Returns a ready-to-use reader or an error.

func (*CSVReader) Close

func (c *CSVReader) Close() error

Close implements the core.DataSource interface. Closes the underlying reader.

func (*CSVReader) Read

func (c *CSVReader) Read(ctx context.Context) (core.Record, error)

Read implements the core.DataSource interface. Reads the next record from the CSV file. Thread-safe and context-aware.

func (*CSVReader) Stats

func (c *CSVReader) Stats() CSVReaderStats

Stats returns CSV reader performance stats.

type CSVReaderError

type CSVReaderError struct {
	Op  string // Operation that failed (e.g., "read", "read_headers", "read_record")
	Err error  // Underlying error
}

CSVReaderError wraps structured error information for the CSV reader.

func (*CSVReaderError) Error

func (e *CSVReaderError) Error() string

func (*CSVReaderError) Unwrap

func (e *CSVReaderError) Unwrap() error

type CSVReaderOptions

type CSVReaderOptions struct {
	Comma            rune // Field delimiter (default ',')
	Comment          rune // Comment character (optional)
	FieldsPerRecord  int  // Number of expected fields per record (optional)
	LazyQuotes       bool // Allow lazy quotes in CSV
	TrimLeadingSpace bool // Trim leading space in fields
	HasHeaders       bool // Whether the first row is a header
}

CSVReaderOptions configures the CSV reader.

type CSVReaderStats

type CSVReaderStats struct {
	RecordsRead     int64            // Total records read
	ReadDuration    time.Duration    // Total time spent reading
	LastReadTime    time.Time        // Time of last read
	NullValueCounts map[string]int64 // Count of null values per field
}

CSVReaderStats holds statistics about the CSV reader's performance.

type HTTPReader

type HTTPReader struct {
	// contains filtered or unexported fields
}

HTTPReader implements core.DataSource for HTTP APIs

func NewAuthenticatedHTTPReader

func NewAuthenticatedHTTPReader(url, token string) (*HTTPReader, error)

NewAuthenticatedHTTPReader creates an HTTP reader with bearer token authentication

func NewHTTPReader

func NewHTTPReader(url string, options ...ReaderOptionHTTP) (*HTTPReader, error)

NewHTTPReader creates a new HTTP API reader with configurable options

func NewHTTPReaderFromURL

func NewHTTPReaderFromURL(url string) (*HTTPReader, error)

NewHTTPReaderFromURL creates a basic HTTP reader for a single URL

func NewPaginatedHTTPReader

func NewPaginatedHTTPReader(url string, pageSize int, paginationType string) (*HTTPReader, error)

NewPaginatedHTTPReader creates an HTTP reader with pagination support

func (*HTTPReader) Close

func (hr *HTTPReader) Close() error

Close implements the core.DataSource interface

func (*HTTPReader) Read

func (hr *HTTPReader) Read(ctx context.Context) (core.Record, error)

Read implements the core.DataSource interface

func (*HTTPReader) Stats

func (hr *HTTPReader) Stats() HTTPReaderStats

Stats returns HTTP reader performance statistics

type HTTPReaderError

type HTTPReaderError struct {
	Op         string // Operation that failed (e.g., "request", "auth", "parse", "pagination")
	StatusCode int    // HTTP status code if applicable
	URL        string // URL being accessed when error occurred
	Err        error  // Underlying error
}

HTTPReaderError provides structured error information for HTTP reader operations

func (*HTTPReaderError) Error

func (e *HTTPReaderError) Error() string

func (*HTTPReaderError) Unwrap

func (e *HTTPReaderError) Unwrap() error

type HTTPReaderOptions

type HTTPReaderOptions struct {
	Method           string            // HTTP method (default: GET)
	Headers          map[string]string // Additional headers
	QueryParams      map[string]string // Query parameters
	Body             io.Reader         // Request body for POST/PUT
	Auth             *AuthConfig       // Authentication configuration
	Pagination       *PaginationConfig // Pagination configuration
	Timeout          time.Duration     // Request timeout
	RetryAttempts    int               // Number of retry attempts
	RetryDelay       time.Duration     // Base delay between retries
	RateLimit        time.Duration     // Minimum time between requests
	ResponseFormat   string            // "json", "jsonl", "csv", "xml"
	DataPath         string            // JSON path to extract data array
	BufferSize       int               // Buffer size for response reading
	MaxResponseSize  int64             // Maximum response size in bytes
	FollowRedirects  bool              // Follow HTTP redirects
	ValidStatusCodes []int             // Valid HTTP status codes
	UserAgent        string            // User agent string
	AcceptEncoding   string            // Accept encoding header
	CustomClient     *http.Client      // Custom HTTP client
}

HTTPReaderOptions configures the HTTP reader

type HTTPReaderStats

type HTTPReaderStats struct {
	RequestCount    int64            // Total HTTP requests made
	RecordsRead     int64            // Total records read
	BytesRead       int64            // Total bytes read
	ReadDuration    time.Duration    // Total time spent reading
	LastReadTime    time.Time        // Time of last read
	RetryCount      int64            // Number of retries performed
	RateLimitHits   int64            // Number of rate limit hits
	NullValueCounts map[string]int64 // Count of null values per field
	ResponseTimes   []time.Duration  // Response times for monitoring
}

HTTPReaderStats holds statistics about the HTTP reader's performance

type JSONReader

type JSONReader struct {
	// contains filtered or unexported fields
}

JSONReader implements core.DataSource for line-delimited JSON files. It supports buffered reading, batch configuration, and statistics.

func NewJSONReader

func NewJSONReader(r io.ReadCloser, options ...ReaderOptionJSON) *JSONReader

NewJSONReader creates a new line-delimited JSON reader with optional config. Accepts functional options for configuration. Returns a ready-to-use reader.

func (*JSONReader) Close

func (j *JSONReader) Close() error

Close implements the core.DataSource interface. Closes the underlying reader.

func (*JSONReader) Read

func (j *JSONReader) Read(ctx context.Context) (core.Record, error)

Read implements the core.DataSource interface, returning one JSON record per line. Thread-safe and context-aware.

func (*JSONReader) Stats

func (j *JSONReader) Stats() JSONReaderStats

Stats returns reader metrics like bytes read, nulls, and durations.

type JSONReaderError

type JSONReaderError struct {
	Op  string // Operation that failed (e.g., "read", "scan", "unmarshal")
	Err error  // Underlying error
}

JSONReaderError wraps detailed context for JSONReader operations.

func (*JSONReaderError) Error

func (e *JSONReaderError) Error() string

func (*JSONReaderError) Unwrap

func (e *JSONReaderError) Unwrap() error

type JSONReaderOptions

type JSONReaderOptions struct {
	BufferSize int // Optional buffer size in bytes for scanning
}

JSONReaderOptions configures optional behavior for the reader.

type JSONReaderStats

type JSONReaderStats struct {
	RecordsRead     int64            // Total records read
	BytesRead       int64            // Total bytes read
	ReadDuration    time.Duration    // Total time spent reading
	LastReadTime    time.Time        // Time of last read
	NullValueCounts map[string]int64 // Count of null values per field
}

JSONReaderStats provides metrics about JSONReader activity.

type MongoReadMode

type MongoReadMode string

MongoReadMode defines how data should be read from MongoDB

const (
	ModeFind      MongoReadMode = "find"      // Standard find query
	ModeAggregate MongoReadMode = "aggregate" // Aggregation pipeline
	ModeWatch     MongoReadMode = "watch"     // Change stream
	ModeBulk      MongoReadMode = "bulk"      // Bulk read with pagination
)

type MongoReader

type MongoReader struct {
	// contains filtered or unexported fields
}

MongoReader implements core.DataSource for MongoDB collections

func NewMongoAggregationReader

func NewMongoAggregationReader(uri, database, collection string, pipeline []bson.M) (*MongoReader, error)

NewMongoAggregationReader creates a MongoDB reader with aggregation pipeline

func NewMongoChangeStreamReader

func NewMongoChangeStreamReader(uri, database, collection string) (*MongoReader, error)

NewMongoChangeStreamReader creates a MongoDB change stream reader

func NewMongoFilteredReader

func NewMongoFilteredReader(uri, database, collection string, filter, projection bson.M) (*MongoReader, error)

NewMongoFilteredReader creates a MongoDB reader with filter and projection

func NewMongoReader

func NewMongoReader(options ...ReaderOptionMongo) (*MongoReader, error)

NewMongoReader creates a new MongoDB reader with configurable options

func NewMongoReaderFromURI

func NewMongoReaderFromURI(uri, database, collection string) (*MongoReader, error)

NewMongoReaderFromURI creates a basic MongoDB reader from a URI

func (*MongoReader) Close

func (mr *MongoReader) Close() error

Close implements the core.DataSource interface

func (*MongoReader) Connect

func (mr *MongoReader) Connect(ctx context.Context) error

Connect establishes connection to MongoDB

func (*MongoReader) Read

func (mr *MongoReader) Read(ctx context.Context) (core.Record, error)

Read implements the core.DataSource interface

func (*MongoReader) Stats

func (mr *MongoReader) Stats() MongoReaderStats

Stats returns MongoDB reader performance statistics

type MongoReaderError

type MongoReaderError struct {
	Op         string // Operation that failed (e.g., "connect", "query", "decode", "aggregate")
	Collection string // Collection being accessed when error occurred
	Err        error  // Underlying error
}

MongoReaderError provides structured error information for MongoDB reader operations

func (*MongoReaderError) Error

func (e *MongoReaderError) Error() string

func (*MongoReaderError) Unwrap

func (e *MongoReaderError) Unwrap() error

type MongoReaderOptions

type MongoReaderOptions struct {
	URI              string                   // MongoDB connection URI
	Database         string                   // Database name
	Collection       string                   // Collection name
	Mode             MongoReadMode            // Read mode
	Filter           bson.M                   // Query filter for find operations
	Projection       bson.M                   // Field projection
	Sort             bson.M                   // Sort specification
	Pipeline         []bson.M                 // Aggregation pipeline stages
	BatchSize        int32                    // Batch size for cursor
	Limit            int64                    // Maximum number of documents to read
	Skip             int64                    // Number of documents to skip
	Timeout          time.Duration            // Operation timeout
	MaxPoolSize      uint64                   // Connection pool size
	MinPoolSize      uint64                   // Minimum connections in pool
	MaxConnIdleTime  time.Duration            // Max idle time for connections
	ReadPreference   string                   // Read preference: primary, secondary, etc.
	ReadConcern      string                   // Read concern level
	AuthDatabase     string                   // Authentication database
	Username         string                   // Authentication username
	Password         string                   // Authentication password
	TLS              bool                     // Enable TLS
	TLSInsecure      bool                     // Skip TLS verification
	ReplicaSetName   string                   // Replica set name
	RetryReads       bool                     // Enable read retries
	RetryWrites      bool                     // Enable write retries
	Compressors      []string                 // Compression algorithms
	ZlibLevel        int                      // Zlib compression level
	CustomClientOpts []*options.ClientOptions // Custom client options
	AllowDiskUse     bool                     // Allow aggregation to use disk
	Hint             interface{}              // Index hint for queries
	MaxTimeMS        int64                    // Maximum execution time
	Comment          string                   // Query comment for profiling
	Collation        *options.Collation       // Collation options
}

MongoReaderOptions configures the MongoDB reader

type MongoReaderStats

type MongoReaderStats struct {
	RecordsRead     int64            // Total records read
	QueriesExecuted int64            // Total queries executed
	ReadDuration    time.Duration    // Total time spent reading
	LastReadTime    time.Time        // Time of last read
	BatchesRead     int64            // Number of batches processed
	BytesRead       int64            // Estimated bytes read
	NullValueCounts map[string]int64 // Count of null values per field
	ErrorCount      int64            // Number of errors encountered
}

MongoReaderStats holds statistics about the MongoDB reader's performance

type PaginationConfig

type PaginationConfig struct {
	Type         string // "offset", "cursor", "page", "link_header", "none"
	LimitParam   string // Parameter name for limit/page size
	OffsetParam  string // Parameter name for offset
	PageParam    string // Parameter name for page number
	CursorParam  string // Parameter name for cursor
	PageSize     int    // Number of records per page
	MaxPages     int    // Maximum pages to fetch (0 = unlimited)
	NextURLField string // JSON field containing next page URL
	CursorField  string // JSON field containing next cursor
	TotalField   string // JSON field containing total count
	HasMoreField string // JSON field indicating more data available
}

PaginationConfig defines pagination behavior

type ParquetReader

type ParquetReader struct {
	// contains filtered or unexported fields
}

ParquetReader implements DataSource for Parquet files. It supports optional column projection and safe resource management.

func NewParquetReader

func NewParquetReader(filename string, options ...ReaderOption) (*ParquetReader, error)

NewParquetReader opens a Parquet file and prepares an Arrow RecordReader

func (*ParquetReader) Close

func (p *ParquetReader) Close() error

Close releases resources and closes the underlying file

func (*ParquetReader) Read

func (p *ParquetReader) Read(ctx context.Context) (core.Record, error)

Read reads the next record from the Parquet file, returning core.Record or io.EOF

func (*ParquetReader) Schema

func (p *ParquetReader) Schema() *arrow.Schema

Schema returns the Arrow schema of the Parquet file

func (*ParquetReader) Stats

func (p *ParquetReader) Stats() ReaderStats

Stats returns statistics about the Parquet reader's performance, such as the number of records read, batches processed, and null value counts. This can be useful for monitoring and debugging the reading process.

type ParquetReaderError

type ParquetReaderError struct {
	Op  string // Operation that failed (e.g., "read", "load_batch", "open_file", "schema")
	Err error  // Underlying error
}

ParquetReaderError provides structured error information for parquet reader operations

func (*ParquetReaderError) Error

func (e *ParquetReaderError) Error() string

func (*ParquetReaderError) Unwrap

func (e *ParquetReaderError) Unwrap() error

type ParquetReaderOptions

type ParquetReaderOptions struct {
	BatchSize     int64
	Columns       []string
	UseArrowTypes bool              // Return native Arrow types vs Go primitives
	PreloadBatch  bool              // Preload next batch for better performance
	MemoryLimit   int64             // Memory usage limit for batches
	ParallelRead  bool              // Enable parallel column reading
	Metadata      map[string]string // Custom metadata filters
}

ParquetReaderOptions configures the Parquet reader. BatchSize is the number of rows per batch; Columns is an optional list of column names to project.

type PostgresReader

type PostgresReader struct {
	// contains filtered or unexported fields
}

PostgresReader implements core.DataSource for PostgreSQL databases. Supports streaming query results with configurable batch processing, connection pooling, and cursor-based streaming.

func NewPostgresReader

func NewPostgresReader(options ...PostgresReaderOption) (*PostgresReader, error)

NewPostgresReader creates a new PostgreSQL reader with the given options. Accepts functional options for configuration. Returns a ready-to-use reader or an error.

func (*PostgresReader) Close

func (p *PostgresReader) Close() error

Close releases all resources held by the PostgreSQL reader

func (*PostgresReader) Read

func (p *PostgresReader) Read(ctx context.Context) (core.Record, error)

Read implements the core.DataSource interface. Reads the next record from the PostgreSQL query result. Thread-safe.

func (*PostgresReader) Schema

func (p *PostgresReader) Schema() map[string]string

Schema returns information about the columns in the query result. Returns a map of column name to database type name.

func (*PostgresReader) Stats

Stats returns statistics about the PostgreSQL reader's performance

type PostgresReaderError

type PostgresReaderError struct {
	Op  string // Operation that failed (e.g., "connect", "query", "scan", "read")
	Err error  // Underlying error
}

PostgresReaderError provides structured error information for Postgres reader operations

func (*PostgresReaderError) Error

func (e *PostgresReaderError) Error() string

func (*PostgresReaderError) Unwrap

func (e *PostgresReaderError) Unwrap() error

type PostgresReaderOption

type PostgresReaderOption func(*PostgresReaderOptions)

PostgresReaderOption represents a configuration function for PostgresReaderOptions

func WithPostgresBatchSize

func WithPostgresBatchSize(size int) PostgresReaderOption

WithPostgresBatchSize sets the batch size for query results.

func WithPostgresConnectionPool

func WithPostgresConnectionPool(maxOpen, maxIdle int) PostgresReaderOption

WithPostgresConnectionPool configures the connection pool.

func WithPostgresConnectionTimeout

func WithPostgresConnectionTimeout(lifetime, idleTime time.Duration) PostgresReaderOption

WithPostgresConnectionTimeout sets connection and idle timeouts.

func WithPostgresCursor

func WithPostgresCursor(useCursor bool, cursorName string) PostgresReaderOption

WithPostgresCursor enables or disables server-side cursor usage for large results.

func WithPostgresDSN

func WithPostgresDSN(dsn string) PostgresReaderOption

WithPostgresDSN sets the PostgreSQL connection string.

func WithPostgresHealthCheckInterval

func WithPostgresHealthCheckInterval(interval time.Duration) PostgresReaderOption

WithPostgresHealthCheckInterval sets the health check interval.

func WithPostgresMetadata

func WithPostgresMetadata(metadata map[string]string) PostgresReaderOption

WithPostgresMetadata sets user metadata for the reader.

func WithPostgresQuery

func WithPostgresQuery(query string, params ...interface{}) PostgresReaderOption

WithPostgresQuery sets the SQL query and optional parameters.

func WithPostgresQueryTimeout

func WithPostgresQueryTimeout(timeout time.Duration) PostgresReaderOption

WithPostgresQueryTimeout sets the query execution timeout.

type PostgresReaderOptions

type PostgresReaderOptions struct {
	DSN                 string            // Database connection string
	Query               string            // SQL query to execute
	Params              []interface{}     // Optional query parameters
	BatchSize           int               // Records to fetch per batch (used for cursor queries)
	ConnMaxLifetime     time.Duration     // Maximum connection lifetime
	ConnMaxIdleTime     time.Duration     // Maximum connection idle time
	MaxOpenConns        int               // Maximum open connections
	MaxIdleConns        int               // Maximum idle connections
	QueryTimeout        time.Duration     // Query execution timeout
	Metadata            map[string]string // Custom metadata
	UseCursor           bool              // Use server-side cursor for large results
	CursorName          string            // Name for the cursor (if UseCursor is true)
	HealthCheckInterval time.Duration
}

PostgresReaderOptions configures the Postgres reader

type PostgresReaderStats

type PostgresReaderStats struct {
	RecordsRead     int64
	QueryDuration   time.Duration
	ReadDuration    time.Duration
	LastReadTime    time.Time
	NullValueCounts map[string]int64
	ConnectionTime  time.Duration
}

PostgresReaderStats holds statistics about the Postgres reader's performance

type ReaderOption

type ReaderOption func(*ParquetReaderOptions)

ReaderOption is a functional option that configures ParquetReaderOptions.

func WithBatchSize

func WithBatchSize(size int64) ReaderOption

WithBatchSize sets the number of rows per batch.

func WithColumnProjection

func WithColumnProjection(columns ...string) ReaderOption

func WithColumns

func WithColumns(columns []string) ReaderOption

func WithMemoryLimit

func WithMemoryLimit(limit int64) ReaderOption

func WithMetadata

func WithMetadata(metadata map[string]string) ReaderOption

func WithParallelRead

func WithParallelRead(parallel bool) ReaderOption

func WithPreloadBatch

func WithPreloadBatch(preload bool) ReaderOption

func WithUseArrowTypes

func WithUseArrowTypes(useArrow bool) ReaderOption

type ReaderOptionCSV

type ReaderOptionCSV func(*CSVReaderOptions)

ReaderOptionCSV allows functional customization of CSVReader.

func WithCSVComma

func WithCSVComma(r rune) ReaderOptionCSV

WithCSVComma sets the field delimiter for the CSV reader.

func WithCSVHasHeaders

func WithCSVHasHeaders(hasHeaders bool) ReaderOptionCSV

WithCSVHasHeaders enables or disables header row detection.

func WithCSVTrimSpace

func WithCSVTrimSpace(trim bool) ReaderOptionCSV

WithCSVTrimSpace enables or disables trimming of leading spaces.

type ReaderOptionHTTP

type ReaderOptionHTTP func(*HTTPReaderOptions)

ReaderOptionHTTP is a functional option for HTTPReaderOptions

func WithHTTPAPIKey

func WithHTTPAPIKey(headerName, apiKey string) ReaderOptionHTTP

func WithHTTPAuth

func WithHTTPAuth(auth *AuthConfig) ReaderOptionHTTP

func WithHTTPBasicAuth

func WithHTTPBasicAuth(username, password string) ReaderOptionHTTP

func WithHTTPBearerToken

func WithHTTPBearerToken(token string) ReaderOptionHTTP

func WithHTTPClient

func WithHTTPClient(client *http.Client) ReaderOptionHTTP

func WithHTTPDataPath

func WithHTTPDataPath(path string) ReaderOptionHTTP

func WithHTTPHeaders

func WithHTTPHeaders(headers map[string]string) ReaderOptionHTTP

func WithHTTPMethod

func WithHTTPMethod(method string) ReaderOptionHTTP

WithHTTPMethod sets the HTTP method used for requests (default: GET).

func WithHTTPPagination

func WithHTTPPagination(pagination *PaginationConfig) ReaderOptionHTTP

func WithHTTPQueryParams

func WithHTTPQueryParams(params map[string]string) ReaderOptionHTTP

func WithHTTPRateLimit

func WithHTTPRateLimit(delay time.Duration) ReaderOptionHTTP

func WithHTTPResponseFormat

func WithHTTPResponseFormat(format string) ReaderOptionHTTP

func WithHTTPRetries

func WithHTTPRetries(attempts int, delay time.Duration) ReaderOptionHTTP

func WithHTTPTimeout

func WithHTTPTimeout(timeout time.Duration) ReaderOptionHTTP

func WithHTTPUserAgent

func WithHTTPUserAgent(userAgent string) ReaderOptionHTTP

type ReaderOptionJSON

type ReaderOptionJSON func(*JSONReaderOptions)

ReaderOptionJSON is a functional option for JSONReaderOptions.

func WithBufferSize

func WithBufferSize(size int) ReaderOptionJSON

WithBufferSize sets the buffer size for the JSONReader's scanner.

type ReaderOptionMongo

type ReaderOptionMongo func(*MongoReaderOptions)

ReaderOptionMongo is a functional option for MongoReaderOptions

func WithMongoAllowDiskUse

func WithMongoAllowDiskUse(allow bool) ReaderOptionMongo

func WithMongoAuth

func WithMongoAuth(username, password, authDB string) ReaderOptionMongo

WithMongoAuth sets the authentication username, password, and authentication database.

func WithMongoBatchSize

func WithMongoBatchSize(batchSize int32) ReaderOptionMongo

func WithMongoCollation

func WithMongoCollation(collation *options.Collation) ReaderOptionMongo

func WithMongoCollection

func WithMongoCollection(collection string) ReaderOptionMongo

func WithMongoComment

func WithMongoComment(comment string) ReaderOptionMongo

func WithMongoDB

func WithMongoDB(database string) ReaderOptionMongo

func WithMongoFilter

func WithMongoFilter(filter bson.M) ReaderOptionMongo

WithMongoFilter sets the query filter for find operations.

func WithMongoHint

func WithMongoHint(hint interface{}) ReaderOptionMongo

WithMongoHint sets the index hint for queries.

func WithMongoLimit

func WithMongoLimit(limit int64) ReaderOptionMongo

func WithMongoMaxTime

func WithMongoMaxTime(maxTimeMS int64) ReaderOptionMongo

func WithMongoPipeline

func WithMongoPipeline(pipeline []bson.M) ReaderOptionMongo

func WithMongoPoolSize

func WithMongoPoolSize(min, max uint64) ReaderOptionMongo

func WithMongoProjection

func WithMongoProjection(projection bson.M) ReaderOptionMongo

func WithMongoReadConcern

func WithMongoReadConcern(concern string) ReaderOptionMongo

func WithMongoReadPreference

func WithMongoReadPreference(preference string) ReaderOptionMongo

func WithMongoSkip

func WithMongoSkip(skip int64) ReaderOptionMongo

func WithMongoSort

func WithMongoSort(sort bson.M) ReaderOptionMongo

func WithMongoTLS

func WithMongoTLS(enabled, insecure bool) ReaderOptionMongo

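WithMongoTLS configures TLS: enabled turns TLS on or off, and insecure controls whether TLS verification is skipped.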

func WithMongoTimeout

func WithMongoTimeout(timeout time.Duration) ReaderOptionMongo

WithMongoTimeout sets the operation timeout.

func WithMongoURI

func WithMongoURI(uri string) ReaderOptionMongo

WithMongoURI sets the MongoDB connection URI.

type ReaderOptionS3

type ReaderOptionS3 func(*S3ReaderOptions)

ReaderOptionS3 represents a configuration function for S3Reader

func WithS3BatchSize

func WithS3BatchSize(batchSize int) ReaderOptionS3

func WithS3Bucket

func WithS3Bucket(bucket string) ReaderOptionS3

WithS3Bucket sets the S3 bucket name.

func WithS3BufferSize

func WithS3BufferSize(bufferSize int64) ReaderOptionS3

func WithS3Credentials

func WithS3Credentials(creds aws.Credentials) ReaderOptionS3

func WithS3Endpoint

func WithS3Endpoint(endpoint string) ReaderOptionS3

func WithS3FilePattern

func WithS3FilePattern(pattern string) ReaderOptionS3

func WithS3IncludeMetadata

func WithS3IncludeMetadata(include bool) ReaderOptionS3

func WithS3MaxKeys

func WithS3MaxKeys(maxKeys int32) ReaderOptionS3

func WithS3PathStyle

func WithS3PathStyle(pathStyle bool) ReaderOptionS3

func WithS3Prefix

func WithS3Prefix(prefix string) ReaderOptionS3

func WithS3Profile

func WithS3Profile(profile string) ReaderOptionS3

func WithS3Recursive

func WithS3Recursive(recursive bool) ReaderOptionS3

func WithS3Region

func WithS3Region(region string) ReaderOptionS3

func WithS3SortOrder

func WithS3SortOrder(order SortOrder) ReaderOptionS3

func WithS3Suffix

func WithS3Suffix(suffix string) ReaderOptionS3

type ReaderStats

type ReaderStats struct {
	RecordsRead     int64
	BatchesRead     int64
	BytesRead       int64
	ReadDuration    time.Duration
	LastReadTime    time.Time
	NullValueCounts map[string]int64
}

ReaderStats holds statistics about the Parquet reader's performance

type S3Object

type S3Object struct {
	Key          string
	Size         int64
	LastModified time.Time
	ETag         string
	Metadata     map[string]string
}

S3Object represents an S3 object with metadata

type S3Reader

type S3Reader struct {
	// contains filtered or unexported fields
}

S3Reader implements core.DataSource for reading from Amazon S3

func NewS3Reader

func NewS3Reader(options ...ReaderOptionS3) (*S3Reader, error)

NewS3Reader creates a new S3 reader with the specified options

func (*S3Reader) Close

func (s *S3Reader) Close() error

Close implements the core.DataSource interface

func (*S3Reader) Objects

func (s *S3Reader) Objects() []S3Object

Objects returns the list of S3 objects that will be/have been processed

func (*S3Reader) Read

func (s *S3Reader) Read(ctx context.Context) (core.Record, error)

Read implements the core.DataSource interface

func (*S3Reader) Stats

func (s *S3Reader) Stats() S3ReaderStats

Stats returns S3 reader performance statistics

type S3ReaderError

type S3ReaderError struct {
	Op  string // Operation that failed (e.g., "list_objects", "get_object", "read")
	Err error  // Underlying error
}

S3ReaderError provides structured error information for S3 reader operations

func (*S3ReaderError) Error

func (e *S3ReaderError) Error() string

func (*S3ReaderError) Unwrap

func (e *S3ReaderError) Unwrap() error

type S3ReaderOptions

type S3ReaderOptions struct {
	Bucket          string            // S3 bucket name
	Prefix          string            // Key prefix filter
	Suffix          string            // Key suffix filter (e.g., ".csv", ".json")
	MaxKeys         int32             // Maximum number of objects to list
	Region          string            // AWS region
	Profile         string            // AWS profile to use
	Credentials     aws.Credentials   // Explicit credentials
	EndpointURL     string            // Custom S3 endpoint (for S3-compatible services)
	ForcePathStyle  bool              // Use path-style addressing
	FilePattern     string            // Regex pattern for file matching
	Recursive       bool              // Process subdirectories recursively
	SortOrder       SortOrder         // Order to process files
	BatchSize       int               // Number of files to process in parallel
	BufferSize      int64             // Buffer size for reading objects
	Metadata        map[string]string // Custom metadata filters
	IncludeMetadata bool              // Include S3 object metadata in records
}

S3ReaderOptions configures the S3 reader behavior

type S3ReaderStats

type S3ReaderStats struct {
	ObjectsListed  int64         // Total objects discovered
	ObjectsRead    int64         // Total objects successfully read
	RecordsRead    int64         // Total records read across all objects
	BytesRead      int64         // Total bytes read
	ReadDuration   time.Duration // Total time spent reading
	LastReadTime   time.Time     // Time of last read operation
	ObjectErrors   int64         // Number of objects that failed to read
	CurrentObject  string        // Currently processing object
	ProcessedFiles []string      // List of successfully processed files
}

S3ReaderStats holds statistics about the S3 reader's performance

type SortOrder

type SortOrder string

SortOrder defines how files should be ordered for processing

const (
	SortByName         SortOrder = "name"          // Sort by object key
	SortByLastModified SortOrder = "last_modified" // Sort by modification time
	SortBySize         SortOrder = "size"          // Sort by object size
	SortNone           SortOrder = "none"          // No sorting (S3 order)
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL