本文介绍如何通过etcd实现服务注册与发现。
Etcd
一个高可用的分布式键值(key-value)数据库。etcd内部采用raft协议作为一致性算法,etcd基于Go语言实现。
一致性算法raft
节点的三种角色:
- Leader: 负责接收客户端的请求,将日志复制到其他节点。
- Candidate: 用于选举Leader的一种角色。
- Follower: 普通节点,负责响应Leader和Candidate节点的请求。
服务注册与发现
服务注册
A服务启动时,将当前服务运行的IP
和Port
提交到Etcd保存,key为ServiceA
。
服务发现
B服务需要调用A服务的接口时,去Etcd通过ServiceA
这个key,找到存储的服务A的地址,即可发起调用。
服务注销
A服务现在要重启,就要在停止前主动注销。等待重启后再次注册即可。
接下来我们来通过etcd实现:服务注册/服务发现/服务注销
实现
首先初始化etcd的客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import (
"go.etcd.io/etcd/clientv3"
"time"
)
var cli *clientv3.Client // etcd客户端
// Connect 连接
func Connect(endpoints []string, timeout time.Duration) (error) {
instance, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: timeout,
})
if err != nil {
return err
}
cli = instance
return nil
}
// KV 键值存储
func KV() clientv3.KV {
return clientv3.NewKV(cli)
}
// Lease 租约,控制过期时间
func Lease() clientv3.Lease {
return clientv3.NewLease(cli)
}服务组件: 实现服务注册、发现、注销的功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import (
"context"
"encoding/json"
"fmt"
"go.etcd.io/etcd/clientv3"
"log"
)
// Service
type Service struct {
// 前缀
prefix string
}
// Node 服务节点
type Node struct {
Name string `json:"name"` // 服务名称,比如:order
ID int `json:"id"` // 节点ID: 建议从1开始
Address string `json:"address"` // 服务地址,如:127.0.0.1:8080
Ttl int64 // 检测时间,单位:秒。比如设置5秒,那如果节点宕机5秒,这个节点就自动注销了
}
// Key 节点的唯一键名
func (node Node) Key() string {
return fmt.Sprintf("%s/%d", node.Name, node.ID)
}
// String 节点的存储数据,通过json序列化实现
func (node Node) String() string {
b, _ := json.Marshal(node)
return string(b)
}
// withPrefix 拼接前缀
func (service *Service) withPrefix(name string) string {
return fmt.Sprintf("/%s/%s", service.prefix, name)
}
// Register 服务注册
func (service *Service) Register(node Node) error {
ctx := context.Background()
// 申请lease
leaseResp, err := Lease().Grant(ctx, node.Ttl)
if err != nil {
return fmt.Errorf("unable to grant lease: %v", err)
}
// 保存数据
_, err = KV().Put(ctx, service.withPrefix(node.Key()), node.String(), clientv3.WithLease(leaseResp.ID))
if err != nil {
return fmt.Errorf("unable to put value: %v", err)
}
// 保持连接
if err := service.keepAlive(leaseResp.ID); err != nil {
return fmt.Errorf("unable to keep alive: %v", err)
}
return nil
}
// Unregister 服务注销
func (service *Service) Unregister(node Node) error {
if node.Name == "" {
return fmt.Errorf("unknown service name")
}
_, err := KV().Delete(context.TODO(), service.withPrefix(node.Key()))
if err != nil {
return err
}
return nil
}
// Discovery 服务发现
func (service *Service) Discovery(name string) ([]Node, error) {
resp, err := KV().Get(context.Background(), service.withPrefix(name), clientv3.WithPrefix())
if err != nil {
return nil, err
}
nodes := make([]Node, 0, resp.Count)
if resp.Count == 0 {
return nodes, nil
}
for _, kv := range resp.Kvs {
var node Node
if err := json.Unmarshal(kv.Value, &node); err != nil {
continue
}
nodes = append(nodes, node)
}
return nodes, err
}
// keepAlive 保持会话
func (service *Service) keepAlive(leaseId clientv3.LeaseID) error {
keepAliveRes, err := Lease().KeepAlive(context.TODO(), leaseId)
if err != nil {
return err
}
go func() {
for {
select {
case ret := <-keepAliveRes:
if ret != nil {
log.Println("keep alive success")
}
}
}
}()
return nil
}
// NewService 实例化
func NewService(prefix string) *Service {
return &Service{prefix: prefix}
}使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21func main() {
// 连接etcd
if err := Connect([]string{"127.0.0.1:2379"}, time.Second*10); err != nil {
panic(err.Error())
}
service := NewService("service")
// 服务注册:注册一个叫order的服务节点
err := service.Register(Node{
Name: "order",
ID: 1,
Address: "http://127.0.0.1:8081",
Ttl: 10,
})
// 服务发现:获取到节点信息,返回的是节点数组
node, err := service.Discovery("order")
// 服务注销: 程序关闭会自动注销,但是有延迟,如果想要立马注销,就调用Unregister方法
err = service.Unregister(Node{Name: "order", ID: 1})
}优化
服务发现不需要每次都去查询etcd里的数据,可以通过定期查询的方式去获取数据。但是如果某个节点已注销,这样会导致信息更新滞后,请求到已注销的节点上,出现异常。所以我们通过监听的方式来保持信息同步,这样可以做到实时性更新。
1 |
|
- 本文作者: Hongker
- 本文链接: https://hongker.github.io/2021/04/24/service-etcd/
- 版权声明: 本博客所有文章除特别声明外,均采用 MIT 许可协议。转载请注明出处!