安装pakcgae
我们使用的是v3版本的etcd client, 首先通过go get下载并编译安装etcd clinet v3
。
文档地址:https://godoc.org/github.com/coreos/etcd/clientv3
1
| go get github.com/coreos/etcd/clientv3
|
所有相关依赖包会自动下载编译,包括protobuf、grpc等。这里需要注意需要将 go.mod
中添加一行,使用grpc的v1.26.0版本,否则后面连接会报错:undefined: balancer.PickOptions
1
| replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
|
连接客户端
创建客户端使用 clientv3.New
,它需要传入一个Config配置,这里传了2个选项:
Endpoints
:etcd的多个节点服务地址。
DialTimeout
:创建client的首次连接超时时间,这里传了5秒,如果5秒都没有连接成功就会返回err;一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package main import ( "context" "fmt" "time" "github.com/coreos/etcd/clientv3" ) var defaultTimeout = 5 * time.Second var Client *clientv3.Client func Init() *clientv3.Client { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379"}, DialTimeout: defaultTimeout, }) if err != nil { fmt.Println("clientv3 Init error:", err) panic(err) } Client = cli fmt.Println("clientv3 Init success") return cli } func main() { cli := Init() defer cli.Close() }
|
返回的 client
类型中,几个etcd客户端核心功能模块如下:
- Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
- KV:我们主要使用的功能,即K-V键值库的操作。
- Lease:租约相关操作,比如申请一个TTL=10秒的租约(应用给key可以实现键值的自动过期)。
- Watcher:观察订阅,从而监听最新的数据变化。
- Auth:管理etcd的用户和权限,属于管理员操作。
- Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。
Client.KV
Client.KV 是我们平时主要使用的功能,它是一个 interface
,提供了所有关于K-V操作的方法,我们前面通过 etcdctl
执行的相关命令,都可以在这里来操作。
我们通过方法 clientv3.NewKV()
来获得KV接口的实现(实现中内置了错误重试机制):
1 2 3 4 5
| func main() { ... kv := clientv3.NewKV(cli) }
|
接下来,我们将通过 kv
操作etcd中的数据。
Put
etcd v3使用gRPC进行远程程序调用,并且clientv3使用 grpc-go
连接etcd。确保在使用完客户端后关闭它,如果客户端没有关闭,连接将会有泄漏的goroutines。指定超时时间,通过 context.WithTimeout
使用APIs:
函数声明如下:
1
| Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
|
第一个参数是goroutine的上下文Context。后面两个参数分别是key和value;另外,还支持一个变长参数 ,可以传递一些控制项来影响Put的行为,例如可以携带一个lease ID来支持key过期。
例:
1 2 3 4 5 6
| ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) putResp, err := kv.Put(ctx, "/test", "Hello world") cancel() fmt.Println(putResp)
|
我们需要判断err来确定操作是否成功。
不同的KV操作对应不同的response结构,所有KV操作返回的response结构如下:
1 2 3 4 5 6 7
| type ( CompactResponse pb.CompactionResponse PutResponse pb.PutResponse GetResponse pb.RangeResponse DeleteResponse pb.DeleteRangeResponse TxnResponse pb.TxnResponse )
|
Get
函数声明如下:
1 2 3 4 5 6 7 8 9
| Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
|
我们可以传递一些控制参数来影响Get的行为,比如:WithFromKey表示读取从参数key开始递增的所有key,而不是读取单个key。
例:使用KV的Get方法来读取给定键的值:
1 2 3 4 5 6
| resp, err := kv.Get(context.TODO(), "/test") for _, ev := range resp.Kvs { fmt.Printf("%s : %s\n", ev.Key, ev.Value) }
|
上面的例子中,没有传递opOption,所以就是获取 key=/test 的最新版本数据。
1 2 3 4 5 6 7 8 9 10
| type RangeResponse struct { Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"` More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"` Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"` }
|
Kvs字段,保存了Get查询到的所有k-v对,因为上述例子只Get了一个单key,所以只需要判断一下 len(Kvs) 是否等于1即可知道key是否存在。
RangeResponse.More
和 Count
,当我们使用 withLimit()
等选项进行Get时会发挥作用,相当于翻页查询。
WithPrefix前缀匹配
1
| rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())
|
WithPrefix()
是指查找以/test/为前缀的所有key,因此可以模拟出查找子目录的效果。
etcd是一个有序的k-v存储,因此 /test/
为前缀的key总是顺序排列在一起。
withPrefix()实际上会转化为范围查询,它根据前缀/test/生成了一个前闭后开的 key range:[“/test/”, “/test0”)
,为什么呢?因为比/大的字符是0,所以以/test0作为范围的末尾,就可以扫描到所有以/test/为前缀的key了。
Lease
etcd客户端的Lease对象可以通过以下的代码获取到
1
| lease := clientv3.NewLease(cli)
|
Lease提供了功能如下:
Grant
:分配一个租约。
Revoke
:释放一个租约。
TimeToLive
:获取剩余TTL时间。
Leases
:列举所有etcd中的租约。
KeepAlive
:自动定时的续约某个租约。
KeepAliveOnce
:为某个租约续约一次。
Close
:释放当前客户端建立的所有租约。
要想实现key自动过期,首先得创建一个租约,下面的代码创建一个TTL为60秒的租约:
1
| grantResp, err := lease.Grant(context.TODO(), 10)
|
返回的grantResponse的结构体:
1 2 3 4 5 6 7
| type LeaseGrantResponse struct { *pb.ResponseHeader ID LeaseID TTL int64 Error string }
|
接下来我们用这个Lease往etcd中存储一个60秒过期的key:
1
| putResp, err := kv.Put(context.TODO(), "/test/key1", "hello world", clientv3.WithLease(grantResp.ID))
|
注意: 如果Put之前Lease已经过期了,那么这个Put操作会返回error,此时需要重新分配Lease。
实现服务注册时,需要主动给Lease进行续约,通常是以小于TTL的间隔循环调用Lease的 KeepAliveOnce() 方法对租约进行续期,一旦某个服务节点出错无法完成租约的续期,等key过期后客户端即无法在查询服务时获得对应节点的服务,这样就通过租约到期实现了服务的错误隔离。
1
| keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
|
Txn事务
etcd中事务是 原子执行
的,只支持 if … then … else …
这种表达。首先来看一下Txn中定义的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| type Txn interface { If(cs ...Cmp) Txn Then(ops ...Op) Txn Else(ops ...Op) Txn Commit() (*TxnResponse, error) }
|
通过KV对象开启一个事务:
1
| txn := kv.Txn(context.TODO())
|
例:下面的测试程序,判断如果key的版本号是2,则Put 键值k1和k2,否则Put键值k3和4。
1 2 3 4 5 6 7
| kv.Txn(context.TODO()).If( clientv3.Compare(clientv3.Version(key), "=", 2) ).Then( clientv3.OpPut(k1,v1), clentv3.OpPut(k2,v2) ).Else( clientv3.OpPut(k3,v3), clientv3.OpPut(k4,v4) ).Commit()
|
Watch
Watch用于 监听某个键的变化, Watch调用后返回一个 WatchChan
,它的类型声明如下:
1 2 3 4 5 6 7 8 9 10 11 12
| type WatchChan <-chan WatchResponse type WatchResponse struct { Header pb.ResponseHeader Events []*Event CompactRevision int64 Canceled bool Created bool }
|
当监听的key有变化后会向 WatchChan
发送 WatchResponse
。
应用场景:系统配置的热加载,我们可以在系统读取到存储在etcd key中的配置后,用Watch监听key的变化。在单独的 goroutine
中接收WatchChan发送过来的数据,并将更新应用到系统设置的配置变量中,这样系统就实现了配置变量的热加载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| type Config struct { config1 string config2 string } var appConfig Appconfig func watchConfig(clt *clientv3.Client, key string, c interface{}) { watchCh := clt.Watch(context.TODO(), key) go func() { for res := range watchCh { value := res.Events[0].Kv.Value if err := json.Unmarshal(value, c); err != nil { fmt.Println("watchConfig err:", err) continue } fmt.Println("watchConfig", c) } }() } watchConfig(client, "config_key", &Config)
|