rabbitmqutil

package
v1.0.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
// MIME content-type strings exported by the package.
const (
	// TextType is the plain-text content type.
	TextType = "text/plain"
	// JsonType is the JSON content type.
	JsonType = "application/json"
	// ProtobufType is the protobuf content type.
	ProtobufType = "application/x-protobuf"
)

// DefaultRetryCounts is the number of reconnect attempts; the default is 3.
const DefaultRetryCounts = 3

Variables

View Source
// ErrDisConnected is the sentinel error carrying the text "dial rabbitmq failed".
var ErrDisConnected = errors.New("dial rabbitmq failed")

// ErrNotSetLogger is the sentinel error carrying the text "not set logger".
var ErrNotSetLogger = errors.New("not set logger")
View Source
// DefaultObjCnts is a package-level default count, initialised to 100.
// NOTE(review): presumably the default number of pooled objects — confirm.
var DefaultObjCnts = 100

// ErrNilObj is the sentinel error carrying the text "pool obj is nil".
var ErrNilObj = errors.New("pool obj is nil")

Functions

This section is empty.

Types

type BasePool

// BasePool is a generic pool of *T objects that holds a *sync.Pool.
//
// Mtx is a mutex; the state it guards is not visible in this listing.
// Cap is presumably the pool capacity (cf. WithSetPoolCap) — TODO confirm.
type BasePool[T any] struct {
	// Pool is the underlying sync.Pool.
	Pool *sync.Pool
	// NewFunc is the function used to create new objects.
	NewFunc func() *T
	Mtx     sync.Mutex
	Cap     int
}

池的泛型实现

func NewPool

func NewPool[T any](opts ...PoolOption[T]) *BasePool[T]

创建一个新的泛型连接池;用于创建新对象的函数通过选项(如 WithNewFunc)传入。

func (*BasePool[T]) Clear

func (c *BasePool[T]) Clear()

func (*BasePool[T]) Get

func (c *BasePool[T]) Get() (*T, error)

从pool中获取一个对象

func (*BasePool[T]) Put

func (c *BasePool[T]) Put(obj *T) error

归还对象

type MessageAttribute

// MessageAttribute holds the attributes of a message.
type MessageAttribute struct {
	// ExchangeName is the name of the exchange the message is sent to.
	ExchangeName string
	// RouteKey is the routing key.
	RouteKey string
	// MsgID is the message ID.
	MsgID string
	// MsgType is the message type.
	MsgType string
	// Payload is the message payload.
	Payload []byte
	// CreateAt is the creation time.
	CreateAt time.Time
}

消息的属性

type NewFuncCallBack

// NewFuncCallBack is an alias for a no-argument function that returns a *T.
type NewFuncCallBack[T any] = func() *T

type Option

// Option is an alias for a function that receives a *RabbitMqClientManager.
// NOTE(review): presumably applied by the constructor to configure the manager — confirm.
type Option = func(r *RabbitMqClientManager)

func WithAckedHandler

func WithAckedHandler(handler on_acked_handler) Option

func WithCheckHandler

func WithCheckHandler(handler on_check_handler) Option

func WithConfig

func WithConfig(config *RabbitmqConfig) Option

func WithExceptHandler

func WithExceptHandler(handler on_exception_handler_t) Option

func WithInitLog

func WithInitLog(outlogger RabbitmqLogger) Option

func WithMessageHandler

func WithMessageHandler(handler on_message_handler) Option

func WithRetryCnts

func WithRetryCnts(cnts int) Option

func WithReturnHandler

func WithReturnHandler(handler on_return_handler) Option

type PoolOption

// PoolOption is an alias for a function that receives a *BasePool[T].
// NOTE(review): presumably applied by the pool constructor to configure the pool — confirm.
type PoolOption[T any] = func(c *BasePool[T])

func WithNewFunc

func WithNewFunc[T any](handler NewFuncCallBack[T]) PoolOption[T]

func WithSetPoolCap

func WithSetPoolCap[T any](cap int) PoolOption[T]

type RabbitMqClient

// RabbitMqClient is a wrapper around a RabbitMQ channel.
// The Channel it holds is not goroutine-safe.
type RabbitMqClient struct {
	Channel        *amqp091.Channel // not goroutine-safe
	CheckHandler   on_check_handler // callback for the check run before a message is handled
	MessageHandler on_message_handler
	// OnAckedHandler is the callback triggered after an ack succeeds.
	OnAckedHandler on_acked_handler
	// Config is injected by the client manager.
	Config *RabbitmqConfig
}

rabbitmq二次封装-Channel不是协程安全的

func (*RabbitMqClient) BindDieExchangeAndQueue

func (client *RabbitMqClient) BindDieExchangeAndQueue() error

声明并绑定死信交换机+队列

func (*RabbitMqClient) BindNormalExchangeAndQueue

func (client *RabbitMqClient) BindNormalExchangeAndQueue() error

声明并绑定业务交换机+队列,并关联死信配置

func (*RabbitMqClient) Consumer

func (client *RabbitMqClient) Consumer(queueName, consumerName string) error

订阅消息(异步);订阅失败时返回非 nil 的 error。

func (*RabbitMqClient) Publish

func (client *RabbitMqClient) Publish(attribute *MessageAttribute) error

发布消息;发布失败时返回非 nil 的 error。

type RabbitMqClientManager

// RabbitMqClientManager is the RabbitMQ client factory: it holds the
// connection, the pool of channel clients, the user-supplied callbacks and
// the client configuration.
type RabbitMqClientManager struct {
	// Conn is the connection; it is goroutine-safe internally.
	Conn *amqp091.Connection
	// ExceptionHandler is the user-defined callback triggered on a connection exception.
	ExceptionHandler on_exception_handler_t
	// Pool is the channel pool.
	Pool *BasePool[RabbitMqClient]
	// OnReturnHandler is the externally supplied return callback that handles routing failures.
	OnReturnHandler on_return_handler
	// OnMessageHandler is the callback invoked when a message arrives.
	OnMessageHandler on_message_handler
	// OnAckedHandler is the callback triggered after an ack succeeds.
	OnAckedHandler on_acked_handler
	// CheckHandler is the callback triggered before MessageHandler is called.
	CheckHandler on_check_handler
	// Config is the RabbitMQ client configuration.
	Config *RabbitmqConfig
	// Locker is a mutex; the state it guards is not visible here — TODO confirm.
	Locker sync.Mutex
	// IsHealthy reports whether the manager is healthy.
	IsHealthy atomic.Bool
	// RetryCounts is the number of reconnect attempts; the default is 3.
	RetryCounts int
	Logger      RabbitmqLogger
}

RabbitMQ客户端工厂

func NewRabbitmqClientFacroty

func NewRabbitmqClientFacroty(opts ...Option) (*RabbitMqClientManager, error)

创建rabbitmq信道句柄工厂

func (*RabbitMqClientManager) CheckExceptionLoop

func (m *RabbitMqClientManager) CheckExceptionLoop()

监听连接异常

func (*RabbitMqClientManager) Connect

func (m *RabbitMqClientManager) Connect() error

建立TCP连接

func (*RabbitMqClientManager) GetClient

func (m *RabbitMqClientManager) GetClient() (*RabbitMqClient, error)

获取Channel客户端

func (*RabbitMqClientManager) InitNotifyChannel

func (m *RabbitMqClientManager) InitNotifyChannel() (bool, chan *amqp091.Error)

重置监听事件

func (*RabbitMqClientManager) PushClient

func (m *RabbitMqClientManager) PushClient(client *RabbitMqClient) error

归还client

type RabbitmqConfig

// RabbitmqConfig holds the RabbitMQ client configuration referenced by the
// Config fields of RabbitMqClient and RabbitMqClientManager.
//
// NOTE(review): the field meanings below are inferred from names only and
// should be confirmed against the implementation:
//   - Address, User, AccessKey, VirtualHostName: presumably the broker
//     address, credentials and virtual host used when connecting.
//   - Normal*: presumably the business exchange, queue and routing key
//     (cf. BindNormalExchangeAndQueue).
//   - Die*: presumably the dead-letter exchange, queue and routing key
//     (cf. BindDieExchangeAndQueue).
//   - MessageTTL: presumably a message time-to-live; unit not shown here.
//   - ConsumerName: presumably the consumer tag used when subscribing.
//   - MaxClientCnts: presumably an upper bound on pooled clients.
type RabbitmqConfig struct {
	Address             string
	User                string
	AccessKey           string
	VirtualHostName     string
	NormalExchangeName  string
	NormalQueueName     string
	NormalQueueRouteKey string
	DieExchangeName     string
	DieQueueName        string
	DieQueueRouteKey    string
	MessageTTL          int64
	ConsumerName        string
	MaxClientCnts       int
}

type RabbitmqLogger

// RabbitmqLogger is the logger interface with Debug, Info and Error methods.
//
// Each method takes a pointer to a map[string]any (expand) and a string
// (format).
// NOTE(review): expand presumably carries extra structured fields and format
// the message text; confirm whether format is treated as a printf-style
// format string and whether a nil expand is allowed.
type RabbitmqLogger interface {
	// Debug logs at debug level.
	Debug(expand *map[string]any, format string)
	// Info logs at info level.
	Info(expand *map[string]any, format string)
	// Error logs at error level.
	Error(expand *map[string]any, format string)
}

日志器接口

// Log is a package-level RabbitmqLogger; it is nil until assigned.
// NOTE(review): presumably set through WithInitLog — confirm.
var Log RabbitmqLogger

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL