Documentation
¶
Index ¶
Examples ¶
Constants ¶
const ( Redis = EndpointProtocol("redis") // Redis AMQP = EndpointProtocol("amqp") // AMQP )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AMQPSubscriber ¶
type AMQPSubscriber struct {
*Endpoint
// contains filtered or unexported fields
}
AMQPSubscriber represents a subscriber, which consumes messages from AMQP
func (*AMQPSubscriber) Close ¶
func (sub *AMQPSubscriber) Close()
Close closes the subscriber gracefully, it blocks until all messages are handled
func (*AMQPSubscriber) Run ¶
func (sub *AMQPSubscriber) Run()
Run starts the subscriber and blocks until the subscriber is closed
type ActionFunc ¶
type ActionFunc func(args ...interface{})
ActionFunc is the function that hanlding messages args is composed of context-related parameters
amqp args[0] should be amqp.Delivery
redis args[0] should be ...
type Endpoint ¶
type Endpoint struct {
Protocol EndpointProtocol
Original string
Redis struct {
Addr string
Password string
Channels []string
}
AMQP struct {
URI string
ExchangeName string
QueueName string
RouteKey []string
Ack bool
Exclusive bool
Type string
}
}
Endpoint represents an endpoint
type EndpointProtocol ¶
type EndpointProtocol string
EndpointProtocol is the type of protocol that the endpoint represents.
type RedisSubscriber ¶
type RedisSubscriber struct {
*Endpoint
// contains filtered or unexported fields
}
RedisSubscriber represents a subscriber, which consumes messages from redis
func (*RedisSubscriber) Close ¶
func (sub *RedisSubscriber) Close()
Close closes the subscriber gracefully, it blocks until all messages are finished
func (*RedisSubscriber) Run ¶
func (sub *RedisSubscriber) Run()
Run starts the subscriber and blocks until the subscriber is closed
type Setup ¶
type Setup struct {
ActionFunc ActionFunc
URL string
}
type Subscriber ¶
type Subscriber interface {
Run()
Close()
}
type SubscriberManager ¶
type SubscriberManager struct {
// contains filtered or unexported fields
}
SubscriberManager is a manager to control subscribers
Example ¶
logger := logrus.New()
subMgr := NewSubscriberManager(logger)
subMgr.Register(
"TestAMQPSubscriber",
&Setup{
URL: "amqp://root:root@rabbitmq:5672/test.amqp.exchange1/test.amqp.queue1?route=foo&route=bar&ack=false&type=direct",
ActionFunc: func(args ...interface{}) {
delivery := args[0].(amqp.Delivery)
delivery.Ack(false)
},
},
)
subMgr.Register(
"TestRedisSubscriber",
&Setup{
URL: "redis://:password@redis:6379/?channel=foo&channel=bar",
ActionFunc: func(args ...interface{}) {
message := args[0].(*redis.Message).Payload
fmt.Println(message)
},
},
)
subMgr.Run()
// Stop the subscribers
subMgr.GracefulStop()
func NewSubscriberManager ¶
func NewSubscriberManager(log logger) *SubscriberManager
NewSubscriberManager creates a mangager
func (*SubscriberManager) GracefulStop ¶
func (sm *SubscriberManager) GracefulStop()
GracefulStop stops the manager gracefully. It stops the subscribers from accepting new messages and blocks until all the pending messages are finished.
func (*SubscriberManager) Register ¶
func (sm *SubscriberManager) Register(name string, setup *Setup) error
func (*SubscriberManager) Run ¶
func (sm *SubscriberManager) Run()
Run starts the subscribers that the manager controls
func (*SubscriberManager) Validate ¶
func (sm *SubscriberManager) Validate(url string) error
Validate validates if a url is valid