在 etcd 中,事务是一组原子性操作,可以确保多个操作之间的原子性,并且可以确保一组操作在执行期间不会被其他操作中断。
下面是一个最简单的事务示例,txn
表示开启一个事务,在 compares
中,输入事务的执行条件,即 user1 = "bad"
,如果满足条件,则删除 user1
,否则将 user1
设置为 "good"
。
etcdctl txn --interactive
compares:
value("user1") = "bad"
success requests (get, put, delete):
del user1
failure requests (get, put, delete):
put user1 good
Client 处理
根据 etcdctl 的处理我们知道,txn
命令对应的处理函数是 txnCommandFunc()
,主要是将命令行输入转为 etcd 对应的操作方法并将事务请求发送给 Server。先看看大致流程:
与之前一样,txnCommandFunc()
中也是调用了 mustClientFromCmd()
将命令转为 Client
然后执行。
// file: etcdctl/ctlv3/command/txn_command.go
func txnCommandFunc(cmd *cobra.Command, args []string) {
...
// 用于读取命令行输入
reader := bufio.NewReader(os.Stdin)
// 创建一个事务
txn := mustClientFromCmd(cmd).Txn(context.Background())
promptInteractive("compares:")
// If 分支(compares)
txn.If(readCompares(reader)...)
// 打印到标准输出
promptInteractive("success requests (get, put, del):")
// Then 分支(success)
txn.Then(readOps(reader)...)
promptInteractive("failure requests (get, put, del):")
// Else 分支(failure)
txn.Else(readOps(reader)...)
// 提交事务到 Server
resp, err := txn.Commit()
...
}
客户端的 Txn()
方法创建了一个事务 txn
实例,实现了 Txn
接口。它包含了一套事务的完整方法。对于我们的示例来说,txn
结构体有三个主要成员:comps
、sus
和 fas
,分别对应命令行中的 compares
、success
、failure
。
// file: client/v3/txn.go
type Txn interface {
If(cs ...Cmp) Txn
Then(ops ...Op) Txn
Else(ops ...Op) Txn
Commit() (*TxnResponse, error)
}
type txn struct {
...
cmps []*pb.Compare // compares
sus []*pb.RequestOp // success requests
fas []*pb.RequestOp // failure requests
...
}
// file: client/v3/kv.go
func (kv *kv) Txn(ctx context.Context) Txn {
return &txn{
kv: kv,
ctx: ctx,
callOpts: kv.callOpts,
}
}
If
分支中的 readCompares()
用于解析命令行中的 compares
,将每一行的输入调用 ParseCompare()
转成一个 clientv3.Cmp
结构列表,Then
和 Else
分支则使用 readOps()
生成 clientv3.Op
结构列表。三个分支的核心方法就是将转换后的操作对象赋给自己对应的成员数组。
// file: client/v3/txn.go
func (txn *txn) If(cs ...Cmp) Txn {
...
for i := range cs {
txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
}
}
func (txn *txn) Then(ops ...Op) Txn {
...
for _, op := range ops {
txn.sus = append(txn.sus, op.toRequestOp())
}
}
func (txn *txn) Else(ops ...Op) Txn {
...
for _, op := range ops {
txn.fas = append(txn.fas, op.toRequestOp())
}
}
完成分支操作的处理后,会调用 txn.Commit()
发送 gRPC 请求将事务提交到服务端。
func (txn *txn) Commit() (*TxnResponse, error) {
...
r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
var resp *pb.TxnResponse
var err error
// 调用 KVClient 的 Txn 方法发送 gRPC 请求
resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...)
if err != nil {
return nil, toErr(txn.ctx, err)
}
return (*TxnResponse)(resp), nil
}
Server 处理
EtcdServer
实现了 KVServer
的 Txn
方法,跟 Put 操作 一样,最终会发送一个 raft 请求等待其它节点完成响应并返回结果。
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
...
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
// 发送 Raft 请求
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
}
return resp.(*pb.TxnResponse), nil
}
Raft 节点收到的请求之后,会交由运行 etcd 服务时的调度器执行。
// file: server/etcdserver/server.go
func (s *EtcdServer) run() {
...
for {
select {
case ap := <-s.r.apply():
f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
sched.Schedule(f)
}
}
}
根据 Put 操作 的流程我们可以得知,Txn 操作最终会进入 applierV3backend.Txn()
方法。
// file: server/etcdserver/apply/apply.go
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(ctx, a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
}
mvcctxn.Txn()
方法内的大致处理逻辑如下:
在进入方法之前,先来理清楚 applierV3backend
下事务相关的一些属性的关系,如 kv
。kv
是 etcd 的 MVCC 键值存储,在服务端运行创建 EtcdServer
实例时会被创建,之后 EtcdServer
会将它传递给 applierV3backend
来处理存储。
// file: server/etcdserver/server.go
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
...
srv = &EtcdServer{...}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
}
MVCC 存储的接口之间的关系如下:
// file: server/storage/mvcc/kv.go
type WatchableKV interface {
KV
Watchable
}
type KV interface {
ReadView
WriteView
Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead
Write(trace *traceutil.Trace) TxnWrite
HashStorage() HashStorage
Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)
Commit()
Restore(b backend.Backend) error
Close() error
}
type Watchable interface {
NewWatchStream() WatchStream
}
type ReadView interface {
FirstRev() int64
Rev() int64
Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}
type WriteView interface {
DeleteRange(key, end []byte) (n, rev int64)
Put(key, value []byte, lease lease.LeaseID) (rev int64)
}
mvcc.New()
创建调用到了 newWatchableStore()
创建了一个 watchableStore
结构体实例,它实现了 WatchableKV
接口。newWatchableStore()
又创建了一个 store
结构体实例,它实现了 KV
接口,以及 ReadView
和 WriteView
接口。store
实现了 KV
接口。
// file: server/storage/mvcc/watchable_store.go
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
return newWatchableStore(lg, b, le, cfg)
}
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
...
s := &watchableStore{
store: NewStore(lg, b, le, cfg),
...
}
s.store.ReadView = &readView{s}
s.store.WriteView = &writeView{s}
}
NewStore()
创建了实际的 MVCC 存储,初始化了 B+ 树、事务的 Revision、BoltDB 事务的 Bucket 等等。在 Put 篇我们提到过,B+ 树中只存储数据的 key 和 revision 等元数据,实际的值存在 BoltDB 中。
// file: server/storage/mvcc/kvstore.go
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
...
s := &store{
b: b,
kvindex: newTreeIndex(lg),
currentRev: 1,
compactMainRev: -1,
...
}
...
// BoltDB Bucket 相关
tx := s.b.BatchTx()
tx.LockOutsideApply()
tx.UnsafeCreateBucket(schema.Key)
schema.UnsafeCreateMetaBucket(tx)
tx.Unlock()
...
}
// file: server/storage/mvcc/index.go
func newTreeIndex(lg *zap.Logger) index {
return &treeIndex{
// 创建 B+ 树
tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
return aki.Less(bki)
}),
lg: lg,
}
}
接下来,回到 mvcctxn.Txn()
方法,首先会判断该事务是否是为写事务,如果 Success
和 Failure
操作集合中全部都是读操作,则会认为是一个只读事务。如果是一个写事务,会先创建一个读事务来获取 Compare
分支的结果来决定事务应该执行 Success
分支还是 Failure
分支,即 Then
或 Else
,之后对分支中的请求进行检查,完成读相关的操作之后释放读事务,创建一个写事务开始执行最终分支中的操作,最后释放事务并返回结果。
// file: server/etcdserver/txn/txn.go
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
// 是否为写事务
isWrite := !IsTxnReadonly(rt)
// 创建读事务,使用 TxnWrite 包装
var txnWrite mvcc.TxnWrite
txnWrite = mvcc.NewReadOnlyTxnWrite(kv.Read(mvcc.SharedBufReadTxMode, trace))
// 执行 Compares 分支,判断事物的执行路径(Success 或 Failure)
var txnPath []bool
trace.StepWithFunction(
func() {
txnPath = compareToPath(txnWrite, rt)
},
"compare",
)
// 检查请求
if isWrite {
if _, err := checkRequests(txnWrite, rt, txnPath,
func(rv mvcc.ReadView, ro *pb.RequestOp) error { return checkRequestPut(rv, lessor, ro) }); err != nil {
txnWrite.End()
return nil, nil, err
}
}
if _, err := checkRequests(txnWrite, rt, txnPath, checkRequestRange); err != nil {
txnWrite.End()
return nil, nil, err
}
// 创建事务返回值句柄
txnResp, _ := newTxnResp(rt, txnPath)
// 释放读事务并创建写事务
if isWrite {
txnWrite.End()
txnWrite = kv.Write(trace)
}
// 执行最终分支
_, err := applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
...
// 变更事务的 Revision
rev := txnWrite.Rev()
if len(txnWrite.Changes()) != 0 {
rev++
}
// 结束事务
txnWrite.End()
...
}
mvcc.NewReadOnlyTxnWrite(kv.Read())
用来创建一个只读事务,重点的代码在 kv.Read()
中,从前面的接口关系我们知道,kv.Read()
实际调用的是 store
的 Read()
方法,它为 store
本身以及部分资源都加上了读锁,其它事务的操作不会被阻塞。读锁创建之后,使用 mvcc.NewReadOnlyTxnWrite()
,表示一个只读事务。
// file: server/storage/mvcc/kvstore_txn.go
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
// store 加上读锁
s.mu.RLock()
// 为当前 Revision 加上读锁防止被其它事务修改
s.revMu.RLock()
// 这里可以认为是创建一个 BoltDB 后端存储的读事务
var tx backend.ReadTx
...
tx = s.b.ReadTx()
tx.RLock()
// 获取当前 Revision 并释放读锁
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
...
}
读事务创建完之后,调用 compareToPath()
执行 Compare
分支中的操作,其中又调用了 applyCompares()
,如果 Compare
结果的分支中有嵌套的事务,那么会递归地调用本方法继续执行 Compare
。
// file: server/etcdserver/txn/txn.go
func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
txnPath := make([]bool, 1)
ops := rt.Success
if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
ops = rt.Failure
}
// 执行嵌套事务 Compare
for _, op := range ops {
tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
if !ok || tv.RequestTxn == nil {
continue
}
txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
}
}
applyCompares()
执行 Compare
分支中的每一个操作,如果有其中一个不满足条件则返回 false,进入 Failure
分支,否则进入 Success
分支。
// file: server/etcdserver/txn/txn.go
func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
for _, c := range cmps {
if !applyCompare(rv, c) {
return false
}
}
return true
}
applyCompare()
执行 Compare
操作,它首先调用 rv.Range()
查找 Compare
中给定的 key 值,然后与给定的值进行比较,最终返回一个布尔值结果。
// file: server/etcdserver/txn/txn.go
func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
// 查找键值对
rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
...
// 比较值
if len(rr.KVs) == 0 {
if c.Target == pb.Compare_VALUE {
return false
}
return compareKV(c, mvccpb.KeyValue{})
}
for _, kv := range rr.KVs {
if !compareKV(c, kv) {
return false
}
}
return true
}
rv.Range()
最终会进入到 storeTxnRead.Range()
方法中,storeTxnRead
就是之前创建的只读事务 mvcc.TxnWrite
的接口实现。tr.rangeKey()
会将 key 和 revision 作为 store
中 B+ 树的索引,从树中查找对应的 key 的元数据,如果树中存在,则从磁盘中(BoltDB)中找出 key 的值。(还记得 Put 操作吗?Put 就是往 B+ 树和磁盘中存储键值对)。
// file: server/storage/mvcc/kvstore_txn.go
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
...
// 索引查找
revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
if len(revpairs) == 0 {
// 没有值则直接返回
return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
}
...
// 存在键值对则从磁盘中取出实际的值
kvs := make([]mvccpb.KeyValue, limit)
for i, revpair := range revpairs[:len(kvs)] {
...
_, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)
...
kvs[i].Unmarshal(vs[0])
}
}
applyCompare()
的 rv.Range()
执行完之后,开始调用 compareKV()
进行比较,如果 rv.Range()
没有查出键值对且 Compare
操作为值类型比较的话,会直接返回 false,applyCompare()
,我们的示例操作即是如此,但这里还是浅看一下 compareKV()
是如何比较的。
// file: server/etcdserver/txn/txn.go
func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
var result int
rev := int64(0)
switch c.Target {
case pb.Compare_VALUE:
var v []byte
// 从 Compare.TargetUnion 中取出给定的值进行比较
if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
v = tv.Value
}
// 字节比较
result = bytes.Compare(ckv.Value, v)
...
}
switch c.Result {
case pb.Compare_EQUAL:
return result == 0
...
}
}
回到 mvcc.Txn()
中,我们已经完成了 Compare
分支的工作,然后会检查请求的 key、版本号等等,这里不再细看。之后将读事务释放,创建一个写事务,最后执行 Compare
结果分支中的操作,执行完后累加事务的 Revision 并释放写事务。如果写事务或其中的嵌套事务执行操作时出现错误,那么会直接将服务 panic。也就是说,etcd 并不支持内部自动回滚事务,如果需要回滚,需要在程序错误处理中手动编写回滚逻辑。
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
...
var txnPath []bool
trace.StepWithFunction(
func() {
txnPath = compareToPath(txnWrite, rt)
},
"compare",
)
// 检查请求...
...
if isWrite {
// 释放读事务
txnWrite.End()
// 创建写事务
txnWrite = kv.Write(trace)
}
...
// 执行最终分支操作
_, err := applyTxn(ctx, lg, kv, lessor, txnWrite, rt, txnPath, txnResp)
if err != nil {
if isWrite {
// 事务中的写操作出现异常,直接 panic
txnWrite.End()
lg.Panic("unexpected error during txn with writes", zap.Error(err))
} else {
// 非写事务仅输出错误日志
lg.Error("unexpected error during readonly txn", zap.Error(err))
}
}
// 更新 Revision 并释放写锁
rev := txnWrite.Rev()
if len(txnWrite.Changes()) != 0 {
rev++
}
txnWrite.End()
}
读事务的释放就是将之前在 store
和后端存储上加的读锁释放。写事务会给 store
加上读锁,在执行写事务时其它事务还是能够读取 store
中的资源,然后获取后端存储(BoltDB)的事务(bucket)并为其加上写锁,写事务执行期间无法访问存储中的值,保证数据的正确性。
// file: server/storage/mvcc/kvstore_txn.go
func (tr *storeTxnRead) End() {
tr.tx.RUnlock()
tr.s.mu.RUnlock()
}
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
// store 加读锁
s.mu.RLock()
tx := s.b.BatchTx()
// 后端存储加写锁
tx.LockInsideApply()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
// newMetricsTxnWrite 包装事务,目的是为了测量和统计一些数据
return newMetricsTxnWrite(tw)
}
加上写锁后,调用 applyTxn()
开始执行最终分支中的操作。在我们的示例中,最终会执行到 Failure
分支,示例的 Failure
分支是一个 Put ,最终进入到了 mvcc.Put()
方法中,这时就与我们 Put 篇的一致了,不同的是我们将当前写事务传给了 Put()
方法,它内部就不需要自己创建写事务了。如果 applyTxn()
执行出现错误,那么外部捕获会直接 panic。
// file: server/etcdserver/txn/txn.go
func applyTxn(ctx context.Context, lg *zap.Logger, kv mvcc.KV, lessor lease.Lessor, txnWrite mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
// 获取最终执行的分支
reqs := rt.Success
if !txnPath[0] {
reqs = rt.Failure
}
// 执行分支中的所有操作
for i, req := range reqs {
respi := tresp.Responses[i].Response
switch tv := req.Request.(type) {
...
case *pb.RequestOp_RequestPut:
// 进入 mvcc.Put 方法
resp, _, err := Put(ctx, lg, lessor, kv, txnWrite, tv.RequestPut)
if err != nil {
return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
}
respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
...
}
}
}
成功执行完成后会释放写事务,更新 store
的 revision 并将之前加的读锁和写锁释放。
func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
...
txnWrite.End()
}
func (tw *storeTxnWrite) End() {
// 修改 store revision
if len(tw.changes) != 0 {
tw.s.revMu.Lock()
tw.s.currentRev++
}
// 释放后端存储写锁
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
// 释放 store 读锁
tw.s.mu.RUnlock()
}
事务执行完成后,最终将事务结果返回到 EtcdServer
的 Txn
方法中,然后将结果响应给 gRPC 客户端。
func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
...
ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
if err != nil {
return nil, err
}
return resp.(*pb.TxnResponse), nil
}
总结
本文以一个最简单的示例了解事务的源码实现,在客户端的处理中,会将用户不同分支的输入生成为对应分支的的操作类型,然后跟其它请求一样,向服务端发送一个 gRPC 请求,同样的,服务端收到请求后会发送 raft 请求给到其它节点,最后调用 mvcc 存储来处理事务。在分析事务之前,我们还梳理了 mvcc 存储中一些核心的成员关系,如 store、B+ 树、revision、锁等。
示例中的写事务的处理流程如下:
- 创建一个读事务,即 mvcc 的读锁。
- 获取
If
分支中的结果,得出一个最终的执行路径(Then
或Else
),然后检查请求的有效性。 - 释放读事务,创建一个写事务,即 mvcc 的写锁。
- 执行
If
结果分支中的操作,在我们的示例中结果是一个 Put 操作,将当前写事务传给 Put 方法,最后将键值对存入 B+ 树与 bucket,更新事务的 revision,释放写事务。
每个 raft 节点(etcd 服务)都有自己的 mvcc 存储(store),当一个事务请求发送给其它 raft 节点时它们也会各自将自己的存储锁住,保证了节点间的数据一致性。需要注意的是,如果事务的结果分支中存在多个写操作,其中一个写操作出现错误时会导致 panic,可能需要在程序中手动编写回滚键值对的代码。
除此之外,还有对只读事务的一些处理,感兴趣的同学可以自行翻阅一下。