forked from zlyuancn/zconn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafka_producer.go
110 lines (88 loc) · 2.64 KB
/
kafka_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
-------------------------------------------------
Author : Zhang Fan
date: 2020/6/9
Description :
-------------------------------------------------
*/
package zconn
import (
"errors"
"fmt"
"github.com/Shopify/sarama"
)
type kafkaProducerConnector struct{}
var _ IConnector = (*kafkaProducerConnector)(nil)
type KafkaProducerConfig struct {
Address []string
Async bool // 是否异步
}
func (*kafkaProducerConnector) NewEmptyConfig() interface{} {
return new(KafkaProducerConfig)
}
func (*kafkaProducerConnector) Connect(config interface{}) (instance interface{}, err error) {
conf := config.(KafkaProducerConfig)
kconf := sarama.NewConfig()
kconf.Producer.Return.Successes = true // producer把消息发给kafka之后不会等待结果返回
kconf.Producer.Return.Errors = true // 如果启用了该选项,未交付的消息将在Errors通道上返回,包括error(默认启用)。
if conf.Async {
producer, err := sarama.NewAsyncProducer(conf.Address, kconf)
if err != nil {
return nil, fmt.Errorf("连接失败: %s", err)
}
return producer, nil
}
producer, err := sarama.NewSyncProducer(conf.Address, kconf)
if err != nil {
return nil, fmt.Errorf("连接失败: %s", err)
}
return producer, nil
}
func (*kafkaProducerConnector) Close(instance interface{}) error {
if c, ok := instance.(sarama.SyncProducer); ok {
return c.Close()
}
if c, ok := instance.(sarama.AsyncProducer); ok {
return c.Close()
}
panic(errors.New("非sarama.SyncProducer或sarama.AsyncProducer结构"))
}
func AddKafkaProducer(config interface{}, conn_name ...string) {
AddConfig(KafkaProducer, config, conn_name...)
}
func GetKafkaProducer(conn_name ...string) (sarama.SyncProducer, error) {
c, ok := GetConn(KafkaProducer, conn_name...)
if !ok {
return nil, ErrNoConn
}
if !c.IsConnect() {
return nil, ErrConnNotConnected
}
if p, ok := c.Instance().(sarama.SyncProducer); ok {
return p, nil
}
panic(fmt.Errorf("非sarama.SyncProducer结构: %T", c.Instance()))
}
func MustKafkaProducer(conn_name ...string) sarama.SyncProducer {
c, err := GetKafkaProducer(conn_name...)
panicOnErr(err)
return c
}
func GetKafkaAsyncProducer(conn_name ...string) (sarama.AsyncProducer, error) {
c, ok := GetConn(KafkaProducer, conn_name...)
if !ok {
return nil, ErrNoConn
}
if !c.IsConnect() {
return nil, ErrConnNotConnected
}
if p, ok := c.Instance().(sarama.AsyncProducer); ok {
return p, nil
}
panic(fmt.Errorf("非sarama.AsyncProducer结构: %T", c.Instance()))
}
func MustKafkaAsyncProducer(conn_name ...string) sarama.AsyncProducer {
c, err := GetKafkaAsyncProducer(conn_name...)
panicOnErr(err)
return c
}