安装pakcgae

我们使用的是v3版本的etcd client, 首先通过go get下载并编译安装etcd clinet v3

文档地址:https://godoc.org/github.com/coreos/etcd/clientv3

1
go get github.com/coreos/etcd/clientv3

所有相关依赖包会自动下载编译,包括protobuf、grpc等。这里需要注意需要将 go.mod 中添加一行,使用grpc的v1.26.0版本,否则后面连接会报错:undefined: balancer.PickOptions

1
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

连接客户端

创建客户端使用 clientv3.New,它需要传入一个Config配置,这里传了2个选项:

  • Endpoints:etcd的多个节点服务地址。
  • DialTimeout:创建client的首次连接超时时间,这里传了5秒,如果5秒都没有连接成功就会返回err;一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main
import (
"context"
"fmt"
"time"
"github.com/coreos/etcd/clientv3"
)
var defaultTimeout = 5 * time.Second
var Client *clientv3.Client
func Init() *clientv3.Client {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: defaultTimeout,
})
if err != nil {
fmt.Println("clientv3 Init error:", err)
panic(err)
}
Client = cli
fmt.Println("clientv3 Init success")
return cli
}
func main() {
// 初始化
cli := Init()
defer cli.Close()
}

返回的 client类型中,几个etcd客户端核心功能模块如下:

  • Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
  • KV:我们主要使用的功能,即K-V键值库的操作。
  • Lease:租约相关操作,比如申请一个TTL=10秒的租约(应用给key可以实现键值的自动过期)。
  • Watcher:观察订阅,从而监听最新的数据变化。
  • Auth:管理etcd的用户和权限,属于管理员操作。
  • Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。

Client.KV

Client.KV 是我们平时主要使用的功能,它是一个 interface,提供了所有关于K-V操作的方法,我们前面通过 etcdctl 执行的相关命令,都可以在这里来操作。

我们通过方法 clientv3.NewKV() 来获得KV接口的实现(实现中内置了错误重试机制):

1
2
3
4
5
func main() {
...
kv := clientv3.NewKV(cli)
}

接下来,我们将通过 kv 操作etcd中的数据。

Put

etcd v3使用gRPC进行远程程序调用,并且clientv3使用 grpc-go 连接etcd。确保在使用完客户端后关闭它,如果客户端没有关闭,连接将会有泄漏的goroutines。指定超时时间,通过 context.WithTimeout 使用APIs:

函数声明如下:

1
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)

第一个参数是goroutine的上下文Context。后面两个参数分别是key和value;另外,还支持一个变长参数 ,可以传递一些控制项来影响Put的行为,例如可以携带一个lease ID来支持key过期。

例:

1
2
3
4
5
6
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
putResp, err := kv.Put(ctx, "/test", "Hello world")
cancel()
fmt.Println(putResp)
// &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:28 raft_term:5 <nil> {} [] 0}

我们需要判断err来确定操作是否成功。

不同的KV操作对应不同的response结构,所有KV操作返回的response结构如下:

1
2
3
4
5
6
7
type (
CompactResponse pb.CompactionResponse
PutResponse pb.PutResponse
GetResponse pb.RangeResponse
DeleteResponse pb.DeleteRangeResponse
TxnResponse pb.TxnResponse
)

Get

函数声明如下:

1
2
3
4
5
6
7
8
9
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)

我们可以传递一些控制参数来影响Get的行为,比如:WithFromKey表示读取从参数key开始递增的所有key,而不是读取单个key。

例:使用KV的Get方法来读取给定键的值:

1
2
3
4
5
6
resp, err := kv.Get(context.TODO(), "/test")
for _, ev := range resp.Kvs {
fmt.Printf("%s : %s\n", ev.Key, ev.Value)
// "/test": "hello world"
}

上面的例子中,没有传递opOption,所以就是获取 key=/test 的最新版本数据。

1
2
3
4
5
6
7
8
9
10
type RangeResponse struct {
Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`
// kvs is the list of key-value pairs matched by the range request.
// kvs is empty when count is requested.
Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`
// more indicates if there are more keys to return in the requested range.
More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`
// count is set to the number of keys within the range when requested.
Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}

Kvs字段,保存了Get查询到的所有k-v对,因为上述例子只Get了一个单key,所以只需要判断一下 len(Kvs) 是否等于1即可知道key是否存在。

RangeResponse.MoreCount,当我们使用 withLimit() 等选项进行Get时会发挥作用,相当于翻页查询。

WithPrefix前缀匹配

1
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())

WithPrefix() 是指查找以/test/为前缀的所有key,因此可以模拟出查找子目录的效果。

etcd是一个有序的k-v存储,因此 /test/ 为前缀的key总是顺序排列在一起。

withPrefix()实际上会转化为范围查询,它根据前缀/test/生成了一个前闭后开的 key range:[“/test/”, “/test0”),为什么呢?因为比/大的字符是0,所以以/test0作为范围的末尾,就可以扫描到所有以/test/为前缀的key了。

Lease

etcd客户端的Lease对象可以通过以下的代码获取到

1
lease := clientv3.NewLease(cli)

Lease提供了功能如下:

  • Grant:分配一个租约。
  • Revoke:释放一个租约。
  • TimeToLive:获取剩余TTL时间。
  • Leases:列举所有etcd中的租约。
  • KeepAlive:自动定时的续约某个租约。
  • KeepAliveOnce:为某个租约续约一次。
  • Close:释放当前客户端建立的所有租约。

要想实现key自动过期,首先得创建一个租约,下面的代码创建一个TTL为60秒的租约:

1
grantResp, err := lease.Grant(context.TODO(), 10)

返回的grantResponse的结构体:

1
2
3
4
5
6
7
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
*pb.ResponseHeader
ID LeaseID
TTL int64
Error string
}

接下来我们用这个Lease往etcd中存储一个60秒过期的key:

1
putResp, err := kv.Put(context.TODO(), "/test/key1", "hello world", clientv3.WithLease(grantResp.ID))

注意: 如果Put之前Lease已经过期了,那么这个Put操作会返回error,此时需要重新分配Lease。

实现服务注册时,需要主动给Lease进行续约,通常是以小于TTL的间隔循环调用Lease的 KeepAliveOnce() 方法对租约进行续期,一旦某个服务节点出错无法完成租约的续期,等key过期后客户端即无法在查询服务时获得对应节点的服务,这样就通过租约到期实现了服务的错误隔离。

1
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)

Txn事务

etcd中事务是 原子执行 的,只支持 if … then … else … 这种表达。首先来看一下Txn中定义的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}

通过KV对象开启一个事务:

1
txn := kv.Txn(context.TODO())

例:下面的测试程序,判断如果key的版本号是2,则Put 键值k1和k2,否则Put键值k3和4。

1
2
3
4
5
6
7
kv.Txn(context.TODO()).If(
clientv3.Compare(clientv3.Version(key), "=", 2)
).Then(
clientv3.OpPut(k1,v1), clentv3.OpPut(k2,v2)
).Else(
clientv3.OpPut(k3,v3), clientv3.OpPut(k4,v4)
).Commit()

Watch

Watch用于 监听某个键的变化, Watch调用后返回一个 WatchChan,它的类型声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
type WatchChan <-chan WatchResponse
type WatchResponse struct {
Header pb.ResponseHeader
Events []*Event
CompactRevision int64
Canceled bool
Created bool
}

当监听的key有变化后会向 WatchChan 发送 WatchResponse

应用场景:系统配置的热加载,我们可以在系统读取到存储在etcd key中的配置后,用Watch监听key的变化。在单独的 goroutine 中接收WatchChan发送过来的数据,并将更新应用到系统设置的配置变量中,这样系统就实现了配置变量的热加载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Config struct {
config1 string
config2 string
}
var appConfig Appconfig
func watchConfig(clt *clientv3.Client, key string, c interface{}) {
watchCh := clt.Watch(context.TODO(), key)
go func() {
for res := range watchCh {
value := res.Events[0].Kv.Value
if err := json.Unmarshal(value, c); err != nil {
fmt.Println("watchConfig err:", err)
continue
}
fmt.Println("watchConfig", c)
}
}()
}
watchConfig(client, "config_key", &Config)