Documentation
¶
Index ¶
- type AuthConfig
- type CSVReader
- type CSVReaderError
- type CSVReaderOptions
- type CSVReaderStats
- type HTTPReader
- func NewAuthenticatedHTTPReader(url, token string) (*HTTPReader, error)
- func NewHTTPReader(url string, options ...ReaderOptionHTTP) (*HTTPReader, error)
- func NewHTTPReaderFromURL(url string) (*HTTPReader, error)
- func NewPaginatedHTTPReader(url string, pageSize int, paginationType string) (*HTTPReader, error)
- type HTTPReaderError
- type HTTPReaderOptions
- type HTTPReaderStats
- type JSONReader
- type JSONReaderError
- type JSONReaderOptions
- type JSONReaderStats
- type MongoReadMode
- type MongoReader
- func NewMongoAggregationReader(uri, database, collection string, pipeline []bson.M) (*MongoReader, error)
- func NewMongoChangeStreamReader(uri, database, collection string) (*MongoReader, error)
- func NewMongoFilteredReader(uri, database, collection string, filter, projection bson.M) (*MongoReader, error)
- func NewMongoReader(options ...ReaderOptionMongo) (*MongoReader, error)
- func NewMongoReaderFromURI(uri, database, collection string) (*MongoReader, error)
- type MongoReaderError
- type MongoReaderOptions
- type MongoReaderStats
- type PaginationConfig
- type ParquetReader
- type ParquetReaderError
- type ParquetReaderOptions
- type PostgresReader
- type PostgresReaderError
- type PostgresReaderOption
- func WithPostgresBatchSize(size int) PostgresReaderOption
- func WithPostgresConnectionPool(maxOpen, maxIdle int) PostgresReaderOption
- func WithPostgresConnectionTimeout(lifetime, idleTime time.Duration) PostgresReaderOption
- func WithPostgresCursor(useCursor bool, cursorName string) PostgresReaderOption
- func WithPostgresDSN(dsn string) PostgresReaderOption
- func WithPostgresHealthCheckInterval(interval time.Duration) PostgresReaderOption
- func WithPostgresMetadata(metadata map[string]string) PostgresReaderOption
- func WithPostgresQuery(query string, params ...interface{}) PostgresReaderOption
- func WithPostgresQueryTimeout(timeout time.Duration) PostgresReaderOption
- type PostgresReaderOptions
- type PostgresReaderStats
- type ReaderOption
- func WithBatchSize(size int64) ReaderOption
- func WithColumnProjection(columns ...string) ReaderOption
- func WithColumns(columns []string) ReaderOption
- func WithMemoryLimit(limit int64) ReaderOption
- func WithMetadata(metadata map[string]string) ReaderOption
- func WithParallelRead(parallel bool) ReaderOption
- func WithPreloadBatch(preload bool) ReaderOption
- func WithUseArrowTypes(useArrow bool) ReaderOption
- type ReaderOptionCSV
- type ReaderOptionHTTP
- func WithHTTPAPIKey(headerName, apiKey string) ReaderOptionHTTP
- func WithHTTPAuth(auth *AuthConfig) ReaderOptionHTTP
- func WithHTTPBasicAuth(username, password string) ReaderOptionHTTP
- func WithHTTPBearerToken(token string) ReaderOptionHTTP
- func WithHTTPClient(client *http.Client) ReaderOptionHTTP
- func WithHTTPDataPath(path string) ReaderOptionHTTP
- func WithHTTPHeaders(headers map[string]string) ReaderOptionHTTP
- func WithHTTPMethod(method string) ReaderOptionHTTP
- func WithHTTPPagination(pagination *PaginationConfig) ReaderOptionHTTP
- func WithHTTPQueryParams(params map[string]string) ReaderOptionHTTP
- func WithHTTPRateLimit(delay time.Duration) ReaderOptionHTTP
- func WithHTTPResponseFormat(format string) ReaderOptionHTTP
- func WithHTTPRetries(attempts int, delay time.Duration) ReaderOptionHTTP
- func WithHTTPTimeout(timeout time.Duration) ReaderOptionHTTP
- func WithHTTPUserAgent(userAgent string) ReaderOptionHTTP
- type ReaderOptionJSON
- type ReaderOptionMongo
- func WithMongoAllowDiskUse(allow bool) ReaderOptionMongo
- func WithMongoAuth(username, password, authDB string) ReaderOptionMongo
- func WithMongoBatchSize(batchSize int32) ReaderOptionMongo
- func WithMongoCollation(collation *options.Collation) ReaderOptionMongo
- func WithMongoCollection(collection string) ReaderOptionMongo
- func WithMongoComment(comment string) ReaderOptionMongo
- func WithMongoDB(database string) ReaderOptionMongo
- func WithMongoFilter(filter bson.M) ReaderOptionMongo
- func WithMongoHint(hint interface{}) ReaderOptionMongo
- func WithMongoLimit(limit int64) ReaderOptionMongo
- func WithMongoMaxTime(maxTimeMS int64) ReaderOptionMongo
- func WithMongoPipeline(pipeline []bson.M) ReaderOptionMongo
- func WithMongoPoolSize(min, max uint64) ReaderOptionMongo
- func WithMongoProjection(projection bson.M) ReaderOptionMongo
- func WithMongoReadConcern(concern string) ReaderOptionMongo
- func WithMongoReadPreference(preference string) ReaderOptionMongo
- func WithMongoSkip(skip int64) ReaderOptionMongo
- func WithMongoSort(sort bson.M) ReaderOptionMongo
- func WithMongoTLS(enabled, insecure bool) ReaderOptionMongo
- func WithMongoTimeout(timeout time.Duration) ReaderOptionMongo
- func WithMongoURI(uri string) ReaderOptionMongo
- type ReaderOptionS3
- func WithS3BatchSize(batchSize int) ReaderOptionS3
- func WithS3Bucket(bucket string) ReaderOptionS3
- func WithS3BufferSize(bufferSize int64) ReaderOptionS3
- func WithS3Credentials(creds aws.Credentials) ReaderOptionS3
- func WithS3Endpoint(endpoint string) ReaderOptionS3
- func WithS3FilePattern(pattern string) ReaderOptionS3
- func WithS3IncludeMetadata(include bool) ReaderOptionS3
- func WithS3MaxKeys(maxKeys int32) ReaderOptionS3
- func WithS3PathStyle(pathStyle bool) ReaderOptionS3
- func WithS3Prefix(prefix string) ReaderOptionS3
- func WithS3Profile(profile string) ReaderOptionS3
- func WithS3Recursive(recursive bool) ReaderOptionS3
- func WithS3Region(region string) ReaderOptionS3
- func WithS3SortOrder(order SortOrder) ReaderOptionS3
- func WithS3Suffix(suffix string) ReaderOptionS3
- type ReaderStats
- type S3Object
- type S3Reader
- type S3ReaderError
- type S3ReaderOptions
- type S3ReaderStats
- type SortOrder
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AuthConfig ¶
type AuthConfig struct {
Type string // "bearer", "basic", "apikey", "oauth2", "custom"
Token string // Bearer token or API key
Username string // For basic auth
Password string // For basic auth
HeaderName string // Custom header name for API key
HeaderValue string // Custom header value
QueryParam string // Query parameter name for API key
CustomHeaders map[string]string // Additional custom headers
}
AuthConfig defines authentication configuration
type CSVReader ¶
type CSVReader struct {
// contains filtered or unexported fields
}
CSVReader implements core.DataSource for CSV files. It supports header detection, delimiter configuration, type inference, and statistics.
func NewCSVReader ¶
func NewCSVReader(r io.ReadCloser, options ...ReaderOptionCSV) (*CSVReader, error)
NewCSVReader creates a CSVReader with default or overridden options. Accepts functional options for configuration. Returns a ready-to-use reader or an error.
func (*CSVReader) Close ¶
Close implements the core.DataSource interface. Closes the underlying reader.
func (*CSVReader) Read ¶
Read implements the core.DataSource interface. Reads the next record from the CSV file. Thread-safe and context-aware.
func (*CSVReader) Stats ¶
func (c *CSVReader) Stats() CSVReaderStats
Stats returns CSV reader performance stats.
type CSVReaderError ¶
type CSVReaderError struct {
Op string // Operation that failed (e.g., "read", "read_headers", "read_record")
Err error // Underlying error
}
CSVReaderError wraps structured error information for the CSV reader.
func (*CSVReaderError) Error ¶
func (e *CSVReaderError) Error() string
func (*CSVReaderError) Unwrap ¶
func (e *CSVReaderError) Unwrap() error
type CSVReaderOptions ¶
type CSVReaderOptions struct {
Comma rune // Field delimiter (default ',')
Comment rune // Comment character (optional)
FieldsPerRecord int // Number of expected fields per record (optional)
LazyQuotes bool // Allow lazy quotes in CSV
TrimLeadingSpace bool // Trim leading space in fields
HasHeaders bool // Whether the first row is a header
}
CSVReaderOptions configures the CSV reader.
type CSVReaderStats ¶
type CSVReaderStats struct {
RecordsRead int64 // Total records read
ReadDuration time.Duration // Total time spent reading
LastReadTime time.Time // Time of last read
NullValueCounts map[string]int64 // Count of null values per field
}
CSVReaderStats holds statistics about the CSV reader's performance.
type HTTPReader ¶
type HTTPReader struct {
// contains filtered or unexported fields
}
HTTPReader implements core.DataSource for HTTP APIs
func NewAuthenticatedHTTPReader ¶
func NewAuthenticatedHTTPReader(url, token string) (*HTTPReader, error)
NewAuthenticatedHTTPReader creates an HTTP reader with bearer token authentication
func NewHTTPReader ¶
func NewHTTPReader(url string, options ...ReaderOptionHTTP) (*HTTPReader, error)
NewHTTPReader creates a new HTTP API reader with configurable options
func NewHTTPReaderFromURL ¶
func NewHTTPReaderFromURL(url string) (*HTTPReader, error)
NewHTTPReaderFromURL creates a basic HTTP reader for a single URL
func NewPaginatedHTTPReader ¶
func NewPaginatedHTTPReader(url string, pageSize int, paginationType string) (*HTTPReader, error)
NewPaginatedHTTPReader creates an HTTP reader with pagination support
func (*HTTPReader) Close ¶
func (hr *HTTPReader) Close() error
Close implements the core.DataSource interface
func (*HTTPReader) Stats ¶
func (hr *HTTPReader) Stats() HTTPReaderStats
Stats returns HTTP reader performance statistics
type HTTPReaderError ¶
type HTTPReaderError struct {
Op string // Operation that failed (e.g., "request", "auth", "parse", "pagination")
StatusCode int // HTTP status code if applicable
URL string // URL being accessed when error occurred
Err error // Underlying error
}
HTTPReaderError provides structured error information for HTTP reader operations
func (*HTTPReaderError) Error ¶
func (e *HTTPReaderError) Error() string
func (*HTTPReaderError) Unwrap ¶
func (e *HTTPReaderError) Unwrap() error
type HTTPReaderOptions ¶
type HTTPReaderOptions struct {
Method string // HTTP method (default: GET)
Headers map[string]string // Additional headers
QueryParams map[string]string // Query parameters
Body io.Reader // Request body for POST/PUT
Auth *AuthConfig // Authentication configuration
Pagination *PaginationConfig // Pagination configuration
Timeout time.Duration // Request timeout
RetryAttempts int // Number of retry attempts
RetryDelay time.Duration // Base delay between retries
RateLimit time.Duration // Minimum time between requests
ResponseFormat string // "json", "jsonl", "csv", "xml"
DataPath string // JSON path to extract data array
BufferSize int // Buffer size for response reading
MaxResponseSize int64 // Maximum response size in bytes
FollowRedirects bool // Follow HTTP redirects
ValidStatusCodes []int // Valid HTTP status codes
UserAgent string // User agent string
AcceptEncoding string // Accept encoding header
CustomClient *http.Client // Custom HTTP client
}
HTTPReaderOptions configures the HTTP reader
type HTTPReaderStats ¶
type HTTPReaderStats struct {
RequestCount int64 // Total HTTP requests made
RecordsRead int64 // Total records read
BytesRead int64 // Total bytes read
ReadDuration time.Duration // Total time spent reading
LastReadTime time.Time // Time of last read
RetryCount int64 // Number of retries performed
RateLimitHits int64 // Number of rate limit hits
NullValueCounts map[string]int64 // Count of null values per field
ResponseTimes []time.Duration // Response times for monitoring
}
HTTPReaderStats holds statistics about the HTTP reader's performance
type JSONReader ¶
type JSONReader struct {
// contains filtered or unexported fields
}
JSONReader implements core.DataSource for line-delimited JSON files. It supports buffered reading, batch configuration, and statistics.
func NewJSONReader ¶
func NewJSONReader(r io.ReadCloser, options ...ReaderOptionJSON) *JSONReader
NewJSONReader creates a new line-delimited JSON reader with optional config. Accepts functional options for configuration. Returns a ready-to-use reader.
func (*JSONReader) Close ¶
func (j *JSONReader) Close() error
Close implements the core.DataSource interface. Closes the underlying reader.
func (*JSONReader) Read ¶
Read implements the core.DataSource interface, returning one JSON record per line. Thread-safe and context-aware.
func (*JSONReader) Stats ¶
func (j *JSONReader) Stats() JSONReaderStats
Stats returns reader metrics like bytes read, nulls, and durations.
type JSONReaderError ¶
type JSONReaderError struct {
Op string // Operation that failed (e.g., "read", "scan", "unmarshal")
Err error // Underlying error
}
JSONReaderError wraps detailed context for JSONReader operations.
func (*JSONReaderError) Error ¶
func (e *JSONReaderError) Error() string
func (*JSONReaderError) Unwrap ¶
func (e *JSONReaderError) Unwrap() error
type JSONReaderOptions ¶
type JSONReaderOptions struct {
BufferSize int // Optional buffer size in bytes for scanning
}
JSONReaderOptions configures optional behavior for the reader.
type JSONReaderStats ¶
type JSONReaderStats struct {
RecordsRead int64 // Total records read
BytesRead int64 // Total bytes read
ReadDuration time.Duration // Total time spent reading
LastReadTime time.Time // Time of last read
NullValueCounts map[string]int64 // Count of null values per field
}
JSONReaderStats provides metrics about JSONReader activity.
type MongoReadMode ¶
type MongoReadMode string
MongoReadMode defines how data should be read from MongoDB
const (
ModeFind MongoReadMode = "find" // Standard find query
ModeAggregate MongoReadMode = "aggregate" // Aggregation pipeline
ModeWatch MongoReadMode = "watch" // Change stream
ModeBulk MongoReadMode = "bulk" // Bulk read with pagination
)
type MongoReader ¶
type MongoReader struct {
// contains filtered or unexported fields
}
MongoReader implements core.DataSource for MongoDB collections
func NewMongoAggregationReader ¶
func NewMongoAggregationReader(uri, database, collection string, pipeline []bson.M) (*MongoReader, error)
NewMongoAggregationReader creates a MongoDB reader with aggregation pipeline
func NewMongoChangeStreamReader ¶
func NewMongoChangeStreamReader(uri, database, collection string) (*MongoReader, error)
NewMongoChangeStreamReader creates a MongoDB change stream reader
func NewMongoFilteredReader ¶
func NewMongoFilteredReader(uri, database, collection string, filter, projection bson.M) (*MongoReader, error)
NewMongoFilteredReader creates a MongoDB reader with filter and projection
func NewMongoReader ¶
func NewMongoReader(options ...ReaderOptionMongo) (*MongoReader, error)
NewMongoReader creates a new MongoDB reader with configurable options
func NewMongoReaderFromURI ¶
func NewMongoReaderFromURI(uri, database, collection string) (*MongoReader, error)
NewMongoReaderFromURI creates a basic MongoDB reader from a URI
func (*MongoReader) Close ¶
func (mr *MongoReader) Close() error
Close implements the core.DataSource interface
func (*MongoReader) Connect ¶
func (mr *MongoReader) Connect(ctx context.Context) error
Connect establishes connection to MongoDB
func (*MongoReader) Stats ¶
func (mr *MongoReader) Stats() MongoReaderStats
Stats returns MongoDB reader performance statistics
type MongoReaderError ¶
type MongoReaderError struct {
Op string // Operation that failed (e.g., "connect", "query", "decode", "aggregate")
Collection string // Collection being accessed when error occurred
Err error // Underlying error
}
MongoReaderError provides structured error information for MongoDB reader operations
func (*MongoReaderError) Error ¶
func (e *MongoReaderError) Error() string
func (*MongoReaderError) Unwrap ¶
func (e *MongoReaderError) Unwrap() error
type MongoReaderOptions ¶
type MongoReaderOptions struct {
URI string // MongoDB connection URI
Database string // Database name
Collection string // Collection name
Mode MongoReadMode // Read mode
Filter bson.M // Query filter for find operations
Projection bson.M // Field projection
Sort bson.M // Sort specification
Pipeline []bson.M // Aggregation pipeline stages
BatchSize int32 // Batch size for cursor
Limit int64 // Maximum number of documents to read
Skip int64 // Number of documents to skip
Timeout time.Duration // Operation timeout
MaxPoolSize uint64 // Connection pool size
MinPoolSize uint64 // Minimum connections in pool
MaxConnIdleTime time.Duration // Max idle time for connections
ReadPreference string // Read preference: primary, secondary, etc.
ReadConcern string // Read concern level
AuthDatabase string // Authentication database
Username string // Authentication username
Password string // Authentication password
TLS bool // Enable TLS
TLSInsecure bool // Skip TLS verification
ReplicaSetName string // Replica set name
RetryReads bool // Enable read retries
RetryWrites bool // Enable write retries
Compressors []string // Compression algorithms
ZlibLevel int // Zlib compression level
CustomClientOpts []*options.ClientOptions // Custom client options
AllowDiskUse bool // Allow aggregation to use disk
Hint interface{} // Index hint for queries
MaxTimeMS int64 // Maximum execution time
Comment string // Query comment for profiling
Collation *options.Collation // Collation options
}
MongoReaderOptions configures the MongoDB reader
type MongoReaderStats ¶
type MongoReaderStats struct {
RecordsRead int64 // Total records read
QueriesExecuted int64 // Total queries executed
ReadDuration time.Duration // Total time spent reading
LastReadTime time.Time // Time of last read
BatchesRead int64 // Number of batches processed
BytesRead int64 // Estimated bytes read
NullValueCounts map[string]int64 // Count of null values per field
ErrorCount int64 // Number of errors encountered
}
MongoReaderStats holds statistics about the MongoDB reader's performance
type PaginationConfig ¶
type PaginationConfig struct {
Type string // "offset", "cursor", "page", "link_header", "none"
LimitParam string // Parameter name for limit/page size
OffsetParam string // Parameter name for offset
PageParam string // Parameter name for page number
CursorParam string // Parameter name for cursor
PageSize int // Number of records per page
MaxPages int // Maximum pages to fetch (0 = unlimited)
NextURLField string // JSON field containing next page URL
CursorField string // JSON field containing next cursor
TotalField string // JSON field containing total count
HasMoreField string // JSON field indicating more data available
}
PaginationConfig defines pagination behavior
type ParquetReader ¶
type ParquetReader struct {
// contains filtered or unexported fields
}
ParquetReader implements DataSource for Parquet files. It supports optional column projection and safe resource management.
func NewParquetReader ¶
func NewParquetReader(filename string, options ...ReaderOption) (*ParquetReader, error)
NewParquetReader opens a Parquet file and prepares an Arrow RecordReader
func (*ParquetReader) Close ¶
func (p *ParquetReader) Close() error
Close releases resources and closes the underlying file
func (*ParquetReader) Read ¶
Read reads the next record from the Parquet file, returning core.Record or io.EOF
func (*ParquetReader) Schema ¶
func (p *ParquetReader) Schema() *arrow.Schema
Schema returns the Arrow schema of the Parquet file
func (*ParquetReader) Stats ¶
func (p *ParquetReader) Stats() ReaderStats
Stats returns statistics about the Parquet reader's performance, such as the number of records read, batches processed, and null value counts. This can be useful for monitoring and debugging the reading process.
type ParquetReaderError ¶
type ParquetReaderError struct {
Op string // Operation that failed (e.g., "read", "load_batch", "open_file", "schema")
Err error // Underlying error
}
ParquetReaderError provides structured error information for parquet reader operations
func (*ParquetReaderError) Error ¶
func (e *ParquetReaderError) Error() string
func (*ParquetReaderError) Unwrap ¶
func (e *ParquetReaderError) Unwrap() error
type ParquetReaderOptions ¶
type ParquetReaderOptions struct {
BatchSize int64
Columns []string
UseArrowTypes bool // Return native Arrow types vs Go primitives
PreloadBatch bool // Preload next batch for better performance
MemoryLimit int64 // Memory usage limit for batches
ParallelRead bool // Enable parallel column reading
Metadata map[string]string // Custom metadata filters
}
ParquetReaderOptions configures the Parquet reader. BatchSize is the number of rows per batch; Columns is an optional list of column names to project.
type PostgresReader ¶
type PostgresReader struct {
// contains filtered or unexported fields
}
PostgresReader implements core.DataSource for PostgreSQL databases. Supports streaming query results with configurable batch processing, connection pooling, and cursor-based streaming.
func NewPostgresReader ¶
func NewPostgresReader(options ...PostgresReaderOption) (*PostgresReader, error)
NewPostgresReader creates a new PostgreSQL reader with the given options. Accepts functional options for configuration. Returns a ready-to-use reader or an error.
func (*PostgresReader) Close ¶
func (p *PostgresReader) Close() error
Close releases all resources held by the PostgreSQL reader
func (*PostgresReader) Read ¶
Read implements the core.DataSource interface. Reads the next record from the PostgreSQL query result. Thread-safe.
func (*PostgresReader) Schema ¶
func (p *PostgresReader) Schema() map[string]string
Schema returns information about the columns in the query result. Returns a map of column name to database type name.
func (*PostgresReader) Stats ¶
func (p *PostgresReader) Stats() PostgresReaderStats
Stats returns statistics about the PostgreSQL reader's performance
type PostgresReaderError ¶
type PostgresReaderError struct {
Op string // Operation that failed (e.g., "connect", "query", "scan", "read")
Err error // Underlying error
}
PostgresReaderError provides structured error information for Postgres reader operations
func (*PostgresReaderError) Error ¶
func (e *PostgresReaderError) Error() string
func (*PostgresReaderError) Unwrap ¶
func (e *PostgresReaderError) Unwrap() error
type PostgresReaderOption ¶
type PostgresReaderOption func(*PostgresReaderOptions)
PostgresReaderOption represents a configuration function for PostgresReaderOptions
func WithPostgresBatchSize ¶
func WithPostgresBatchSize(size int) PostgresReaderOption
WithPostgresBatchSize sets the batch size for query results.
func WithPostgresConnectionPool ¶
func WithPostgresConnectionPool(maxOpen, maxIdle int) PostgresReaderOption
WithPostgresConnectionPool configures the connection pool.
func WithPostgresConnectionTimeout ¶
func WithPostgresConnectionTimeout(lifetime, idleTime time.Duration) PostgresReaderOption
WithPostgresConnectionTimeout sets the maximum connection lifetime and the maximum connection idle time.
func WithPostgresCursor ¶
func WithPostgresCursor(useCursor bool, cursorName string) PostgresReaderOption
WithPostgresCursor enables or disables server-side cursor usage for large results.
func WithPostgresDSN ¶
func WithPostgresDSN(dsn string) PostgresReaderOption
WithPostgresDSN sets the PostgreSQL connection string.
func WithPostgresHealthCheckInterval ¶
func WithPostgresHealthCheckInterval(interval time.Duration) PostgresReaderOption
WithPostgresHealthCheckInterval sets the health check interval.
func WithPostgresMetadata ¶
func WithPostgresMetadata(metadata map[string]string) PostgresReaderOption
WithPostgresMetadata sets user metadata for the reader.
func WithPostgresQuery ¶
func WithPostgresQuery(query string, params ...interface{}) PostgresReaderOption
WithPostgresQuery sets the SQL query and optional parameters.
func WithPostgresQueryTimeout ¶
func WithPostgresQueryTimeout(timeout time.Duration) PostgresReaderOption
WithPostgresQueryTimeout sets the query execution timeout.
type PostgresReaderOptions ¶
type PostgresReaderOptions struct {
DSN string // Database connection string
Query string // SQL query to execute
Params []interface{} // Optional query parameters
BatchSize int // Records to fetch per batch (used for cursor queries)
ConnMaxLifetime time.Duration // Maximum connection lifetime
ConnMaxIdleTime time.Duration // Maximum connection idle time
MaxOpenConns int // Maximum open connections
MaxIdleConns int // Maximum idle connections
QueryTimeout time.Duration // Query execution timeout
Metadata map[string]string // Custom metadata
UseCursor bool // Use server-side cursor for large results
CursorName string // Name for the cursor (if UseCursor is true)
HealthCheckInterval time.Duration
}
PostgresReaderOptions configures the Postgres reader
type PostgresReaderStats ¶
type PostgresReaderStats struct {
RecordsRead int64
QueryDuration time.Duration
ReadDuration time.Duration
LastReadTime time.Time
NullValueCounts map[string]int64
ConnectionTime time.Duration
}
PostgresReaderStats holds statistics about the Postgres reader's performance
type ReaderOption ¶
type ReaderOption func(*ParquetReaderOptions)
ReaderOption represents a configuration function
func WithColumnProjection ¶
func WithColumnProjection(columns ...string) ReaderOption
func WithColumns ¶
func WithColumns(columns []string) ReaderOption
func WithMemoryLimit ¶
func WithMemoryLimit(limit int64) ReaderOption
func WithMetadata ¶
func WithMetadata(metadata map[string]string) ReaderOption
func WithParallelRead ¶
func WithParallelRead(parallel bool) ReaderOption
func WithPreloadBatch ¶
func WithPreloadBatch(preload bool) ReaderOption
func WithUseArrowTypes ¶
func WithUseArrowTypes(useArrow bool) ReaderOption
type ReaderOptionCSV ¶
type ReaderOptionCSV func(*CSVReaderOptions)
ReaderOptionCSV allows functional customization of CSVReader.
func WithCSVComma ¶
func WithCSVComma(r rune) ReaderOptionCSV
WithCSVComma sets the field delimiter for the CSV reader.
func WithCSVHasHeaders ¶
func WithCSVHasHeaders(hasHeaders bool) ReaderOptionCSV
WithCSVHasHeaders enables or disables header row detection.
func WithCSVTrimSpace ¶
func WithCSVTrimSpace(trim bool) ReaderOptionCSV
WithCSVTrimSpace enables or disables trimming of leading spaces.
type ReaderOptionHTTP ¶
type ReaderOptionHTTP func(*HTTPReaderOptions)
ReaderOptionHTTP is a functional option for HTTPReaderOptions
func WithHTTPAPIKey ¶
func WithHTTPAPIKey(headerName, apiKey string) ReaderOptionHTTP
func WithHTTPAuth ¶
func WithHTTPAuth(auth *AuthConfig) ReaderOptionHTTP
func WithHTTPBasicAuth ¶
func WithHTTPBasicAuth(username, password string) ReaderOptionHTTP
func WithHTTPBearerToken ¶
func WithHTTPBearerToken(token string) ReaderOptionHTTP
func WithHTTPClient ¶
func WithHTTPClient(client *http.Client) ReaderOptionHTTP
func WithHTTPDataPath ¶
func WithHTTPDataPath(path string) ReaderOptionHTTP
func WithHTTPHeaders ¶
func WithHTTPHeaders(headers map[string]string) ReaderOptionHTTP
func WithHTTPMethod ¶
func WithHTTPMethod(method string) ReaderOptionHTTP
WithHTTPMethod sets the HTTP method used for requests. It is one of the functional option functions for HTTPReaderOptions.
func WithHTTPPagination ¶
func WithHTTPPagination(pagination *PaginationConfig) ReaderOptionHTTP
func WithHTTPQueryParams ¶
func WithHTTPQueryParams(params map[string]string) ReaderOptionHTTP
func WithHTTPRateLimit ¶
func WithHTTPRateLimit(delay time.Duration) ReaderOptionHTTP
func WithHTTPResponseFormat ¶
func WithHTTPResponseFormat(format string) ReaderOptionHTTP
func WithHTTPRetries ¶
func WithHTTPRetries(attempts int, delay time.Duration) ReaderOptionHTTP
func WithHTTPTimeout ¶
func WithHTTPTimeout(timeout time.Duration) ReaderOptionHTTP
func WithHTTPUserAgent ¶
func WithHTTPUserAgent(userAgent string) ReaderOptionHTTP
type ReaderOptionJSON ¶
type ReaderOptionJSON func(*JSONReaderOptions)
ReaderOptionJSON is a functional option for JSONReaderOptions.
func WithBufferSize ¶
func WithBufferSize(size int) ReaderOptionJSON
WithBufferSize sets the buffer size for the JSONReader's scanner.
type ReaderOptionMongo ¶
type ReaderOptionMongo func(*MongoReaderOptions)
ReaderOptionMongo is a functional option for MongoReaderOptions
func WithMongoAllowDiskUse ¶
func WithMongoAllowDiskUse(allow bool) ReaderOptionMongo
func WithMongoAuth ¶
func WithMongoAuth(username, password, authDB string) ReaderOptionMongo
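WithMongoAuth sets the authentication username, password, and authentication database. It is one of the authentication options.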
func WithMongoBatchSize ¶
func WithMongoBatchSize(batchSize int32) ReaderOptionMongo
func WithMongoCollation ¶
func WithMongoCollation(collation *options.Collation) ReaderOptionMongo
func WithMongoCollection ¶
func WithMongoCollection(collection string) ReaderOptionMongo
func WithMongoComment ¶
func WithMongoComment(comment string) ReaderOptionMongo
func WithMongoDB ¶
func WithMongoDB(database string) ReaderOptionMongo
func WithMongoLimit ¶
func WithMongoLimit(limit int64) ReaderOptionMongo
func WithMongoMaxTime ¶
func WithMongoMaxTime(maxTimeMS int64) ReaderOptionMongo
func WithMongoPipeline ¶
func WithMongoPipeline(pipeline []bson.M) ReaderOptionMongo
func WithMongoPoolSize ¶
func WithMongoPoolSize(min, max uint64) ReaderOptionMongo
func WithMongoProjection ¶
func WithMongoProjection(projection bson.M) ReaderOptionMongo
func WithMongoReadConcern ¶
func WithMongoReadConcern(concern string) ReaderOptionMongo
func WithMongoReadPreference ¶
func WithMongoReadPreference(preference string) ReaderOptionMongo
func WithMongoSkip ¶
func WithMongoSkip(skip int64) ReaderOptionMongo
func WithMongoSort ¶
func WithMongoSort(sort bson.M) ReaderOptionMongo
func WithMongoTimeout ¶
func WithMongoTimeout(timeout time.Duration) ReaderOptionMongo
WithMongoTimeout sets the operation timeout. It is one of the performance options.
type ReaderOptionS3 ¶
type ReaderOptionS3 func(*S3ReaderOptions)
ReaderOptionS3 represents a configuration function for S3Reader
func WithS3BatchSize ¶
func WithS3BatchSize(batchSize int) ReaderOptionS3
func WithS3BufferSize ¶
func WithS3BufferSize(bufferSize int64) ReaderOptionS3
func WithS3Credentials ¶
func WithS3Credentials(creds aws.Credentials) ReaderOptionS3
func WithS3Endpoint ¶
func WithS3Endpoint(endpoint string) ReaderOptionS3
func WithS3FilePattern ¶
func WithS3FilePattern(pattern string) ReaderOptionS3
func WithS3IncludeMetadata ¶
func WithS3IncludeMetadata(include bool) ReaderOptionS3
func WithS3MaxKeys ¶
func WithS3MaxKeys(maxKeys int32) ReaderOptionS3
func WithS3PathStyle ¶
func WithS3PathStyle(pathStyle bool) ReaderOptionS3
func WithS3Prefix ¶
func WithS3Prefix(prefix string) ReaderOptionS3
func WithS3Profile ¶
func WithS3Profile(profile string) ReaderOptionS3
func WithS3Recursive ¶
func WithS3Recursive(recursive bool) ReaderOptionS3
func WithS3Region ¶
func WithS3Region(region string) ReaderOptionS3
func WithS3SortOrder ¶
func WithS3SortOrder(order SortOrder) ReaderOptionS3
func WithS3Suffix ¶
func WithS3Suffix(suffix string) ReaderOptionS3
type ReaderStats ¶
type ReaderStats struct {
RecordsRead int64
BatchesRead int64
BytesRead int64
ReadDuration time.Duration
LastReadTime time.Time
NullValueCounts map[string]int64
}
ReaderStats holds statistics about the Parquet reader's performance
type S3Object ¶
type S3Object struct {
Key string
Size int64
LastModified time.Time
ETag string
Metadata map[string]string
}
S3Object represents an S3 object with metadata
type S3Reader ¶
type S3Reader struct {
// contains filtered or unexported fields
}
S3Reader implements core.DataSource for reading from Amazon S3
func NewS3Reader ¶
func NewS3Reader(options ...ReaderOptionS3) (*S3Reader, error)
NewS3Reader creates a new S3 reader with the specified options
func (*S3Reader) Stats ¶
func (s *S3Reader) Stats() S3ReaderStats
Stats returns S3 reader performance statistics
type S3ReaderError ¶
type S3ReaderError struct {
Op string // Operation that failed (e.g., "list_objects", "get_object", "read")
Err error // Underlying error
}
S3ReaderError provides structured error information for S3 reader operations
func (*S3ReaderError) Error ¶
func (e *S3ReaderError) Error() string
func (*S3ReaderError) Unwrap ¶
func (e *S3ReaderError) Unwrap() error
type S3ReaderOptions ¶
type S3ReaderOptions struct {
Bucket string // S3 bucket name
Prefix string // Key prefix filter
Suffix string // Key suffix filter (e.g., ".csv", ".json")
MaxKeys int32 // Maximum number of objects to list
Region string // AWS region
Profile string // AWS profile to use
Credentials aws.Credentials // Explicit credentials
EndpointURL string // Custom S3 endpoint (for S3-compatible services)
ForcePathStyle bool // Use path-style addressing
FilePattern string // Regex pattern for file matching
Recursive bool // Process subdirectories recursively
SortOrder SortOrder // Order to process files
BatchSize int // Number of files to process in parallel
BufferSize int64 // Buffer size for reading objects
Metadata map[string]string // Custom metadata filters
IncludeMetadata bool // Include S3 object metadata in records
}
S3ReaderOptions configures the S3 reader behavior
type S3ReaderStats ¶
type S3ReaderStats struct {
ObjectsListed int64 // Total objects discovered
ObjectsRead int64 // Total objects successfully read
RecordsRead int64 // Total records read across all objects
BytesRead int64 // Total bytes read
ReadDuration time.Duration // Total time spent reading
LastReadTime time.Time // Time of last read operation
ObjectErrors int64 // Number of objects that failed to read
CurrentObject string // Currently processing object
ProcessedFiles []string // List of successfully processed files
}
S3ReaderStats holds statistics about the S3 reader's performance