30 January 2020

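Problem Definition

The question this article explores is: when multiple requests are sent concurrently to the Kubernetes ApiServer to update the same API resource object, how does the ApiServer make sure that no update is lost, and how does it detect conflicts?

Using kubectl as the Entry Point

kubectl has many commands; the two I use most often to modify API resource objects are apply and edit. Taking a Deployment as the example and observing the requests these two commands send, both turn out to use HTTP PATCH: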

PATCH /apis/extensions/v1beta1/namespaces/default/deployments/nginx HTTP/1.1
Host: 127.0.0.1:8080
User-Agent: kubectl/v0.0.0 (linux/amd64) kubernetes/$Format
Content-Length: 246
Accept: application/json
Content-Type: application/strategic-merge-patch+json
Uber-Trace-Id: 5ca3fde0b9f9aaf1:0c0358897c8e0ef8:0f38135280523f87:1
Accept-Encoding: gzip

{"spec":{"template":{"spec":{"$setElementOrder/containers":[{"name":"nginx"}],"containers":[{"$setElementOrder/env":[{"name":"DEMO_GREETING"}],"env":[{"name":"DEMO_GREETING","value":"Hello from the environment#kubectl edit"}],"name":"nginx"}]}}}}
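
To make the shape of this request concrete, here is a minimal Go sketch that builds (but does not send) a similar PATCH using only the standard library. The path and Content-Type are copied from the capture above; the helper name newStrategicMergePatch, the base URL, and the shortened patch body are assumptions made for illustration, and no authentication is set up.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// newStrategicMergePatch is a hypothetical helper that builds a PATCH request
// shaped like the one kubectl sent above. It does not send anything.
func newStrategicMergePatch(baseURL, namespace, name, patch string) (*http.Request, error) {
	url := fmt.Sprintf("%s/apis/extensions/v1beta1/namespaces/%s/deployments/%s", baseURL, namespace, name)
	req, err := http.NewRequest(http.MethodPatch, url, strings.NewReader(patch))
	if err != nil {
		return nil, err
	}
	// The Content-Type tells the ApiServer which PATCH flavour the body uses.
	req.Header.Set("Content-Type", "application/strategic-merge-patch+json")
	req.Header.Set("Accept", "application/json")
	return req, nil
}

func main() {
	// Shortened, illustrative patch body; note that it carries no resourceVersion.
	patch := `{"spec":{"template":{"spec":{"containers":[{"name":"nginx","env":[{"name":"DEMO_GREETING","value":"hello"}]}]}}}}`
	req, err := newStrategicMergePatch("http://127.0.0.1:8080", "default", "nginx", patch)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.Path, req.Header.Get("Content-Type"))
}
```

Passing the request to an http.Client would send it; that step is left out so the sketch runs without a cluster.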

Tracing How the ApiServer Handles a PATCH Request

The call stack the ApiServer goes through to handle a PATCH request is as follows:

restfulPatchResource
    // Parameters
    r rest.Patcher // registry.Store
    scope handlers.RequestScope
    admit admission.Interface
    supportedTypes []string // PATCH types supported for the resource object
    // Return value
    restful.RouteFunction // go-restful route handler callback

handlers.PatchResource
    // Creates the handlers.patcher object
    // Parameters
    r rest.Patcher // registry.Store
    scope *RequestScope
    admit admission.Interface
    patchTypes []string // PATCH types supported for the resource object
    // Return value
    http.HandlerFunc

k8s.io/apiserver/pkg/endpoints/handlers.(*patcher).patchResource
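    // Parameters
    ctx context.Context // http.Request.Context
    scope *RequestScope
    // Return values
    runtime.Object // the updated resource object
    bool // whether the resource object was created (updating a non-existent object may create it directly)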
    error

k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Update
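    // Parameters
    ctx context.Context
    name string // name of the resource object, e.g. nginx; the storage key (e.g. /deployments/default/nginx) is derived from it
    objInfo rest.UpdatedObjectInfo // the information needed to update the resource object
    createValidation rest.ValidateObjectFunc
    updateValidation rest.ValidateObjectUpdateFunc
    forceAllowCreate bool
    options *metav1.UpdateOptions
    // Return values
    runtime.Object // the updated resource object
    bool // whether the resource object was created (updating a non-existent object may create it directly)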
    error

k8s.io/apiserver/pkg/registry/generic/registry.(*DryRunnableStorage).GuaranteedUpdate
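    // Implements the dry-run (simulated update) logic for resource objects
    // If dryRun = true, the persistence step is not invoked afterwards
    // Parameters
    ctx context.Context
    key string // storage key of the resource object, e.g. /deployments/default/nginx
    ptrToType runtime.Object // pointer to an object of the resource type, created by registry.Store.NewFunc
    ignoreNotFound bool // if true, a missing key is passed to tryUpdate as a zero-value object instead of returning NotFound
    preconditions *storage.Preconditions // checks run before the update: UID and ResourceVersion
    tryUpdate storage.UpdateFunc // the closure defined in registry.Store.Update that performs the update of the resource object
    dryRun bool // dry run: validate the input and try the update in memory, but do not persist
    suggestion ...runtime.Object
    // Return value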
    error

k8s.io/apiserver/pkg/storage/cacher.(*Cacher).GuaranteedUpdate
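    // Looks the key up in the in-memory cache cacher.Cacher.watchCache; if present, the cached object is passed down as the suggestion
    // Parameters
    ctx context.Context
    key string // storage key of the resource object, e.g. /deployments/default/nginx
    ptrToType runtime.Object // pointer to an object of the resource type, created by registry.Store.NewFunc
    ignoreNotFound bool
    preconditions *storage.Preconditions // checks run before the update: UID and ResourceVersion
    tryUpdate storage.UpdateFunc // the closure defined in registry.Store.Update that performs the update of the resource object
    _ ...runtime.Object
    // Return value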
    error

k8s.io/apiserver/pkg/storage/etcd3.(*store).GuaranteedUpdate
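    // Calls tryUpdate to update the resource object and persists it to etcd through an etcd transaction
    // Parameters
    ctx context.Context
    key string // storage key of the resource object, e.g. /deployments/default/nginx
    out runtime.Object // pointer to an object of the resource type, created by registry.Store.NewFunc
    ignoreNotFound bool
    preconditions *storage.Preconditions // checks run before the update: UID and ResourceVersion
    tryUpdate storage.UpdateFunc // the closure defined in registry.Store.Update that performs the update of the resource object
    suggestion ...runtime.Object // if found in the in-memory cache, the initial read from etcd can be skipped
    // Return value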
    error

A few key points in this call stack deserve a closer look:

storage.Preconditions

The checks performed before tryUpdate is executed.

// Preconditions must be fulfilled before an operation (update, delete, etc.) is carried out.
type Preconditions struct {
    // Specifies the target UID.
    // +optional
    UID *types.UID `json:"uid,omitempty"`
    // Specifies the target ResourceVersion
    // +optional
    ResourceVersion *string `json:"resourceVersion,omitempty"`
}

storage.Preconditions has a single method, func (p *Preconditions) Check(key string, obj runtime.Object) error, which checks whether the UID and ResourceVersion of obj match the ones set in the Preconditions.

When handling a PATCH request, the Preconditions are obtained from rest.UpdatedObjectInfo.Preconditions().
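
The behaviour of such a check can be sketched in a few lines of Go. This is a simplified model, not the apiserver code: the object and preconditions types and the error value are hypothetical stand-ins for runtime.Object metadata and storage.Preconditions, and the key parameter is dropped.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a hypothetical stand-in for the metadata of a stored resource.
type object struct {
	UID             string
	ResourceVersion string
}

// preconditions mirrors the two optional fields of storage.Preconditions.
type preconditions struct {
	UID             *string
	ResourceVersion *string
}

var errPreconditionFailed = errors.New("precondition failed")

// check returns nil when every precondition that is set matches obj.
// A nil receiver or a nil field means "no constraint".
func (p *preconditions) check(obj object) error {
	if p == nil {
		return nil
	}
	if p.UID != nil && *p.UID != obj.UID {
		return fmt.Errorf("%w: UID %q does not match %q", errPreconditionFailed, *p.UID, obj.UID)
	}
	if p.ResourceVersion != nil && *p.ResourceVersion != obj.ResourceVersion {
		return fmt.Errorf("%w: resourceVersion %q does not match %q",
			errPreconditionFailed, *p.ResourceVersion, obj.ResourceVersion)
	}
	return nil
}

func main() {
	stored := object{UID: "uid-1", ResourceVersion: "744603"}
	other := "uid-2"

	var none *preconditions
	fmt.Println(none.check(stored))                             // no constraint at all
	fmt.Println((&preconditions{}).check(stored))               // zero value: no constraint
	fmt.Println((&preconditions{UID: &other}).check(stored))    // UID mismatch
}
```

The point that matters for the rest of the article is that a nil or zero-value Preconditions never rejects anything.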

rest.UpdatedObjectInfo

The tryUpdate closure calls rest.UpdatedObjectInfo.UpdatedObject() to apply the update to the resource object.

// UpdatedObjectInfo provides information about an updated object to an Updater.
// It requires access to the old object in order to return the newly updated object.
type UpdatedObjectInfo interface {
    // Returns preconditions built from the updated object, if applicable.
    // May return nil, or a preconditions object containing nil fields,
    // if no preconditions can be determined from the updated object.
    Preconditions() *metav1.Preconditions

    // UpdatedObject returns the updated object, given a context and old object.
    // The only time an empty oldObj should be passed in is if a "create on update" is occurring (there is no oldObj).
    UpdatedObject(ctx context.Context, oldObj runtime.Object) (newObj runtime.Object, err error)
}

When handling a PATCH request, the implementation of rest.UpdatedObjectInfo is rest.defaultUpdatedObjectInfo:

// defaultUpdatedObjectInfo implements UpdatedObjectInfo
type defaultUpdatedObjectInfo struct {
    // obj is the updated object
    obj runtime.Object

    // transformers is an optional list of transforming functions that modify or
    // replace obj using information from the context, old object, or other sources.
    transformers []TransformFunc
}

// DefaultUpdatedObjectInfo returns an UpdatedObjectInfo impl based on the specified object.
func DefaultUpdatedObjectInfo(obj runtime.Object, transformers ...TransformFunc) UpdatedObjectInfo {
    return &defaultUpdatedObjectInfo{obj, transformers}
}

// Preconditions satisfies the UpdatedObjectInfo interface.
// Note that only the UID is read here
func (i *defaultUpdatedObjectInfo) Preconditions() *metav1.Preconditions {
    // Attempt to get the UID out of the object
    accessor, err := meta.Accessor(i.obj)
    if err != nil {
        // If no UID can be read, no preconditions are possible
        return nil
    }

    // If empty, no preconditions needed
    uid := accessor.GetUID()
    if len(uid) == 0 {
        return nil
    }

    return &metav1.Preconditions{UID: &uid}
}

// UpdatedObject satisfies the UpdatedObjectInfo interface.
// It returns a copy of the held obj, passed through any configured transformers.
func (i *defaultUpdatedObjectInfo) UpdatedObject(ctx context.Context, oldObj runtime.Object) (runtime.Object, error) {
    var err error
    // Start with the configured object
    newObj := i.obj

    // If the original is non-nil (might be nil if the first transformer builds the object from the oldObj), make a copy,
    // so we don't return the original. BeforeUpdate can mutate the returned object, doing things like clearing ResourceVersion.
    // If we're re-called, we need to be able to return the pristine version.
    if newObj != nil {
        newObj = newObj.DeepCopyObject()
    }

    // Allow any configured transformers to update the new object
    for _, transformer := range i.transformers {
        newObj, err = transformer(ctx, newObj, oldObj) // if http.method == patch: patcher.applyPatch, patcher.applyAdmission
        if err != nil {
            return nil, err
        }
    }

    return newObj, nil
}

For a PATCH request, defaultUpdatedObjectInfo is created with obj set to nil, so defaultUpdatedObjectInfo.Preconditions() returns nil and no Preconditions check takes place.

// patchResource divides PatchResource for easier unit testing
func (p *patcher) patchResource(ctx context.Context, scope *RequestScope) (runtime.Object, bool, error) {
    ......
    p.updatedObjectInfo = rest.DefaultUpdatedObjectInfo(nil, p.applyPatch, p.applyAdmission)
    ......
}

The tryUpdate Closure

// Update performs an atomic update and set of the object. Returns the result of the update
// or an error. If the registry allows create-on-update, the create flow will be executed.
// A bool is returned along with the object and any errors, to indicate object creation.
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
    ......
    err = e.Storage.GuaranteedUpdate(ctx, key, out, true, storagePreconditions, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
        // Given the existing object, get the new object
        obj, err := objInfo.UpdatedObject(ctx, existing) // defaultUpdatedObjectInfo.UpdatedObject
        if err != nil {
            return nil, nil, err
        }

        // If AllowUnconditionalUpdate() is true and the object specified by
        // the user does not have a resource version, then we populate it with
        // the latest version. Else, we check that the version specified by
        // the user matches the version of latest storage object.
        resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj)
        if err != nil {
            return nil, nil, err
        }
        doUnconditionalUpdate := resourceVersion == 0 && e.UpdateStrategy.AllowUnconditionalUpdate()

        version, err := e.Storage.Versioner().ObjectResourceVersion(existing)
        if err != nil {
            return nil, nil, err
        }
        // version == 0 means the resource object for this key does not exist
        if version == 0 {
            if !e.UpdateStrategy.AllowCreateOnUpdate() && !forceAllowCreate {
                return nil, nil, kubeerr.NewNotFound(qualifiedResource, name)
            }
            creating = true
            creatingObj = obj
            if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
                return nil, nil, err
            }
            // at this point we have a fully formed object.  It is time to call the validators that the apiserver
            // handling chain wants to enforce.
            if createValidation != nil {
                if err := createValidation(obj.DeepCopyObject()); err != nil {
                    return nil, nil, err
                }
            }
            ttl, err := e.calculateTTL(obj, 0, false)
            if err != nil {
                return nil, nil, err
            }

            return obj, &ttl, nil
        }

        creating = false
        creatingObj = nil
        if doUnconditionalUpdate {
            // Update the object's resource version to match the latest
            // storage object's resource version.
            // Unconditional update
            err = e.Storage.Versioner().UpdateObject(obj, res.ResourceVersion)
            if err != nil {
                return nil, nil, err
            }
        } else {
            // Check if the object's resource version matches the latest
            // resource version.
            if resourceVersion == 0 {
                // TODO: The Invalid error should have a field for Resource.
                // After that field is added, we should fill the Resource and
                // leave the Kind field empty. See the discussion in #18526.
                qualifiedKind := schema.GroupKind{Group: qualifiedResource.Group, Kind: qualifiedResource.Resource}
                fieldErrList := field.ErrorList{field.Invalid(field.NewPath("metadata").Child("resourceVersion"), resourceVersion, "must be specified for an update")}
                return nil, nil, kubeerr.NewInvalid(qualifiedKind, name, fieldErrList)
            }
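            // This is the key point: the version in the submitted object differs from the stored one, which means a concurrent update conflict
            // For a PATCH request, resourceVersion equals version unless the patch body itself sets metadata.resourceVersion
            // It is mainly with a PUT request that resourceVersion != version can occur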
            if resourceVersion != version {
                return nil, nil, kubeerr.NewConflict(qualifiedResource, name, fmt.Errorf(OptimisticLockErrorMsg))
            }
        }
        if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
            return nil, nil, err
        }
        // at this point we have a fully formed object.  It is time to call the validators that the apiserver
        // handling chain wants to enforce.
        if updateValidation != nil {
            if err := updateValidation(obj.DeepCopyObject(), existing.DeepCopyObject()); err != nil {
                return nil, nil, err
            }
        }
        // Check the default delete-during-update conditions, and store-specific conditions if provided
        if ShouldDeleteDuringUpdate(ctx, key, obj, existing) &&
            (e.ShouldDeleteDuringUpdate == nil || e.ShouldDeleteDuringUpdate(ctx, key, obj, existing)) {
            deleteObj = obj
            return nil, nil, errEmptiedFinalizers
        }
        ttl, err := e.calculateTTL(obj, res.TTL, true)
        if err != nil {
            return nil, nil, err
        }
        if int64(ttl) != res.TTL {
            return obj, &ttl, nil
        }
        return obj, nil, nil
    }, dryrun.IsDryRun(options.DryRun))
    ......
}

Persisting to Etcd

// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
func (s *store) GuaranteedUpdate(
    ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
    preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
    trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", getTypeName(out)))
    defer trace.LogIfLong(500 * time.Millisecond)

    v, err := conversion.EnforcePtr(out) // out must be a non-nil pointer
    if err != nil {
        panic("unable to convert output object to pointer")
    }
    key = path.Join(s.pathPrefix, key)

    // Fetch the state of the key's object from etcd
    getCurrentState := func() (*objState, error) {
        startTime := time.Now()
        getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
        metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
        if err != nil {
            return nil, err
        }
        return s.getState(getResp, key, v, ignoreNotFound)
    }

    var origState *objState
    var mustCheckData bool // true means origState may not be the latest state
    // If the caller supplied the object for this key (obtained from k8s.io/apiserver/pkg/storage/cacher.Cacher.watchCache.GetByKey(key)),
    // the initial read from etcd is skipped
    if len(suggestion) == 1 && suggestion[0] != nil {
        span.LogFields(log.Object("suggestion[0]", suggestion[0]))
        origState, err = s.getStateFromObject(suggestion[0])
        span.LogFields(log.Object("origState", origState))
        if err != nil {
            return err
        }
        mustCheckData = true
    } else {
        origState, err = getCurrentState()
        if err != nil {
            return err
        }
    }
    trace.Step("initial value restored")

    transformContext := authenticatedDataString(key)
    for {
        // Check that the UID and ResourceVersion of origState.obj match the preconditions
        // For the PATCH and PUT requests traced here the preconditions are empty, so nothing is checked
        if err := preconditions.Check(key, origState.obj); err != nil {
            // If our data (origState) is already up to date, return the error
            if !mustCheckData {
                return err
            }

            // origState may not be the latest state; fetch the latest state from etcd
            // It's possible we were working with stale data
            // Actually fetch
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            mustCheckData = false
            // Retry
            continue
        }

        // Apply the modification on top of origState.obj
        ret, ttl, err := s.updateState(origState, tryUpdate)
        if err != nil {
            // origState may be stale; in that case fetch the latest state from etcd and try the update once more
            // If our data is already up to date, return the error
            if !mustCheckData {
                return err
            }

            // It's possible we were working with stale data
            // Actually fetch
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
            mustCheckData = false
            // Retry
            continue
        }

        data, err := runtime.Encode(s.codec, ret)
        if err != nil {
            return err
        }
        // At present origState.stale is always false
        // Check whether the modified resource object actually changed
        if !origState.stale && bytes.Equal(data, origState.data) {
            // if we skipped the original Get in this loop, we must refresh from
            // etcd in order to be sure the data in the store is equivalent to
            // our desired serialization
            // mustCheckData = true means origState may not be the latest state,
            // so it has to be confirmed once more
            if mustCheckData {
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                mustCheckData = false
                if !bytes.Equal(data, origState.data) {
                    // original data changed, restart loop
                    continue
                }
            }
            // recheck that the data from etcd is not stale before short-circuiting a write
            if !origState.stale {
                return decode(s.codec, s.versioner, origState.data, out, origState.rev)
            }
        }

        // Transform the encoded object into the bytes that are stored in etcd
        newData, err := s.transformer.TransformToStorage(data, transformContext)
        if err != nil {
            return storage.NewInternalError(err.Error())
        }

        // Set the TTL (expiry) of the key
        opts, err := s.ttlOpts(ctx, int64(ttl))
        if err != nil {
            return err
        }
        trace.Step("Transaction prepared")

        span.LogFields(log.Uint64("ttl", ttl), log.Int64("origState.rev", origState.rev))

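        // Etcd transaction
        // Note that it compares the key's ModRevision in etcd with origState.rev
        // If they match, the put is executed
        // Otherwise nothing is written and the current latest object is fetched instead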
        startTime := time.Now()
        txnResp, err := s.client.KV.Txn(ctx).If(
            clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
        ).Then(
            clientv3.OpPut(key, string(newData), opts...),
        ).Else(
            clientv3.OpGet(key),
        ).Commit()
        metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
        if err != nil {
            return err
        }
        trace.Step("Transaction committed")
        if !txnResp.Succeeded {
            // The compare failed: the key was modified concurrently
            getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
            klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
            // Use the latest state returned by the transaction and retry
            origState, err = s.getState(getResp, key, v, ignoreNotFound)
            if err != nil {
                return err
            }
            trace.Step("Retry value restored")
            mustCheckData = false
            continue
        }
        // The transaction succeeded
        putResp := txnResp.Responses[0].GetResponsePut()

        return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
    }
}
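
The interplay between the cached suggestion and mustCheckData can be hard to see in the full function, so here is a reduced Go sketch of just that retry structure. It is a model under simplifying assumptions: fakeEtcd, state, updateFunc and guaranteedUpdate are hypothetical names, there is a single key, and preconditions, encoding, TTLs and the "unchanged data" short circuit are all left out.

```go
package main

import (
	"errors"
	"fmt"
)

// state is a hypothetical reduction of objState: payload plus revision.
type state struct {
	data string
	rev  int64
}

// fakeEtcd is an in-memory, single-key stand-in for etcd.
type fakeEtcd struct {
	cur  state
	gets int // number of explicit reads, to show when the cache saved one
}

func (e *fakeEtcd) get() state { e.gets++; return e.cur }

// txnPut writes data only if the stored revision still equals expectRev,
// like the ModRevision compare in the transaction above. On failure it
// returns the latest state, like the Else branch.
func (e *fakeEtcd) txnPut(expectRev int64, data string) (state, bool) {
	if e.cur.rev != expectRev {
		return e.cur, false
	}
	e.cur = state{data: data, rev: e.cur.rev + 1}
	return e.cur, true
}

var errConflict = errors.New("conflict")

type updateFunc func(existing state) (string, error)

// guaranteedUpdate starts from the cached suggestion when there is one and
// falls back to a real read once if tryUpdate fails on possibly stale data.
func guaranteedUpdate(e *fakeEtcd, suggestion *state, tryUpdate updateFunc) (state, error) {
	var orig state
	mustCheckData := false
	if suggestion != nil {
		orig, mustCheckData = *suggestion, true
	} else {
		orig = e.get()
	}
	for {
		data, err := tryUpdate(orig)
		if err != nil {
			if !mustCheckData {
				return state{}, err // data was fresh, so the error is real
			}
			orig, mustCheckData = e.get(), false
			continue
		}
		latest, ok := e.txnPut(orig.rev, data)
		if !ok {
			orig, mustCheckData = latest, false // retry on the latest state
			continue
		}
		return latest, nil
	}
}

func main() {
	etcd := &fakeEtcd{cur: state{data: "b", rev: 5}}
	stale := state{data: "a", rev: 4} // what an outdated cache might hold
	// A PUT-like update that insists on revision 5.
	put := func(existing state) (string, error) {
		if existing.rev != 5 {
			return "", errConflict
		}
		return "c", nil
	}
	got, err := guaranteedUpdate(etcd, &stale, put)
	fmt.Println(got.data, got.rev, err, etcd.gets)
}
```

In this model a conflict reported against the stale cached object is not returned to the caller right away; the state is refreshed once and the update is tried again, which is the purpose of mustCheckData.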

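Summary

How do concurrent updates avoid being lost?

This is guaranteed by the transaction used when the updated object is persisted to etcd: whenever the transaction's compare fails, the update is retried against the latest state. In pseudocode, the transaction logic is: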

// oldObj = FromMemCache(key) or EtcdGet(key)
if inEtcd(key).rev == inMemory(oldObj).rev:
    EtcdSet(key, newObj)
    transaction = success
else:
    oldObj = EtcdGet(key) // latest state, used for the retry
    transaction = fail
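
The same compare-and-put loop can be sketched as a runnable Go program. This is an illustration with an in-memory store, not the etcd client: kv, compareAndPut, increment and runConcurrent are hypothetical names, and a mutex stands in for etcd's atomic transaction.

```go
package main

import (
	"fmt"
	"sync"
)

// kv is a hypothetical in-memory stand-in for a single etcd key.
type kv struct {
	mu    sync.Mutex
	value int
	rev   int64
}

func (s *kv) get() (int, int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.value, s.rev
}

// compareAndPut plays the role of the transaction: it writes only if the
// revision is unchanged, otherwise it returns the latest value and revision.
func (s *kv) compareAndPut(expectRev int64, newValue int) (int, int64, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.rev != expectRev {
		return s.value, s.rev, false
	}
	s.value, s.rev = newValue, s.rev+1
	return s.value, s.rev, true
}

// increment retries its read-modify-write until it lands on an unchanged
// revision, and reports how many retries were needed.
func increment(s *kv) (retries int) {
	value, rev := s.get()
	for {
		latest, latestRev, ok := s.compareAndPut(rev, value+1)
		if ok {
			return retries
		}
		// Someone else wrote first: recompute from the latest state.
		value, rev = latest, latestRev
		retries++
	}
}

// runConcurrent starts n concurrent writers and returns the final value.
func runConcurrent(n int) int {
	s := &kv{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			increment(s)
		}()
	}
	wg.Wait()
	v, _ := s.get()
	return v
}

func main() {
	fmt.Println(runConcurrent(100)) // prints 100: no increment is lost
}
```

Because a write only succeeds when it was computed from the revision that is still current, a writer that loses the race recomputes its change on top of the winner's result instead of overwriting it.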

How do concurrent updates detect conflicts?

The analysis above shows two places where conflicts can be detected:

  1. Preconditions
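  2. The resourceVersion != version check in tryUpdate

For kubectl apply and edit (both send PATCH requests), the Preconditions that get created are the zero value, so no conflict detection happens through Preconditions. In tryUpdate, the new object returned by objInfo.UpdatedObject(ctx, existing) is built by applying the patch on top of existing, so as long as the patch body does not set metadata.resourceVersion (as in the request captured above), its resourceVersion always equals that of existing and no conflict is detected there either.

So when does conflict detection happen? kubectl also has a replace command; capturing its traffic shows that replace sends a PUT request, and the request carries a resourceVersion: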

PUT /apis/extensions/v1beta1/namespaces/default/deployments/nginx HTTP/1.1
Host: localhost:8080
User-Agent: kubectl/v0.0.0 (linux/amd64) kubernetes/$Format
Content-Length: 866
Accept: application/json
Content-Type: application/json
Uber-Trace-Id: 6e685772cc06fc16:2514dc54a474fe88:4f488c05a7cef9c8:1
Accept-Encoding: gzip

{"apiVersion":"extensions/v1beta1","kind":"Deployment","metadata":{"labels":{"app":"nginx"},"name":"nginx","namespace":"default","resourceVersion":"744603"},"spec":{"progressDeadlineSeconds":600,"replicas":1,"revisionHistoryLimit":10,"selector":{"matchLabels":{"app":"nginx"}},"strategy":{"rollingUpdate":{"maxSurge":"25%","maxUnavailable":"25%"},"type":"RollingUpdate"},"template":{"metadata":{"creationTimestamp":null,"labels":{"app":"nginx"}},"spec":{"containers":[{"env":[{"name":"DEMO_GREETING","value":"Hello from the environment#kubectl replace"}],"image":"nginx","imagePullPolicy":"IfNotPresent","name":"nginx","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File"}],"dnsPolicy":"ClusterFirst","restartPolicy":"Always","schedulerName":"default-scheduler","securityContext":{},"terminationGracePeriodSeconds":30}}}}

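The ApiServer handles a PUT request much like a PATCH request: both call k8s.io/apiserver/pkg/registry/generic/registry.(*Store).Update. The rest.UpdatedObjectInfo is created with rest.DefaultUpdatedObjectInfo(obj, transformers...); note that obj is passed here (decoded from the request body) rather than nil.

For this PUT request the Preconditions are also the zero value (the body carries no metadata.uid, and Preconditions() only looks at the UID), so no check is made through Preconditions. In tryUpdate, however, the resourceVersion returned by resourceVersion, err := e.Storage.Versioner().ObjectResourceVersion(obj) is the value from the request body rather than, as with PATCH, the one from existing (a look at defaultUpdatedObjectInfo.UpdatedObject makes this clear).

So when handling a PUT request, the resourceVersion != version check in tryUpdate is what detects a concurrent write conflict.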
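
The difference between the two request types can be condensed into a small Go model. It is a sketch under stated assumptions: deployment, updatedObject, fromPut, fromPatch and this tryUpdate are hypothetical reductions of the real types, and the unconditional-update path (resourceVersion == 0 with AllowUnconditionalUpdate) is deliberately omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// deployment is a hypothetical, heavily reduced resource object.
type deployment struct {
	ResourceVersion uint64
	Greeting        string
}

var errConflict = errors.New("the object has been modified")

// updatedObject mimics UpdatedObjectInfo.UpdatedObject: given the stored
// object, it returns the object the client wants to store.
type updatedObject func(existing deployment) deployment

// fromPut returns the request body as-is, including its resourceVersion.
func fromPut(body deployment) updatedObject {
	return func(deployment) deployment { return body }
}

// fromPatch applies the change on top of the stored object, so the
// resourceVersion is inherited from existing.
func fromPatch(greeting string) updatedObject {
	return func(existing deployment) deployment {
		existing.Greeting = greeting
		return existing
	}
}

// tryUpdate keeps only the version comparison from Store.Update.
func tryUpdate(existing deployment, info updatedObject) (deployment, error) {
	obj := info(existing)
	if obj.ResourceVersion != existing.ResourceVersion {
		return deployment{}, errConflict
	}
	return obj, nil
}

func main() {
	// Someone else has already moved the object from 744603 to 744604.
	stored := deployment{ResourceVersion: 744604, Greeting: "changed by someone else"}

	_, err := tryUpdate(stored, fromPut(deployment{ResourceVersion: 744603, Greeting: "kubectl replace"}))
	fmt.Println("PUT with stale resourceVersion:", err)

	obj, err := tryUpdate(stored, fromPatch("kubectl edit"))
	fmt.Println("PATCH:", obj.Greeting, err)
}
```

In this model the PUT built from an outdated read is rejected with a conflict, while the PATCH is applied to whatever is currently stored and goes through.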