drivers

package
v1.0.0-beta.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2025 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultPollingPath is the path "/switch/polling".
// NOTE(review): presumably the HTTP route served by the polling drivers — confirm against the implementation.
const DefaultPollingPath = "/switch/polling"
View Source
// DefaultPollingPort is the port "10002", kept as a string.
// NOTE(review): presumably the fallback when a polling config leaves Port empty — confirm.
const DefaultPollingPort = "10002"
View Source
// DefaultProto is the URL scheme prefix "http://".
// NOTE(review): presumably prepended when building driver URLs — confirm.
const DefaultProto = "http://"
View Source
// DefaultWebhookPort is the port "20002", kept as a string.
// NOTE(review): presumably the fallback when a webhook config leaves Port empty — confirm.
const DefaultWebhookPort = "20002"
View Source
// KafkaConsumerDriverType is the driver.DriverType value "kafka_consumer".
const KafkaConsumerDriverType driver.DriverType = "kafka_consumer"
View Source
// KafkaProducerDriverType is the driver.DriverType value "kafka_producer".
const KafkaProducerDriverType driver.DriverType = "kafka_producer"
View Source
// PollingConsumerDriverType is the driver.DriverType value "polling_consumer".
const PollingConsumerDriverType driver.DriverType = "polling_consumer"
View Source
// PollingProducerDriverType is the driver.DriverType value "polling_producer".
const PollingProducerDriverType driver.DriverType = "polling_producer"
View Source
// WebhookConsumerDriverType is the driver.DriverType value "webhook_consumer".
const WebhookConsumerDriverType driver.DriverType = "webhook_consumer"
View Source
// WebhookProducerDriverType is the driver.DriverType value "webhook_producer".
const WebhookProducerDriverType driver.DriverType = "webhook_producer"
View Source
// WebhookReceivePoint is the path "/switch/webhook".
// NOTE(review): presumably the route on which webhook callbacks are received — confirm.
const WebhookReceivePoint = "/switch/webhook"

Variables

View Source
// DefaultKafkaConfigCompareOptions holds the default comparison options:
// StrictBrokerOrder is false, i.e. the order of brokers is not compared.
var DefaultKafkaConfigCompareOptions = KafkaConfigCompareOptions{
	StrictBrokerOrder: false,
}

DefaultKafkaConfigCompareOptions 默认比较选项

View Source
// SupportedTypes is a set keyed by driver.DriverType. Each of the six driver
// type constants declared in this package (Kafka, webhook and polling, each as
// consumer and producer) is mapped to true.
var SupportedTypes = map[driver.DriverType]bool{
	KafkaConsumerDriverType:   true,
	KafkaProducerDriverType:   true,
	WebhookConsumerDriverType: true,
	WebhookProducerDriverType: true,
	PollingConsumerDriverType: true,
	PollingProducerDriverType: true,
}

Functions

func AddClientIPsToPool

func AddClientIPsToPool(clientInfo *pc.ClientProxyInfo)

AddClientIPsToPool 添加客户端IP到全局池

func BuildSignature

func BuildSignature(secret string, payload []byte) (string, string)

BuildSignature 构建Signature

func BuildWebhookUrl

func BuildWebhookUrl(ip, port string) string

BuildWebhookUrl 构建webhook地址(客户端使用配置中的端口避免端口冲突)

func GenerateSignature

func GenerateSignature(secret string, payload []byte) string

GenerateSignature 生成HMAC签名

func GetAllIPs

func GetAllIPs() []string

GetAllIPs 获取所有IP列表

func GetReachableIPs

func GetReachableIPs() []string

GetReachableIPs 获取可达的IP列表

func GetSignature

func GetSignature(header map[string][]string) string

GetSignature 获取请求头中的Signature

func GetUnreachableIPs

func GetUnreachableIPs() []string

GetUnreachableIPs 获取不可达的IP列表

func KafkaConsumerConfigComparator

func KafkaConsumerConfigComparator(oldDriver, newDriver driver.Driver) bool

KafkaConsumerConfigComparator Kafka Consumer 配置比较器

func KafkaConsumerConfigComparatorWithOptions

func KafkaConsumerConfigComparatorWithOptions(oldDriver, newDriver driver.Driver, options ...KafkaConfigCompareOptions) bool

KafkaConsumerConfigComparatorWithOptions is the Kafka consumer config comparator that additionally accepts optional KafkaConfigCompareOptions.

func KafkaProducerConfigComparator

func KafkaProducerConfigComparator(oldDriver, newDriver driver.Driver) bool

KafkaProducerConfigComparator Kafka Producer 配置比较器

func KafkaProducerConfigComparatorWithOptions

func KafkaProducerConfigComparatorWithOptions(oldDriver, newDriver driver.Driver, options ...KafkaConfigCompareOptions) bool

KafkaProducerConfigComparatorWithOptions is the Kafka producer config comparator that additionally accepts optional KafkaConfigCompareOptions.

func PollingConsumerConfigComparator

func PollingConsumerConfigComparator(oldDriver, newDriver driver.Driver) bool

PollingConsumerConfigComparator 长轮询配置比较器

func PollingProducerConfigComparator

func PollingProducerConfigComparator(oldDriver, newDriver driver.Driver) bool

PollingProducerConfigComparator 长轮询生产者配置比较器

func RemoveClientIPsFromPool

func RemoveClientIPsFromPool(clientInfo *pc.ClientProxyInfo)

RemoveClientIPsFromPool 从全局池中移除客户端IP

func StartIPPoolManager

func StartIPPoolManager(ctx context.Context, config *IPConnectivityConfig) error

StartIPPoolManager starts the global IP pool manager using the given IPConnectivityConfig.

func StopIPPoolManager

func StopIPPoolManager() error

StopIPPoolManager stops the global IP pool manager.

func ValidateHMACSignature

func ValidateHMACSignature(signature string, body []byte, secret string) bool

ValidateHMACSignature 使用HMAC校验签名

func WebhookConsumerConfigComparator

func WebhookConsumerConfigComparator(oldDriver, newDriver driver.Driver) bool

WebhookConsumerConfigComparator Webhook Consumer 配置比较器

func WebhookProducerConfigComparator

func WebhookProducerConfigComparator(oldDriver, newDriver driver.Driver) bool

WebhookProducerConfigComparator Webhook Producer 配置比较器

Types

type BlacklistConfigAssistance

// BlacklistConfigAssistance is a field-less helper type. Its methods
// (HasBlacklistIPs, IsIPBlacklisted) receive the blacklist as an argument
// rather than reading it from the struct.
type BlacklistConfigAssistance struct {
}

func (*BlacklistConfigAssistance) HasBlacklistIPs

func (b *BlacklistConfigAssistance) HasBlacklistIPs(blacklistIPs []string) bool

HasBlacklistIPs 检查是否有黑名单IP配置

func (*BlacklistConfigAssistance) IsIPBlacklisted

func (b *BlacklistConfigAssistance) IsIPBlacklisted(ip string, blacklistIPs []string) bool

IsIPBlacklisted 检查IP是否在黑名单中

type ConfigCache

type ConfigCache struct {
	// contains filtered or unexported fields
}

ConfigCache 配置缓存管理器

func NewConfigCache

func NewConfigCache() *ConfigCache

NewConfigCache creates a ConfigCache with the default settings.

func NewConfigCacheWithOptions

func NewConfigCacheWithOptions(maxVersions int, maxAge time.Duration) *ConfigCache

NewConfigCacheWithOptions creates a ConfigCache with a custom maxVersions and maxAge.

func (*ConfigCache) AddConfig

func (c *ConfigCache) AddConfig(config interface{}) uint64

AddConfig 添加一个配置

func (*ConfigCache) Clear

func (c *ConfigCache) Clear()

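Clear removes all cached configurations and resets the cache to zero (presumably the version counter restarts from 0).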

func (*ConfigCache) GetCacheStats

func (c *ConfigCache) GetCacheStats() map[string]interface{}

GetCacheStats 获取缓存统计信息

func (*ConfigCache) GetLatestConfig

func (c *ConfigCache) GetLatestConfig() (*ConfigVersion, bool)

GetLatestConfig 获取最新配置

func (*ConfigCache) GetLatestVersion

func (c *ConfigCache) GetLatestVersion() uint64

GetLatestVersion 获取最新版本号

func (*ConfigCache) GetOldestCachedVersion

func (c *ConfigCache) GetOldestCachedVersion() uint64

GetOldestCachedVersion 获取最旧的缓存版本号

func (*ConfigCache) GetVersion

func (c *ConfigCache) GetVersion(version uint64) (*ConfigVersion, bool)

GetVersion 获取指定版本的配置

func (*ConfigCache) GetVersionsSince

func (c *ConfigCache) GetVersionsSince(clientVersion uint64) []*ConfigVersion

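GetVersionsSince returns every cached version newer than clientVersion. When a client request arrives, the server replies with everything published after the version the client reports.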

func (*ConfigCache) HasNewerVersion

func (c *ConfigCache) HasNewerVersion(clientVersion uint64) bool

HasNewerVersion 检查是否有比指定版本更新的配置

type ConfigVersion

// ConfigVersion is one versioned configuration entry.
type ConfigVersion struct {
	Version   uint64      `json:"version"`   // version number of this entry
	Config    interface{} `json:"config"`    // configuration payload of arbitrary type
	Timestamp time.Time   `json:"timestamp"` // NOTE(review): presumably the time the entry was added — confirm
}

ConfigVersion 配置版本信息

type ErrorType

// ErrorType classifies a communication error.
type ErrorType int

ErrorType 通信的错误类型

// Values of ErrorType.
const (
	TimeoutError ErrorType = iota // timeout: normal long-polling behavior, not counted as an error
	NeedRetry                     // error that requires a retry: the exact cause is not distinguished, it simply has to be retried
)

type IPConnectivityConfig

// IPConnectivityConfig configures the IP connectivity checks.
type IPConnectivityConfig struct {
	CheckInterval  time.Duration `yaml:"check_interval"`    // interval between periodic checks of IPs already in the pool
	CheckTimeout   time.Duration `yaml:"check_timeout"`     // timeout for checking a single IP
	NewIPQueueSize int           `yaml:"new_ip_queue_size"` // size of the queue of new IPs waiting to be checked
}

IPConnectivityConfig IP连通性检查配置

type IPPoolManager

type IPPoolManager struct {
	// contains filtered or unexported fields
}

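IPPoolManager is the global IP pool. It maintains the internal and public IPs reported in long-connection callbacks, checks only the connections that belong to webhook drivers, and is stricter than a plain IP check.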

func GetIPPoolManager

func GetIPPoolManager() *IPPoolManager

GetIPPoolManager 获取全局IP池管理器

func (*IPPoolManager) AddIPs

func (m *IPPoolManager) AddIPs(publicIPs, internalIPs []string) []string

AddIPs 添加IP到池中 新添加的IP都设置成不可达等待检查

func (*IPPoolManager) RemoveIPs

func (m *IPPoolManager) RemoveIPs(publicIPs, internalIPs []string) int

RemoveIPs 从池中移除IP

type IPStatus

// IPStatus is the status information of one IP.
type IPStatus struct {
	IP string
	// Whether the IP is reachable.
	IsReachable bool
	// Time of the most recent check.
	LastChecked time.Time
}

IPStatus IP状态信息

type KafkaConfigCompareOptions

// KafkaConfigCompareOptions holds the options for comparing Kafka configurations.
type KafkaConfigCompareOptions struct {
	// Whether the order of the brokers is compared.
	StrictBrokerOrder bool
}

KafkaConfigCompareOptions Kafka 配置比较选项

type KafkaConsumer

type KafkaConsumer struct {
	KafkaConsumerValidator
	Reader *kafka.Reader
	// contains filtered or unexported fields
}

KafkaConsumer is the Kafka consumer driver.

func NewKafkaConsumer

func NewKafkaConsumer(c *KafkaConsumerConfig, handler MessageHandler) (*KafkaConsumer, error)

NewKafkaConsumer 创建一个消费者驱动

func (*KafkaConsumer) Close

func (k *KafkaConsumer) Close() error

Close 关闭一个消费者

func (*KafkaConsumer) GetDriverName

func (k *KafkaConsumer) GetDriverName() string

func (*KafkaConsumer) GetDriverType

func (k *KafkaConsumer) GetDriverType() driver.DriverType

func (*KafkaConsumer) RecreateFromConfig

func (k *KafkaConsumer) RecreateFromConfig() (driver.Driver, error)

func (*KafkaConsumer) SetDriverMeta

func (k *KafkaConsumer) SetDriverMeta(name string)

func (*KafkaConsumer) SetFailureCallback

func (k *KafkaConsumer) SetFailureCallback(callback driver.DriverFailureCallback)

func (*KafkaConsumer) Start

func (k *KafkaConsumer) Start(ctx context.Context) error

Start 开始消费

type KafkaConsumerConfig

// KafkaConsumerConfig is the configuration of a Kafka consumer.
// NOTE(review): the timeout and interval fields are strings, presumably
// parsed as durations — confirm against the implementation.
type KafkaConsumerConfig struct {
	GroupID            string `yaml:"groupId" json:"groupId" mapstructure:"groupId"`
	AutoOffsetReset    string `yaml:"autoOffsetReset,omitempty" json:"autoOffsetReset,omitempty" mapstructure:"autoOffsetReset,omitempty"`
	EnableAutoCommit   bool   `yaml:"enableAutoCommit,omitempty" json:"enableAutoCommit,omitempty" mapstructure:"enableAutoCommit,omitempty"`
	AutoCommitInterval string `yaml:"autoCommitInterval,omitempty" json:"autoCommitInterval,omitempty" mapstructure:"autoCommitInterval,omitempty"`

	// Connect and validate timeouts (a change to these two settings does not
	// need the same-config comparison that replaces the old driver with a new one).
	ConnectTimeout  string `yaml:"connectTimeout,omitempty" json:"connectTimeout,omitempty" mapstructure:"connectTimeout,omitempty"`
	ValidateTimeout string `yaml:"validateTimeout,omitempty" json:"validateTimeout,omitempty" mapstructure:"validateTimeout,omitempty"`

	// Timeouts for reading messages and committing offsets.
	ReadTimeout   string `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" mapstructure:"readTimeout,omitempty"`       // timeout for reading a message
	CommitTimeout string `yaml:"commitTimeout,omitempty" json:"commitTimeout,omitempty" mapstructure:"commitTimeout,omitempty"` // timeout for committing an offset

	Brokers  []string        `yaml:"brokers" json:"brokers" mapstructure:"brokers"`
	Topic    string          `yaml:"topic" json:"topic" mapstructure:"topic"`
	Security *SecurityConfig `yaml:"security,omitempty" json:"security,omitempty" mapstructure:"security,omitempty"`
	Retry    *RetryConfig    `yaml:"retry,omitempty" json:"retry,omitempty" mapstructure:"retry,omitempty"`
}

KafkaConsumerConfig consumer config

type KafkaConsumerValidator

type KafkaConsumerValidator struct {
	// contains filtered or unexported fields
}

KafkaConsumerValidator checks that the consumer (1) has read permission on the topic, (2) can join the consumer group (if a group is configured), and (3) can commit offsets correctly.

func (*KafkaConsumerValidator) Validate

func (k *KafkaConsumerValidator) Validate(driver driver.Driver) error

Validate 验证Kafka消费者驱动

type KafkaProducer

type KafkaProducer struct {
	KafkaProducerValidator
	Writer *kafka.Writer
	// contains filtered or unexported fields
}

KafkaProducer is the Kafka producer driver.

func NewKafkaProducer

func NewKafkaProducer(c *KafkaProducerConfig) (*KafkaProducer, error)

NewKafkaProducer 创建一个生产者驱动

func (*KafkaProducer) Close

func (k *KafkaProducer) Close() error

Close 关闭生产者

func (*KafkaProducer) GetDriverName

func (k *KafkaProducer) GetDriverName() string

func (*KafkaProducer) GetDriverType

func (k *KafkaProducer) GetDriverType() driver.DriverType

func (*KafkaProducer) Notify

func (k *KafkaProducer) Notify(ctx context.Context, data interface{}) error

Notify 推送消息

func (*KafkaProducer) RecreateFromConfig

func (k *KafkaProducer) RecreateFromConfig() (driver.Driver, error)

func (*KafkaProducer) SetDriverMeta

func (k *KafkaProducer) SetDriverMeta(name string)

func (*KafkaProducer) SetFailureCallback

func (k *KafkaProducer) SetFailureCallback(callback driver.DriverFailureCallback)

func (*KafkaProducer) Start

func (k *KafkaProducer) Start(ctx context.Context) error

Start 启动 Kafka Producer 驱动

type KafkaProducerConfig

// KafkaProducerConfig is the configuration of a Kafka producer.
// NOTE(review): the timeout and backoff fields are strings, presumably
// parsed as durations — confirm against the implementation.
type KafkaProducerConfig struct {
	RequiredAcks    string `yaml:"requiredAcks,omitempty" json:"requiredAcks,omitempty" mapstructure:"requiredAcks,omitempty"`
	Timeout         string `yaml:"timeout,omitempty" json:"timeout,omitempty" mapstructure:"timeout,omitempty"`
	BatchTimeout    string `yaml:"batchTimeout,omitempty" json:"batchTimeout,omitempty" mapstructure:"batchTimeout,omitempty"`
	BatchBytes      int64  `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" mapstructure:"batchBytes,omitempty"`
	BatchSize       int    `yaml:"batchSize,omitempty" json:"batchSize,omitempty" mapstructure:"batchSize,omitempty"`
	Retries         int    `yaml:"retries,omitempty" json:"retries,omitempty" mapstructure:"retries,omitempty"`
	RetryBackoffMin string `yaml:"retryBackoffMin,omitempty" json:"retryBackoffMin,omitempty" mapstructure:"retryBackoffMin,omitempty"`
	RetryBackoffMax string `yaml:"retryBackoffMax,omitempty" json:"retryBackoffMax,omitempty" mapstructure:"retryBackoffMax,omitempty"`
	Compression     string `yaml:"compression,omitempty" json:"compression,omitempty" mapstructure:"compression,omitempty"`

	// Connect and validate timeouts (not part of the same-config comparison
	// that decides whether the driver is replaced).
	ConnectTimeout  string `yaml:"connectTimeout,omitempty" json:"connectTimeout,omitempty" mapstructure:"connectTimeout,omitempty"`
	ValidateTimeout string `yaml:"validateTimeout,omitempty" json:"validateTimeout,omitempty" mapstructure:"validateTimeout,omitempty"`

	Brokers  []string        `yaml:"brokers" json:"brokers" mapstructure:"brokers"`
	Topic    string          `yaml:"topic" json:"topic" mapstructure:"topic"`
	Security *SecurityConfig `yaml:"security,omitempty" json:"security,omitempty" mapstructure:"security,omitempty"`
}

KafkaProducerConfig producer config

type KafkaProducerValidator

type KafkaProducerValidator struct {
	// contains filtered or unexported fields
}

KafkaProducerValidator 1.生产者 是否具有对topic的写权限

func (*KafkaProducerValidator) Validate

func (v *KafkaProducerValidator) Validate(driver driver.Driver) error

Validate kafka的可用性校验 1.能否连接到broker 2.配置的topic是否存在 3.能否向topic写消息

type MessageHandler

// MessageHandler is the message-handling callback: it receives a context and
// a *kafka.Message and returns an error.
type MessageHandler func(ctx context.Context, msg *kafka.Message) error

MessageHandler 消息处理函数

type PendingConnection

// PendingConnection is a long-polling connection that is waiting for a response.
type PendingConnection struct {
	ResponseWriter http.ResponseWriter
	Context        context.Context
	Cancel         context.CancelFunc
	ConnectedAt    time.Time
	ClientID       string

	// Client network information.
	PublicIPs   []string
	InternalIPs []string
}

PendingConnection 等待中的长轮询连接

type PollingClientSecurityConfig

// PollingClientSecurityConfig is the client-side security configuration.
type PollingClientSecurityConfig struct {
	// Client authentication token.
	Token string `json:"token,omitempty" mapstructure:"token,omitempty" yaml:"token,omitempty"`

	// HTTPS settings.
	InsecureSkipVerify bool `json:"insecure_skip_verify" mapstructure:"insecure_skip_verify" yaml:"insecure_skip_verify"`
}

PollingClientSecurityConfig 客户端安全配置

type PollingConsumer

type PollingConsumer struct {
	PollingConsumerValidator
	// contains filtered or unexported fields
}

PollingConsumer 长轮询消费者

func NewPollingConsumer

func NewPollingConsumer(c *PollingConsumerConfig, handler PollingMessageHandler) (*PollingConsumer, error)

NewPollingConsumer 创建长轮询消费者驱动

func (*PollingConsumer) Close

func (p *PollingConsumer) Close() error

Close 关闭长轮询消费者

func (*PollingConsumer) GetDriverName

func (p *PollingConsumer) GetDriverName() string

func (*PollingConsumer) GetDriverType

func (p *PollingConsumer) GetDriverType() driver.DriverType

GetDriverType 获取驱动类型

func (*PollingConsumer) IsRunning

func (p *PollingConsumer) IsRunning() bool

IsRunning 检查是否正在运行

func (*PollingConsumer) RecreateFromConfig

func (p *PollingConsumer) RecreateFromConfig() (driver.Driver, error)

func (*PollingConsumer) SetDriverMeta

func (p *PollingConsumer) SetDriverMeta(name string)

func (*PollingConsumer) SetFailureCallback

func (p *PollingConsumer) SetFailureCallback(callback driver.DriverFailureCallback)

func (*PollingConsumer) Start

func (p *PollingConsumer) Start(ctx context.Context) error

Start 启动长轮询消费者

type PollingConsumerConfig

// PollingConsumerConfig is the long-polling (consumer side) configuration.
type PollingConsumerConfig struct {
	// Server URL (in the first version admin and server are not separated).
	URL string `json:"url,omitempty" mapstructure:"url,omitempty" yaml:"url,omitempty"`

	// Polling settings.
	PollInterval   string `json:"poll_interval" mapstructure:"poll_interval" yaml:"poll_interval"`       // interval between regular polls
	RequestTimeout string `json:"request_timeout" mapstructure:"request_timeout" yaml:"request_timeout"` // HTTP request timeout; recommended to be larger than the server's LongPollTimeout

	// HTTP settings.
	Headers   map[string]string `json:"headers,omitempty" mapstructure:"headers,omitempty" yaml:"headers,omitempty"`          // custom request headers
	UserAgent string            `json:"user_agent,omitempty" mapstructure:"user_agent,omitempty" yaml:"user_agent,omitempty"` // User-Agent

	// Client security settings.
	Security *PollingClientSecurityConfig `json:"security,omitempty" mapstructure:"security,omitempty" yaml:"security,omitempty"`

	// Other settings.
	IgnoreExceptions bool `json:"ignore_exceptions" mapstructure:"ignore_exceptions" yaml:"ignore_exceptions"` // whether exceptions are ignored

	Retry *RetryConfig `json:"retry,omitempty" mapstructure:"retry,omitempty" yaml:"retry,omitempty"`
}

PollingConsumerConfig 长轮询配置

type PollingConsumerValidator

type PollingConsumerValidator struct {
	// contains filtered or unexported fields
}

PollingConsumerValidator 长轮询消费者驱动验证器 1.配置有效性验证 2.服务端的连通性验证

func (*PollingConsumerValidator) Validate

func (p *PollingConsumerValidator) Validate(driver driver.Driver) error

Validate 验证长轮询消费者驱动

type PollingError

// PollingError is the error information produced while polling: an ErrorType
// classification together with the underlying error.
type PollingError struct {
	Type ErrorType
	Err  error
}

PollingError 轮询过程中的错误信息 (error information produced during polling)

func (*PollingError) Error

func (pe *PollingError) Error() string

func (*PollingError) Unwrap

func (pe *PollingError) Unwrap() error

type PollingMessageHandler

// PollingMessageHandler is a callback that receives a context and the raw
// JSON payload (json.RawMessage) and returns an error.
type PollingMessageHandler func(ctx context.Context, data json.RawMessage) error

type PollingProducer

type PollingProducer struct {
	PollingProducerValidator
	// contains filtered or unexported fields
}

PollingProducer 长轮询生产者

func NewPollingProducer

func NewPollingProducer(c *PollingProducerConfig) (*PollingProducer, error)

NewPollingProducer 创建长轮询生产者驱动 使用http + 长轮询的模式

func (*PollingProducer) Close

func (p *PollingProducer) Close() error

Close 关闭生产者

func (*PollingProducer) GetCacheStats

func (p *PollingProducer) GetCacheStats() map[string]interface{}

GetCacheStats 获取缓存统计信息

func (*PollingProducer) GetConfig

func (p *PollingProducer) GetConfig() *PollingProducerConfig

GetConfig 获取配置

func (*PollingProducer) GetConnectionCount

func (p *PollingProducer) GetConnectionCount() int

GetConnectionCount 获取当前等待连接数

func (*PollingProducer) GetDriverName

func (p *PollingProducer) GetDriverName() string

func (*PollingProducer) GetDriverType

func (p *PollingProducer) GetDriverType() driver.DriverType

func (*PollingProducer) IsRunning

func (p *PollingProducer) IsRunning() bool

IsRunning 检查是否正在运行

func (*PollingProducer) Notify

func (p *PollingProducer) Notify(ctx context.Context, data interface{}) error

func (*PollingProducer) PushMessage

func (p *PollingProducer) PushMessage(message interface{}) error

PushMessage 推送消息给所有等待的长轮询连接

func (*PollingProducer) PushRawMessage

func (p *PollingProducer) PushRawMessage(data []byte) error

PushRawMessage 推送原始消息给所有等待的长轮询连接

func (*PollingProducer) RecreateFromConfig

func (p *PollingProducer) RecreateFromConfig() (driver.Driver, error)

func (*PollingProducer) SetDriverMeta

func (p *PollingProducer) SetDriverMeta(name string)

func (*PollingProducer) SetFailureCallback

func (p *PollingProducer) SetFailureCallback(callback driver.DriverFailureCallback)

func (*PollingProducer) Start

func (p *PollingProducer) Start(ctx context.Context) error

Start 启动长轮询生产者服务器

type PollingProducerConfig

// PollingProducerConfig is the long-polling producer configuration.
type PollingProducerConfig struct {
	// Server port (the port of the polling server side).
	Port string `json:"port" mapstructure:"port" yaml:"port"`

	// Polling settings.
	LongPollTimeout string `json:"long_poll_timeout" mapstructure:"long_poll_timeout" yaml:"long_poll_timeout"` // long-poll timeout

	// Server timeout settings.
	ServerReadTimeout  string `json:"server_read_timeout" mapstructure:"server_read_timeout" yaml:"server_read_timeout"`    // server read timeout
	ServerWriteTimeout string `json:"server_write_timeout" mapstructure:"server_write_timeout" yaml:"server_write_timeout"` // server write timeout
	ServerIdleTimeout  string `json:"server_idle_timeout" mapstructure:"server_idle_timeout" yaml:"server_idle_timeout"`    // server idle timeout

	// Server security settings.
	Security *PollingServerSecurityConfig `json:"security,omitempty" mapstructure:"security,omitempty" yaml:"security,omitempty"`

	Retry *RetryConfig `json:"retry,omitempty" mapstructure:"retry,omitempty" yaml:"retry,omitempty"`
}

PollingProducerConfig 长轮询生产者配置

type PollingProducerValidator

type PollingProducerValidator struct {
	// contains filtered or unexported fields
}

PollingProducerValidator 长轮询生产者驱动验证器 1.配置有效性检查

func (*PollingProducerValidator) Validate

func (p *PollingProducerValidator) Validate(driver driver.Driver) error

Validate 验证长轮询生产者驱动

type PollingServerSecurityConfig

// PollingServerSecurityConfig is the server-side security configuration.
type PollingServerSecurityConfig struct {
	// Tokens the server accepts when validating a client's token.
	ValidTokens []string `json:"valid_tokens,omitempty" mapstructure:"valid_tokens,omitempty" yaml:"valid_tokens,omitempty"`

	// HTTPS settings.
	CertFile string `json:"cert_file,omitempty" mapstructure:"cert_file,omitempty" yaml:"cert_file,omitempty"`
	KeyFile  string `json:"key_file,omitempty" mapstructure:"key_file,omitempty" yaml:"key_file,omitempty"`
}

PollingServerSecurityConfig 服务端安全配置

type RetryConfig

// RetryConfig defines the retry configuration.
type RetryConfig struct {
	Count   int    `json:"count" mapstructure:"count" yaml:"count"`       // NOTE(review): presumably the number of retries — confirm
	Backoff string `json:"backoff" mapstructure:"backoff" yaml:"backoff"` // NOTE(review): presumably a duration string for the wait between retries — confirm
}

RetryConfig 定义重试配置

type SASLConfig

// SASLConfig is the SASL configuration: an enable switch, the mechanism name
// and the username/password credentials.
type SASLConfig struct {
	Enabled   bool   `yaml:"enabled" json:"enabled" mapstructure:"enabled"`
	Mechanism string `yaml:"mechanism,omitempty" json:"mechanism,omitempty" mapstructure:"mechanism,omitempty"`
	Username  string `yaml:"username,omitempty" json:"username,omitempty" mapstructure:"username,omitempty"`
	Password  string `yaml:"password,omitempty" json:"password,omitempty" mapstructure:"password,omitempty"`
}

SASLConfig sasl配置

type SecurityConfig

// SecurityConfig is the security configuration; it groups the optional SASL
// and TLS settings.
type SecurityConfig struct {
	SASL *SASLConfig `yaml:"sasl,omitempty" json:"sasl,omitempty" mapstructure:"sasl,omitempty"`
	TLS  *TLSConfig  `yaml:"tls,omitempty" json:"tls,omitempty" mapstructure:"tls,omitempty"`
}

SecurityConfig 安全配置

type TLSConfig

// TLSConfig is the TLS configuration: an enable switch, the CA/certificate/key
// file settings and the InsecureSkipVerify flag.
type TLSConfig struct {
	Enabled            bool   `yaml:"enabled" json:"enabled" mapstructure:"enabled"`
	CaFile             string `yaml:"caFile,omitempty" json:"caFile,omitempty" mapstructure:"caFile,omitempty"`
	CertFile           string `yaml:"certFile,omitempty" json:"certFile,omitempty" mapstructure:"certFile,omitempty"`
	KeyFile            string `yaml:"keyFile,omitempty" json:"keyFile,omitempty" mapstructure:"keyFile,omitempty"`
	InsecureSkipVerify bool   `yaml:"insecureSkipVerify" json:"insecureSkipVerify" mapstructure:"insecureSkipVerify"`
}

TLSConfig tls配置

type WebhookConsumer

type WebhookConsumer struct {
	WebhookConsumerValidator
	// contains filtered or unexported fields
}

WebhookConsumer webhook消费者

func NewWebhookConsumer

func NewWebhookConsumer(c *WebhookConsumerConfig, handler WebhookMessageHandler) (*WebhookConsumer, error)

NewWebhookConsumer 创建webhook消费者驱动

func (*WebhookConsumer) Close

func (w *WebhookConsumer) Close() error

Close 关闭webhook消费者

func (*WebhookConsumer) GetDriverName

func (w *WebhookConsumer) GetDriverName() string

func (*WebhookConsumer) GetDriverType

func (w *WebhookConsumer) GetDriverType() driver.DriverType

func (*WebhookConsumer) RecreateFromConfig

func (w *WebhookConsumer) RecreateFromConfig() (driver.Driver, error)

func (*WebhookConsumer) SetDriverMeta

func (w *WebhookConsumer) SetDriverMeta(name string)

func (*WebhookConsumer) SetFailureCallback

func (w *WebhookConsumer) SetFailureCallback(callback driver.DriverFailureCallback)

func (*WebhookConsumer) Start

func (w *WebhookConsumer) Start(ctx context.Context) error

Start 启动webhook消费者

type WebhookConsumerConfig

// WebhookConsumerConfig is the webhook consumer configuration. It embeds
// BlacklistConfigAssistance and carries the blacklist in BlacklistIPs.
type WebhookConsumerConfig struct {
	BlacklistConfigAssistance
	BlacklistIPs []string `json:"blacklistIPs,omitempty" mapstructure:"blacklistIPs,omitempty" yaml:"blacklistIPs,omitempty"`

	// Client port.
	Port     string                 `json:"port" mapstructure:"port" yaml:"port"`
	Retry    *RetryConfig           `json:"retry,omitempty" mapstructure:"retry,omitempty" yaml:"retry,omitempty"`
	Security *WebhookSecurityConfig `json:"security,omitempty" mapstructure:"security,omitempty" yaml:"security,omitempty"`
}

WebhookConsumerConfig webhook consumer配置

type WebhookConsumerValidator

type WebhookConsumerValidator struct {
	// contains filtered or unexported fields
}

WebhookConsumerValidator validates a webhook consumer driver: (1) configuration validity check, (2) blacklist check.

func (*WebhookConsumerValidator) Validate

func (w *WebhookConsumerValidator) Validate(driver driver.Driver) error

type WebhookMessageHandler

// WebhookMessageHandler is the webhook message-handling callback: it receives
// a context and the raw JSON body (json.RawMessage) and returns an error.
type WebhookMessageHandler func(ctx context.Context, body json.RawMessage) error

WebhookMessageHandler webhook消息处理函数

type WebhookProducer

type WebhookProducer struct {
	WebhookProducerValidator
	// contains filtered or unexported fields
}

WebhookProducer webhook-生产者驱动

func NewWebhookDriver

func NewWebhookDriver(w *WebhookProducerConfig) (d *WebhookProducer, err error)

func (*WebhookProducer) Close

func (w *WebhookProducer) Close() error

func (*WebhookProducer) GetClient

func (w *WebhookProducer) GetClient() *http.Client

func (*WebhookProducer) GetDriverName

func (w *WebhookProducer) GetDriverName() string

func (*WebhookProducer) GetDriverType

func (w *WebhookProducer) GetDriverType() driver.DriverType

func (*WebhookProducer) Notify

func (w *WebhookProducer) Notify(ctx context.Context, data interface{}) error

func (*WebhookProducer) RecreateFromConfig

func (w *WebhookProducer) RecreateFromConfig() (driver.Driver, error)

func (*WebhookProducer) SetDriverMeta

func (w *WebhookProducer) SetDriverMeta(name string)

func (*WebhookProducer) SetFailureCallback

func (w *WebhookProducer) SetFailureCallback(callback driver.DriverFailureCallback)

func (*WebhookProducer) Start

func (w *WebhookProducer) Start(ctx context.Context) error

type WebhookProducerConfig

// WebhookProducerConfig is the webhook producer configuration. It embeds
// BlacklistConfigAssistance and carries the blacklist in BlacklistIPs.
type WebhookProducerConfig struct {
	BlacklistConfigAssistance
	BlacklistIPs []string `json:"blacklistIPs,omitempty" mapstructure:"blacklistIPs,omitempty" yaml:"blacklistIPs,omitempty"`
	// Client port.
	Port string `json:"port" mapstructure:"port" yaml:"port"`
	// Basic settings.
	IgnoreExceptions bool                   `json:"ignoreExceptions" mapstructure:"ignoreExceptions" yaml:"ignoreExceptions"`
	TimeOut          string                 `json:"timeOut" mapstructure:"timeOut" yaml:"timeOut"`
	Retry            *RetryConfig           `json:"retry,omitempty" mapstructure:"retry,omitempty" yaml:"retry,omitempty"`
	Security         *WebhookSecurityConfig `json:"security,omitempty" mapstructure:"security,omitempty" yaml:"security,omitempty"`
}

WebhookProducerConfig webhook producer配置

type WebhookProducerValidator

type WebhookProducerValidator struct {
	// contains filtered or unexported fields
}

WebhookProducerValidator validates a webhook producer driver: (1) configuration validity check.

func (*WebhookProducerValidator) Validate

func (w *WebhookProducerValidator) Validate(driver driver.Driver) error

type WebhookSecurityConfig

// WebhookSecurityConfig is the webhook security configuration.
type WebhookSecurityConfig struct {
	Secret string `json:"secret" mapstructure:"secret" yaml:"secret"` // NOTE(review): presumably the secret used for the HMAC signature helpers — confirm
}

WebhookSecurityConfig 安全配置

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL