Documentation
¶
Index ¶
- Constants
- Variables
- type BasePool
- type MessageAttribute
- type NewFuncCallBack
- type Option
- func WithAckedHandler(handler on_acked_handler) Option
- func WithCheckHandler(handler on_check_handler) Option
- func WithConfig(config *RabbitmqConfig) Option
- func WithExceptHandler(handler on_exception_handler_t) Option
- func WithInitLog(outlogger RabbitmqLogger) Option
- func WithMessageHandler(handler on_message_handler) Option
- func WithRetryCnts(cnts int) Option
- func WithReturnHandler(handler on_return_handler) Option
- type PoolOption
- type RabbitMqClient
- type RabbitMqClientManager
- func (m *RabbitMqClientManager) CheckExceptionLoop()
- func (m *RabbitMqClientManager) Connect() error
- func (m *RabbitMqClientManager) GetClient() (*RabbitMqClient, error)
- func (m *RabbitMqClientManager) InitNotifyChannel() (bool, chan *amqp091.Error)
- func (m *RabbitMqClientManager) PushClient(client *RabbitMqClient) error
- type RabbitmqConfig
- type RabbitmqLogger
Constants ¶
View Source
const ( TextType = "text/plain" JsonType = "application/json" ProtobufType = "application/x-protobuf" //重连次数,默认3次 DefaultRetryCounts = 3 )
Variables ¶
View Source
var ( ErrDisConnected = errors.New("dial rabbitmq failed") ErrNotSetLogger = errors.New("not set logger") )
View Source
var ( DefaultObjCnts = 100 ErrNilObj = errors.New("pool obj is nil") )
Functions ¶
This section is empty.
Types ¶
type BasePool ¶
type BasePool[T any] struct { Pool *sync.Pool //用于创建新对象的函数 NewFunc func() *T Mtx sync.Mutex Cap int }
池的泛型实现
func NewPool ¶
func NewPool[T any](opts ...PoolOption[T]) *BasePool[T]
创建一个新的泛型连接池 newFunc 参数是用于创建新对象的函数
type MessageAttribute ¶
type MessageAttribute struct {
//消息要投入的交换机名称
ExchangeName string
//路由键
RouteKey string
//消息ID
MsgID string
//消息类型
MsgType string
//载荷
Payload []byte
//创建时间
CreateAt time.Time
}
消息的属性
type NewFuncCallBack ¶
type NewFuncCallBack[T any] = func() *T
type Option ¶
type Option = func(r *RabbitMqClientManager)
func WithAckedHandler ¶
func WithAckedHandler(handler on_acked_handler) Option
func WithCheckHandler ¶
func WithCheckHandler(handler on_check_handler) Option
func WithConfig ¶
func WithConfig(config *RabbitmqConfig) Option
func WithExceptHandler ¶
func WithExceptHandler(handler on_exception_handler_t) Option
func WithInitLog ¶
func WithInitLog(outlogger RabbitmqLogger) Option
func WithMessageHandler ¶
func WithMessageHandler(handler on_message_handler) Option
func WithRetryCnts ¶
func WithReturnHandler ¶
func WithReturnHandler(handler on_return_handler) Option
type PoolOption ¶
func WithNewFunc ¶
func WithNewFunc[T any](handler NewFuncCallBack[T]) PoolOption[T]
func WithSetPoolCap ¶
func WithSetPoolCap[T any](cap int) PoolOption[T]
type RabbitMqClient ¶
type RabbitMqClient struct {
Channel *amqp091.Channel // 非协程安全
CheckHandler on_check_handler //处理消息前的检查回调函数
MessageHandler on_message_handler
//当ack成功后触发的回调函数
OnAckedHandler on_acked_handler
//由客户端管理器注入
Config *RabbitmqConfig
}
rabbitmq二次封装-Channel不是协程安全的
func (*RabbitMqClient) BindDieExchangeAndQueue ¶
func (client *RabbitMqClient) BindDieExchangeAndQueue() error
声明并绑定死信交换机+队列
func (*RabbitMqClient) BindNormalExchangeAndQueue ¶
func (client *RabbitMqClient) BindNormalExchangeAndQueue() error
声明并绑定业务交换机+队列,并关联死信配置
func (*RabbitMqClient) Consumer ¶
func (client *RabbitMqClient) Consumer(queueName, consumerName string) error
订阅消息--异步的 订阅失败error被设置
func (*RabbitMqClient) Publish ¶
func (client *RabbitMqClient) Publish(attribute *MessageAttribute) error
发布消息 发布失败error被设置
type RabbitMqClientManager ¶
type RabbitMqClientManager struct {
// 内部是协程安全的
Conn *amqp091.Connection
// 连接异常时触发的用户自定义回调
ExceptionHandler on_exception_handler_t
// Channel池
Pool *BasePool[RabbitMqClient]
// 外部传入的处理路由失败的return回调
OnReturnHandler on_return_handler
//消息触发时调用的回调函数
OnMessageHandler on_message_handler
//ack成功后触发的回调函数
OnAckedHandler on_acked_handler
//调用MessageHandler前触发的回调函数
CheckHandler on_check_handler
//rabbitmq客户端配置信息
Config *RabbitmqConfig
Locker sync.Mutex
//是否健康
IsHealthy atomic.Bool
//重连次数,默认3次
RetryCounts int
Logger RabbitmqLogger
}
RabbitMQ客户端工厂
func NewRabbitmqClientFacroty ¶
func NewRabbitmqClientFacroty(opts ...Option) (*RabbitMqClientManager, error)
创建rabbitmq信道句柄工厂
func (*RabbitMqClientManager) CheckExceptionLoop ¶
func (m *RabbitMqClientManager) CheckExceptionLoop()
监听连接异常
func (*RabbitMqClientManager) GetClient ¶
func (m *RabbitMqClientManager) GetClient() (*RabbitMqClient, error)
============================ 获取Channel客户端 ============================
func (*RabbitMqClientManager) InitNotifyChannel ¶
func (m *RabbitMqClientManager) InitNotifyChannel() (bool, chan *amqp091.Error)
重置监听事件
func (*RabbitMqClientManager) PushClient ¶
func (m *RabbitMqClientManager) PushClient(client *RabbitMqClient) error
归还client
type RabbitmqConfig ¶
Click to show internal directories.
Click to hide internal directories.