本文介绍在微服务架构中,常用于服务与服务之间的异步通信组件kafka。
kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,是一种高吞吐量的分布式发布订阅消息系统。
使用场景
- 在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能
- 构建实时的流数据处理程序来变换或处理数据流,数据处理功能
通过docker部署kafka
首先需要安装zookeeper
1
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
然后启动kafka容器
1
2
3
4
5
6docker run -d --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=172.17.0.1:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.1:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-t wurstmeister/kafka
使用kafka-go
安装
1
go get github.com/segmentio/kafka-go
封装客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"time"
)
type Client struct {
host string // kafka地址,一般是:localhost:9092
instance *kafka.Conn //
}
// Connect 连接kafka服务
func (client *Client) Connect( topic string) error {
conn, err := kafka.DialLeader(context.Background(), "tcp", client.host, topic, 0)
if err != nil {
return fmt.Errorf("failed to dial learder: %v", err)
}
_ = conn.SetWriteDeadline(time.Now().Add(10*time.Second))
client.instance = conn
return nil
}
// NewClient 实例化
func NewClient(host string) *Client {
return &Client{host: host}
}