Documentation ¶
Index ¶
- Constants
- Variables
- func AddClientIPsToPool(clientInfo *pc.ClientProxyInfo)
- func BuildSignature(secret string, payload []byte) (string, string)
- func BuildWebhookUrl(ip, port string) string
- func GenerateSignature(secret string, payload []byte) string
- func GetAllIPs() []string
- func GetReachableIPs() []string
- func GetSignature(header map[string][]string) string
- func GetUnreachableIPs() []string
- func KafkaConsumerConfigComparator(oldDriver, newDriver driver.Driver) bool
- func KafkaConsumerConfigComparatorWithOptions(oldDriver, newDriver driver.Driver, options ...KafkaConfigCompareOptions) bool
- func KafkaProducerConfigComparator(oldDriver, newDriver driver.Driver) bool
- func KafkaProducerConfigComparatorWithOptions(oldDriver, newDriver driver.Driver, options ...KafkaConfigCompareOptions) bool
- func PollingConsumerConfigComparator(oldDriver, newDriver driver.Driver) bool
- func PollingProducerConfigComparator(oldDriver, newDriver driver.Driver) bool
- func RemoveClientIPsFromPool(clientInfo *pc.ClientProxyInfo)
- func StartIPPoolManager(ctx context.Context, config *IPConnectivityConfig) error
- func StopIPPoolManager() error
- func ValidateHMACSignature(signature string, body []byte, secret string) bool
- func WebhookConsumerConfigComparator(oldDriver, newDriver driver.Driver) bool
- func WebhookProducerConfigComparator(oldDriver, newDriver driver.Driver) bool
- type BlacklistConfigAssistance
- type ConfigCache
- func (c *ConfigCache) AddConfig(config interface{}) uint64
- func (c *ConfigCache) Clear()
- func (c *ConfigCache) GetCacheStats() map[string]interface{}
- func (c *ConfigCache) GetLatestConfig() (*ConfigVersion, bool)
- func (c *ConfigCache) GetLatestVersion() uint64
- func (c *ConfigCache) GetOldestCachedVersion() uint64
- func (c *ConfigCache) GetVersion(version uint64) (*ConfigVersion, bool)
- func (c *ConfigCache) GetVersionsSince(clientVersion uint64) []*ConfigVersion
- func (c *ConfigCache) HasNewerVersion(clientVersion uint64) bool
- type ConfigVersion
- type ErrorType
- type IPConnectivityConfig
- type IPPoolManager
- type IPStatus
- type KafkaConfigCompareOptions
- type KafkaConsumer
- func (k *KafkaConsumer) Close() error
- func (k *KafkaConsumer) GetDriverName() string
- func (k *KafkaConsumer) GetDriverType() driver.DriverType
- func (k *KafkaConsumer) RecreateFromConfig() (driver.Driver, error)
- func (k *KafkaConsumer) SetDriverMeta(name string)
- func (k *KafkaConsumer) SetFailureCallback(callback driver.DriverFailureCallback)
- func (k *KafkaConsumer) Start(ctx context.Context) error
- type KafkaConsumerConfig
- type KafkaConsumerValidator
- type KafkaProducer
- func (k *KafkaProducer) Close() error
- func (k *KafkaProducer) GetDriverName() string
- func (k *KafkaProducer) GetDriverType() driver.DriverType
- func (k *KafkaProducer) Notify(ctx context.Context, data interface{}) error
- func (k *KafkaProducer) RecreateFromConfig() (driver.Driver, error)
- func (k *KafkaProducer) SetDriverMeta(name string)
- func (k *KafkaProducer) SetFailureCallback(callback driver.DriverFailureCallback)
- func (k *KafkaProducer) Start(ctx context.Context) error
- type KafkaProducerConfig
- type KafkaProducerValidator
- type MessageHandler
- type PendingConnection
- type PollingClientSecurityConfig
- type PollingConsumer
- func (p *PollingConsumer) Close() error
- func (p *PollingConsumer) GetDriverName() string
- func (p *PollingConsumer) GetDriverType() driver.DriverType
- func (p *PollingConsumer) IsRunning() bool
- func (p *PollingConsumer) RecreateFromConfig() (driver.Driver, error)
- func (p *PollingConsumer) SetDriverMeta(name string)
- func (p *PollingConsumer) SetFailureCallback(callback driver.DriverFailureCallback)
- func (p *PollingConsumer) Start(ctx context.Context) error
- type PollingConsumerConfig
- type PollingConsumerValidator
- type PollingError
- type PollingMessageHandler
- type PollingProducer
- func (p *PollingProducer) Close() error
- func (p *PollingProducer) GetCacheStats() map[string]interface{}
- func (p *PollingProducer) GetConfig() *PollingProducerConfig
- func (p *PollingProducer) GetConnectionCount() int
- func (p *PollingProducer) GetDriverName() string
- func (p *PollingProducer) GetDriverType() driver.DriverType
- func (p *PollingProducer) IsRunning() bool
- func (p *PollingProducer) Notify(ctx context.Context, data interface{}) error
- func (p *PollingProducer) PushMessage(message interface{}) error
- func (p *PollingProducer) PushRawMessage(data []byte) error
- func (p *PollingProducer) RecreateFromConfig() (driver.Driver, error)
- func (p *PollingProducer) SetDriverMeta(name string)
- func (p *PollingProducer) SetFailureCallback(callback driver.DriverFailureCallback)
- func (p *PollingProducer) Start(ctx context.Context) error
- type PollingProducerConfig
- type PollingProducerValidator
- type PollingServerSecurityConfig
- type RetryConfig
- type SASLConfig
- type SecurityConfig
- type TLSConfig
- type WebhookConsumer
- func (w *WebhookConsumer) Close() error
- func (w *WebhookConsumer) GetDriverName() string
- func (w *WebhookConsumer) GetDriverType() driver.DriverType
- func (w *WebhookConsumer) RecreateFromConfig() (driver.Driver, error)
- func (w *WebhookConsumer) SetDriverMeta(name string)
- func (w *WebhookConsumer) SetFailureCallback(callback driver.DriverFailureCallback)
- func (w *WebhookConsumer) Start(ctx context.Context) error
- type WebhookConsumerConfig
- type WebhookConsumerValidator
- type WebhookMessageHandler
- type WebhookProducer
- func (w *WebhookProducer) Close() error
- func (w *WebhookProducer) GetClient() *http.Client
- func (w *WebhookProducer) GetDriverName() string
- func (w *WebhookProducer) GetDriverType() driver.DriverType
- func (w *WebhookProducer) Notify(ctx context.Context, data interface{}) error
- func (w *WebhookProducer) RecreateFromConfig() (driver.Driver, error)
- func (w *WebhookProducer) SetDriverMeta(name string)
- func (w *WebhookProducer) SetFailureCallback(callback driver.DriverFailureCallback)
- func (w *WebhookProducer) Start(ctx context.Context) error
- type WebhookProducerConfig
- type WebhookProducerValidator
- type WebhookSecurityConfig
Constants ¶
const DefaultPollingPath = "/switch/polling"
const DefaultPollingPort = "10002"
const DefaultProto = "http://"
const DefaultWebhookPort = "20002"
const KafkaConsumerDriverType driver.DriverType = "kafka_consumer"
const KafkaProducerDriverType driver.DriverType = "kafka_producer"
const PollingConsumerDriverType driver.DriverType = "polling_consumer"
const PollingProducerDriverType driver.DriverType = "polling_producer"
const WebhookConsumerDriverType driver.DriverType = "webhook_consumer"
const WebhookProducerDriverType driver.DriverType = "webhook_producer"
const WebhookReceivePoint = "/switch/webhook"
Variables ¶
var DefaultKafkaConfigCompareOptions = KafkaConfigCompareOptions{ StrictBrokerOrder: false, }
DefaultKafkaConfigCompareOptions 默认比较选项
var SupportedTypes = map[driver.DriverType]bool{ KafkaConsumerDriverType: true, KafkaProducerDriverType: true, WebhookConsumerDriverType: true, WebhookProducerDriverType: true, PollingConsumerDriverType: true, PollingProducerDriverType: true, }
Functions ¶
func AddClientIPsToPool ¶
func AddClientIPsToPool(clientInfo *pc.ClientProxyInfo)
AddClientIPsToPool 添加客户端IP到全局池
func BuildSignature ¶
BuildSignature 构建Signature
func BuildWebhookUrl ¶
BuildWebhookUrl 构建webhook地址(客户端使用配置中的端口避免端口冲突)
func GenerateSignature ¶
GenerateSignature 生成HMAC签名
func GetSignature ¶
GetSignature 获取请求头中的Signature
func KafkaConsumerConfigComparator ¶
KafkaConsumerConfigComparator Kafka Consumer 配置比较器
func KafkaConsumerConfigComparatorWithOptions ¶
func KafkaConsumerConfigComparatorWithOptions(oldDriver, newDriver driver.Driver, options ...KafkaConfigCompareOptions) bool
KafkaConsumerConfigComparatorWithOptions Kafka Consumer 配置比较器
func KafkaProducerConfigComparator ¶
KafkaProducerConfigComparator Kafka Producer 配置比较器
func KafkaProducerConfigComparatorWithOptions ¶
func KafkaProducerConfigComparatorWithOptions(oldDriver, newDriver driver.Driver, options ...KafkaConfigCompareOptions) bool
KafkaProducerConfigComparatorWithOptions Kafka Producer 配置比较器
func PollingConsumerConfigComparator ¶
PollingConsumerConfigComparator 长轮询消费者配置比较器
func PollingProducerConfigComparator ¶
PollingProducerConfigComparator 长轮询生产者配置比较器
func RemoveClientIPsFromPool ¶
func RemoveClientIPsFromPool(clientInfo *pc.ClientProxyInfo)
RemoveClientIPsFromPool 从全局池中移除客户端IP
func StartIPPoolManager ¶
func StartIPPoolManager(ctx context.Context, config *IPConnectivityConfig) error
StartIPPoolManager 启动全局IP池管理器
func ValidateHMACSignature ¶
ValidateHMACSignature 使用HMAC校验签名
func WebhookConsumerConfigComparator ¶
WebhookConsumerConfigComparator Webhook Consumer 配置比较器
func WebhookProducerConfigComparator ¶
WebhookProducerConfigComparator Webhook Producer 配置比较器
Types ¶
type BlacklistConfigAssistance ¶
type BlacklistConfigAssistance struct {
}
func (*BlacklistConfigAssistance) HasBlacklistIPs ¶
func (b *BlacklistConfigAssistance) HasBlacklistIPs(blacklistIPs []string) bool
HasBlacklistIPs 检查是否有黑名单IP配置
func (*BlacklistConfigAssistance) IsIPBlacklisted ¶
func (b *BlacklistConfigAssistance) IsIPBlacklisted(ip string, blacklistIPs []string) bool
IsIPBlacklisted 检查IP是否在黑名单中
type ConfigCache ¶
type ConfigCache struct {
// contains filtered or unexported fields
}
ConfigCache 配置缓存管理器
func NewConfigCacheWithOptions ¶
func NewConfigCacheWithOptions(maxVersions int, maxAge time.Duration) *ConfigCache
NewConfigCacheWithOptions 使用自定义参数(maxVersions、maxAge)创建配置缓存管理器
func (*ConfigCache) AddConfig ¶
func (c *ConfigCache) AddConfig(config interface{}) uint64
AddConfig 添加一个配置
func (*ConfigCache) GetCacheStats ¶
func (c *ConfigCache) GetCacheStats() map[string]interface{}
GetCacheStats 获取缓存统计信息
func (*ConfigCache) GetLatestConfig ¶
func (c *ConfigCache) GetLatestConfig() (*ConfigVersion, bool)
GetLatestConfig 获取最新配置
func (*ConfigCache) GetLatestVersion ¶
func (c *ConfigCache) GetLatestVersion() uint64
GetLatestVersion 获取最新版本号
func (*ConfigCache) GetOldestCachedVersion ¶
func (c *ConfigCache) GetOldestCachedVersion() uint64
GetOldestCachedVersion 获取最旧的缓存版本号
func (*ConfigCache) GetVersion ¶
func (c *ConfigCache) GetVersion(version uint64) (*ConfigVersion, bool)
GetVersion 获取指定版本的配置
func (*ConfigCache) GetVersionsSince ¶
func (c *ConfigCache) GetVersionsSince(clientVersion uint64) []*ConfigVersion
GetVersionsSince 获取指定版本之后的所有版本。客户端请求过来后,服务端响应客户端版本之后的所有内容
func (*ConfigCache) HasNewerVersion ¶
func (c *ConfigCache) HasNewerVersion(clientVersion uint64) bool
HasNewerVersion 检查是否有比指定版本更新的配置
type ConfigVersion ¶
type ConfigVersion struct {
Version uint64 `json:"version"`
Config interface{} `json:"config"`
Timestamp time.Time `json:"timestamp"`
}
ConfigVersion 配置版本信息
type IPConnectivityConfig ¶
type IPConnectivityConfig struct {
CheckInterval time.Duration `yaml:"check_interval"` // 存量IP定时检查间隔
CheckTimeout time.Duration `yaml:"check_timeout"` // 单个IP检查超时时间
NewIPQueueSize int `yaml:"new_ip_queue_size"` // 新IP检查队列大小
}
IPConnectivityConfig IP连通性检查配置
type IPPoolManager ¶
type IPPoolManager struct {
// contains filtered or unexported fields
}
IPPoolManager 全局IP池,维护长连接回调中的内网、公网IP。只针对webhook驱动中的连接做检查,比IP检查更严苛
func (*IPPoolManager) AddIPs ¶
func (m *IPPoolManager) AddIPs(publicIPs, internalIPs []string) []string
AddIPs 添加IP到池中 新添加的IP都设置成不可达等待检查
func (*IPPoolManager) RemoveIPs ¶
func (m *IPPoolManager) RemoveIPs(publicIPs, internalIPs []string) int
RemoveIPs 从池中移除IP
type KafkaConfigCompareOptions ¶
type KafkaConfigCompareOptions struct {
// 是否对比broker的顺序
StrictBrokerOrder bool
}
KafkaConfigCompareOptions Kafka 配置比较选项
type KafkaConsumer ¶
type KafkaConsumer struct {
KafkaConsumerValidator
Reader *kafka.Reader
// contains filtered or unexported fields
}
KafkaConsumer Kafka 消费者驱动
func NewKafkaConsumer ¶
func NewKafkaConsumer(c *KafkaConsumerConfig, handler MessageHandler) (*KafkaConsumer, error)
NewKafkaConsumer 创建一个消费者驱动
func (*KafkaConsumer) GetDriverName ¶
func (k *KafkaConsumer) GetDriverName() string
func (*KafkaConsumer) GetDriverType ¶
func (k *KafkaConsumer) GetDriverType() driver.DriverType
func (*KafkaConsumer) RecreateFromConfig ¶
func (k *KafkaConsumer) RecreateFromConfig() (driver.Driver, error)
func (*KafkaConsumer) SetDriverMeta ¶
func (k *KafkaConsumer) SetDriverMeta(name string)
func (*KafkaConsumer) SetFailureCallback ¶
func (k *KafkaConsumer) SetFailureCallback(callback driver.DriverFailureCallback)
type KafkaConsumerConfig ¶
type KafkaConsumerConfig struct {
GroupID string `yaml:"groupId" json:"groupId" mapstructure:"groupId"`
AutoOffsetReset string `yaml:"autoOffsetReset,omitempty" json:"autoOffsetReset,omitempty" mapstructure:"autoOffsetReset,omitempty"`
EnableAutoCommit bool `yaml:"enableAutoCommit,omitempty" json:"enableAutoCommit,omitempty" mapstructure:"enableAutoCommit,omitempty"`
AutoCommitInterval string `yaml:"autoCommitInterval,omitempty" json:"autoCommitInterval,omitempty" mapstructure:"autoCommitInterval,omitempty"`
// 连接和验证超时配置(这两项配置变更不参与same config比较,无需做新老驱动的替换)
ConnectTimeout string `yaml:"connectTimeout,omitempty" json:"connectTimeout,omitempty" mapstructure:"connectTimeout,omitempty"`
ValidateTimeout string `yaml:"validateTimeout,omitempty" json:"validateTimeout,omitempty" mapstructure:"validateTimeout,omitempty"`
// 读消息和提交偏移量超时配置
ReadTimeout string `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" mapstructure:"readTimeout,omitempty"` // 读消息超时
CommitTimeout string `yaml:"commitTimeout,omitempty" json:"commitTimeout,omitempty" mapstructure:"commitTimeout,omitempty"` // 提交偏移量超时
Brokers []string `yaml:"brokers" json:"brokers" mapstructure:"brokers"`
Topic string `yaml:"topic" json:"topic" mapstructure:"topic"`
Security *SecurityConfig `yaml:"security,omitempty" json:"security,omitempty" mapstructure:"security,omitempty"`
Retry *RetryConfig `yaml:"retry,omitempty" json:"retry,omitempty" mapstructure:"retry,omitempty"`
}
KafkaConsumerConfig consumer config
type KafkaConsumerValidator ¶
type KafkaConsumerValidator struct {
// contains filtered or unexported fields
}
KafkaConsumerValidator 1. 消费者是否具有对topic的读权限;2. 消费者是否可以加入消费者组(如果有组);3. 消费者是否可以正确地提交偏移量
type KafkaProducer ¶
type KafkaProducer struct {
KafkaProducerValidator
Writer *kafka.Writer
// contains filtered or unexported fields
}
KafkaProducer Kafka 生产者驱动
func NewKafkaProducer ¶
func NewKafkaProducer(c *KafkaProducerConfig) (*KafkaProducer, error)
NewKafkaProducer 创建一个生产者驱动
func (*KafkaProducer) GetDriverName ¶
func (k *KafkaProducer) GetDriverName() string
func (*KafkaProducer) GetDriverType ¶
func (k *KafkaProducer) GetDriverType() driver.DriverType
func (*KafkaProducer) Notify ¶
func (k *KafkaProducer) Notify(ctx context.Context, data interface{}) error
Notify 推送消息
func (*KafkaProducer) RecreateFromConfig ¶
func (k *KafkaProducer) RecreateFromConfig() (driver.Driver, error)
func (*KafkaProducer) SetDriverMeta ¶
func (k *KafkaProducer) SetDriverMeta(name string)
func (*KafkaProducer) SetFailureCallback ¶
func (k *KafkaProducer) SetFailureCallback(callback driver.DriverFailureCallback)
type KafkaProducerConfig ¶
type KafkaProducerConfig struct {
RequiredAcks string `yaml:"requiredAcks,omitempty" json:"requiredAcks,omitempty" mapstructure:"requiredAcks,omitempty"`
Timeout string `yaml:"timeout,omitempty" json:"timeout,omitempty" mapstructure:"timeout,omitempty"`
BatchTimeout string `yaml:"batchTimeout,omitempty" json:"batchTimeout,omitempty" mapstructure:"batchTimeout,omitempty"`
BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" mapstructure:"batchBytes,omitempty"`
BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" mapstructure:"batchSize,omitempty"`
Retries int `yaml:"retries,omitempty" json:"retries,omitempty" mapstructure:"retries,omitempty"`
RetryBackoffMin string `yaml:"retryBackoffMin,omitempty" json:"retryBackoffMin,omitempty" mapstructure:"retryBackoffMin,omitempty"`
RetryBackoffMax string `yaml:"retryBackoffMax,omitempty" json:"retryBackoffMax,omitempty" mapstructure:"retryBackoffMax,omitempty"`
Compression string `yaml:"compression,omitempty" json:"compression,omitempty" mapstructure:"compression,omitempty"`
// 连接和验证超时配置(不需要做same config的比较替换)
ConnectTimeout string `yaml:"connectTimeout,omitempty" json:"connectTimeout,omitempty" mapstructure:"connectTimeout,omitempty"`
ValidateTimeout string `yaml:"validateTimeout,omitempty" json:"validateTimeout,omitempty" mapstructure:"validateTimeout,omitempty"`
Brokers []string `yaml:"brokers" json:"brokers" mapstructure:"brokers"`
Topic string `yaml:"topic" json:"topic" mapstructure:"topic"`
Security *SecurityConfig `yaml:"security,omitempty" json:"security,omitempty" mapstructure:"security,omitempty"`
}
KafkaProducerConfig producer config
type KafkaProducerValidator ¶
type KafkaProducerValidator struct {
// contains filtered or unexported fields
}
KafkaProducerValidator 1.生产者 是否具有对topic的写权限
type MessageHandler ¶
MessageHandler 消息处理函数
type PendingConnection ¶
type PendingConnection struct {
ResponseWriter http.ResponseWriter
Context context.Context
Cancel context.CancelFunc
ConnectedAt time.Time
ClientID string
// 客户端网络信息
PublicIPs []string
InternalIPs []string
}
PendingConnection 等待中的长轮询连接
type PollingClientSecurityConfig ¶
type PollingClientSecurityConfig struct {
// 客户端认证token
Token string `json:"token,omitempty" mapstructure:"token,omitempty" yaml:"token,omitempty"`
// HTTPS配置
InsecureSkipVerify bool `json:"insecure_skip_verify" mapstructure:"insecure_skip_verify" yaml:"insecure_skip_verify"`
}
PollingClientSecurityConfig 客户端安全配置
type PollingConsumer ¶
type PollingConsumer struct {
PollingConsumerValidator
// contains filtered or unexported fields
}
PollingConsumer 长轮询消费者
func NewPollingConsumer ¶
func NewPollingConsumer(c *PollingConsumerConfig, handler PollingMessageHandler) (*PollingConsumer, error)
NewPollingConsumer 创建长轮询消费者驱动
func (*PollingConsumer) GetDriverName ¶
func (p *PollingConsumer) GetDriverName() string
func (*PollingConsumer) GetDriverType ¶
func (p *PollingConsumer) GetDriverType() driver.DriverType
GetDriverType 获取驱动类型
func (*PollingConsumer) RecreateFromConfig ¶
func (p *PollingConsumer) RecreateFromConfig() (driver.Driver, error)
func (*PollingConsumer) SetDriverMeta ¶
func (p *PollingConsumer) SetDriverMeta(name string)
func (*PollingConsumer) SetFailureCallback ¶
func (p *PollingConsumer) SetFailureCallback(callback driver.DriverFailureCallback)
type PollingConsumerConfig ¶
type PollingConsumerConfig struct {
// 服务器URL配置(第一版admin跟server不分离)
URL string `json:"url,omitempty" mapstructure:"url,omitempty" yaml:"url,omitempty"`
// 轮询配置
PollInterval string `json:"poll_interval" mapstructure:"poll_interval" yaml:"poll_interval"` // 普通轮询间隔
RequestTimeout string `json:"request_timeout" mapstructure:"request_timeout" yaml:"request_timeout"` // HTTP请求超时时间,建议配置比服务端LongPollTimeout大
// HTTP配置
Headers map[string]string `json:"headers,omitempty" mapstructure:"headers,omitempty" yaml:"headers,omitempty"` // 自定义请求头
UserAgent string `json:"user_agent,omitempty" mapstructure:"user_agent,omitempty" yaml:"user_agent,omitempty"` // User-Agent
// 客户端安全配置
Security *PollingClientSecurityConfig `json:"security,omitempty" mapstructure:"security,omitempty" yaml:"security,omitempty"`
// 其他配置
IgnoreExceptions bool `json:"ignore_exceptions" mapstructure:"ignore_exceptions" yaml:"ignore_exceptions"` // 是否忽略异常
Retry *RetryConfig `json:"retry,omitempty" mapstructure:"retry,omitempty" yaml:"retry,omitempty"`
}
PollingConsumerConfig 长轮询消费者配置
type PollingConsumerValidator ¶
type PollingConsumerValidator struct {
// contains filtered or unexported fields
}
PollingConsumerValidator 长轮询消费者驱动验证器 1.配置有效性验证 2.服务端的连通性验证
type PollingError ¶
PollingError 轮询过程中的错误信息
func (*PollingError) Error ¶
func (pe *PollingError) Error() string
func (*PollingError) Unwrap ¶
func (pe *PollingError) Unwrap() error
type PollingMessageHandler ¶
type PollingMessageHandler func(ctx context.Context, data json.RawMessage) error
type PollingProducer ¶
type PollingProducer struct {
PollingProducerValidator
// contains filtered or unexported fields
}
PollingProducer 长轮询生产者
func NewPollingProducer ¶
func NewPollingProducer(c *PollingProducerConfig) (*PollingProducer, error)
NewPollingProducer 创建长轮询生产者驱动 使用http + 长轮询的模式
func (*PollingProducer) GetCacheStats ¶
func (p *PollingProducer) GetCacheStats() map[string]interface{}
GetCacheStats 获取缓存统计信息
func (*PollingProducer) GetConfig ¶
func (p *PollingProducer) GetConfig() *PollingProducerConfig
GetConfig 获取配置
func (*PollingProducer) GetConnectionCount ¶
func (p *PollingProducer) GetConnectionCount() int
GetConnectionCount 获取当前等待连接数
func (*PollingProducer) GetDriverName ¶
func (p *PollingProducer) GetDriverName() string
func (*PollingProducer) GetDriverType ¶
func (p *PollingProducer) GetDriverType() driver.DriverType
func (*PollingProducer) Notify ¶
func (p *PollingProducer) Notify(ctx context.Context, data interface{}) error
func (*PollingProducer) PushMessage ¶
func (p *PollingProducer) PushMessage(message interface{}) error
PushMessage 推送消息给所有等待的长轮询连接
func (*PollingProducer) PushRawMessage ¶
func (p *PollingProducer) PushRawMessage(data []byte) error
PushRawMessage 推送原始消息给所有等待的长轮询连接
func (*PollingProducer) RecreateFromConfig ¶
func (p *PollingProducer) RecreateFromConfig() (driver.Driver, error)
func (*PollingProducer) SetDriverMeta ¶
func (p *PollingProducer) SetDriverMeta(name string)
func (*PollingProducer) SetFailureCallback ¶
func (p *PollingProducer) SetFailureCallback(callback driver.DriverFailureCallback)
type PollingProducerConfig ¶
type PollingProducerConfig struct {
// 服务器端口配置(polling的server端端口)
Port string `json:"port" mapstructure:"port" yaml:"port"`
// 轮询配置
LongPollTimeout string `json:"long_poll_timeout" mapstructure:"long_poll_timeout" yaml:"long_poll_timeout"` // 长轮询超时
// 服务器超时配置
ServerReadTimeout string `json:"server_read_timeout" mapstructure:"server_read_timeout" yaml:"server_read_timeout"` // 服务器读取超时
ServerWriteTimeout string `json:"server_write_timeout" mapstructure:"server_write_timeout" yaml:"server_write_timeout"` // 服务器写入超时
ServerIdleTimeout string `json:"server_idle_timeout" mapstructure:"server_idle_timeout" yaml:"server_idle_timeout"` // 服务器空闲超时
// 服务端安全配置
Security *PollingServerSecurityConfig `json:"security,omitempty" mapstructure:"security,omitempty" yaml:"security,omitempty"`
Retry *RetryConfig `json:"retry,omitempty" mapstructure:"retry,omitempty" yaml:"retry,omitempty"`
}
PollingProducerConfig 长轮询生产者配置
type PollingProducerValidator ¶
type PollingProducerValidator struct {
// contains filtered or unexported fields
}
PollingProducerValidator 长轮询生产者驱动验证器 1.配置有效性检查
type PollingServerSecurityConfig ¶
type PollingServerSecurityConfig struct {
// 服务端验证客户端token的配置
ValidTokens []string `json:"valid_tokens,omitempty" mapstructure:"valid_tokens,omitempty" yaml:"valid_tokens,omitempty"`
// HTTPS配置
CertFile string `json:"cert_file,omitempty" mapstructure:"cert_file,omitempty" yaml:"cert_file,omitempty"`
KeyFile string `json:"key_file,omitempty" mapstructure:"key_file,omitempty" yaml:"key_file,omitempty"`
}
PollingServerSecurityConfig 服务端安全配置
type RetryConfig ¶
type RetryConfig struct {
Count int `json:"count" mapstructure:"count" yaml:"count"`
Backoff string `json:"backoff" mapstructure:"backoff" yaml:"backoff"`
}
RetryConfig 定义重试配置
type SASLConfig ¶
type SASLConfig struct {
Enabled bool `yaml:"enabled" json:"enabled" mapstructure:"enabled"`
Mechanism string `yaml:"mechanism,omitempty" json:"mechanism,omitempty" mapstructure:"mechanism,omitempty"`
Username string `yaml:"username,omitempty" json:"username,omitempty" mapstructure:"username,omitempty"`
Password string `yaml:"password,omitempty" json:"password,omitempty" mapstructure:"password,omitempty"`
}
SASLConfig sasl配置
type SecurityConfig ¶
type SecurityConfig struct {
SASL *SASLConfig `yaml:"sasl,omitempty" json:"sasl,omitempty" mapstructure:"sasl,omitempty"`
TLS *TLSConfig `yaml:"tls,omitempty" json:"tls,omitempty" mapstructure:"tls,omitempty"`
}
SecurityConfig 安全配置
type TLSConfig ¶
type TLSConfig struct {
Enabled bool `yaml:"enabled" json:"enabled" mapstructure:"enabled"`
CaFile string `yaml:"caFile,omitempty" json:"caFile,omitempty" mapstructure:"caFile,omitempty"`
CertFile string `yaml:"certFile,omitempty" json:"certFile,omitempty" mapstructure:"certFile,omitempty"`
KeyFile string `yaml:"keyFile,omitempty" json:"keyFile,omitempty" mapstructure:"keyFile,omitempty"`
InsecureSkipVerify bool `yaml:"insecureSkipVerify" json:"insecureSkipVerify" mapstructure:"insecureSkipVerify"`
}
TLSConfig tls配置
type WebhookConsumer ¶
type WebhookConsumer struct {
WebhookConsumerValidator
// contains filtered or unexported fields
}
WebhookConsumer webhook消费者
func NewWebhookConsumer ¶
func NewWebhookConsumer(c *WebhookConsumerConfig, handler WebhookMessageHandler) (*WebhookConsumer, error)
NewWebhookConsumer 创建webhook消费者驱动
func (*WebhookConsumer) GetDriverName ¶
func (w *WebhookConsumer) GetDriverName() string
func (*WebhookConsumer) GetDriverType ¶
func (w *WebhookConsumer) GetDriverType() driver.DriverType
func (*WebhookConsumer) RecreateFromConfig ¶
func (w *WebhookConsumer) RecreateFromConfig() (driver.Driver, error)
func (*WebhookConsumer) SetDriverMeta ¶
func (w *WebhookConsumer) SetDriverMeta(name string)
func (*WebhookConsumer) SetFailureCallback ¶
func (w *WebhookConsumer) SetFailureCallback(callback driver.DriverFailureCallback)
type WebhookConsumerConfig ¶
type WebhookConsumerConfig struct {
BlacklistConfigAssistance
BlacklistIPs []string `json:"blacklistIPs,omitempty" mapstructure:"blacklistIPs,omitempty" yaml:"blacklistIPs,omitempty"`
// 客户端端口配置
Port string `json:"port" mapstructure:"port" yaml:"port"`
Retry *RetryConfig `json:"retry,omitempty" mapstructure:"retry,omitempty" yaml:"retry,omitempty"`
Security *WebhookSecurityConfig `json:"security,omitempty" mapstructure:"security,omitempty" yaml:"security,omitempty"`
}
WebhookConsumerConfig webhook consumer配置
type WebhookConsumerValidator ¶
type WebhookConsumerValidator struct {
// contains filtered or unexported fields
}
WebhookConsumerValidator Webhook消费者驱动验证器:1. 配置有效性的检查;2. 黑名单检查
type WebhookMessageHandler ¶
type WebhookMessageHandler func(ctx context.Context, body json.RawMessage) error
WebhookMessageHandler webhook消息处理函数
type WebhookProducer ¶
type WebhookProducer struct {
WebhookProducerValidator
// contains filtered or unexported fields
}
WebhookProducer webhook-生产者驱动
func NewWebhookDriver ¶
func NewWebhookDriver(w *WebhookProducerConfig) (d *WebhookProducer, err error)
func (*WebhookProducer) Close ¶
func (w *WebhookProducer) Close() error
func (*WebhookProducer) GetClient ¶
func (w *WebhookProducer) GetClient() *http.Client
func (*WebhookProducer) GetDriverName ¶
func (w *WebhookProducer) GetDriverName() string
func (*WebhookProducer) GetDriverType ¶
func (w *WebhookProducer) GetDriverType() driver.DriverType
func (*WebhookProducer) Notify ¶
func (w *WebhookProducer) Notify(ctx context.Context, data interface{}) error
func (*WebhookProducer) RecreateFromConfig ¶
func (w *WebhookProducer) RecreateFromConfig() (driver.Driver, error)
func (*WebhookProducer) SetDriverMeta ¶
func (w *WebhookProducer) SetDriverMeta(name string)
func (*WebhookProducer) SetFailureCallback ¶
func (w *WebhookProducer) SetFailureCallback(callback driver.DriverFailureCallback)
type WebhookProducerConfig ¶
type WebhookProducerConfig struct {
BlacklistConfigAssistance
BlacklistIPs []string `json:"blacklistIPs,omitempty" mapstructure:"blacklistIPs,omitempty" yaml:"blacklistIPs,omitempty"`
// 客户端端口配置
Port string `json:"port" mapstructure:"port" yaml:"port"`
// 基础配置
IgnoreExceptions bool `json:"ignoreExceptions" mapstructure:"ignoreExceptions" yaml:"ignoreExceptions"`
TimeOut string `json:"timeOut" mapstructure:"timeOut" yaml:"timeOut"`
Retry *RetryConfig `json:"retry,omitempty" mapstructure:"retry,omitempty" yaml:"retry,omitempty"`
Security *WebhookSecurityConfig `json:"security,omitempty" mapstructure:"security,omitempty" yaml:"security,omitempty"`
}
WebhookProducerConfig webhook producer配置
type WebhookProducerValidator ¶
type WebhookProducerValidator struct {
// contains filtered or unexported fields
}
WebhookProducerValidator Webhook生产者驱动验证器:1. 配置有效性的检查
type WebhookSecurityConfig ¶
type WebhookSecurityConfig struct {
Secret string `json:"secret" mapstructure:"secret" yaml:"secret"`
}
WebhookSecurityConfig 安全配置
Source Files ¶
- config_cache.go
- define.go
- dynamic_ip_pool.go
- kafka.go
- kafka_auth.go
- kafka_consumer_driver.go
- kafka_consumer_validator.go
- kafka_producer_driver.go
- kafka_producer_validator.go
- kafka_same_config.go
- kafka_validator.go
- polling.go
- polling_consumer_driver.go
- polling_consumer_validator.go
- polling_producer_driver.go
- polling_producer_validator.go
- polling_same_config.go
- webhook.go
- webhook_consumer_driver.go
- webhook_consumer_validator.go
- webhook_producer_driver.go
- webhook_producer_validator.go
- webhook_same_config.go
- webhook_signature.go