Skip to content

Instantly share code, notes, and snippets.

@zoetrope
Last active October 2, 2023 04:15
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zoetrope/6134ba95486f6367e22de346ec99b631 to your computer and use it in GitHub Desktop.
Save zoetrope/6134ba95486f6367e22de346ec99b631 to your computer and use it in GitHub Desktop.
Kubernetesオペレータのアンチパターン&ベストプラクティス 補足資料

本資料は、CNDT2021「Kubernetesオペレータのアンチパターン&ベストプラクティス」の補足資料です。

プレゼンの中では説明しきれなかったベストプラクティスの実装を詳細に解説します。

結果が収束するように実装しよう

Reconcileの基本実装方針

プレゼンでも紹介したように、必ず現在の状態をチェックしてから実行すべき処理を決定することになります。 そのため、Reconcile関数は以下のように処理ごとに関数を分離し、失敗したら即座にエラーを返すという実装が一般的です。

https://github.com/cybozu-go/moco/blob/f638af0dfa46ee8c8e770f6744e18dd6543d2cc0/controllers/mysqlcluster_controller.go#L182-L231

  if err := r.reconcileV1Secret(ctx, req, cluster); err != nil {
		log.Error(err, "failed to reconcile secret")
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1Certificate(ctx, req, cluster); err != nil {
		log.Error(err, "failed to reconcile certificate")
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1GRPCSecret(ctx, req, cluster); err != nil {
		log.Error(err, "failed to reconcile gRPC secret")
		return ctrl.Result{}, err
	}

	mycnf, err := r.reconcileV1MyCnf(ctx, req, cluster)
	if err != nil {
		log.Error(err, "failed to reconcile my.conf config map")
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1FluentBitConfigMap(ctx, req, cluster); err != nil {
		log.Error(err, "failed to reconcile config maps for fluent-bit")
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1ServiceAccount(ctx, req, cluster); err != nil {
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1Service(ctx, req, cluster); err != nil {
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1StatefulSet(ctx, req, cluster, mycnf); err != nil {
		log.Error(err, "failed to reconcile stateful set")
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1PDB(ctx, req, cluster); err != nil {
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1BackupJob(ctx, req, cluster); err != nil {
		return ctrl.Result{}, err
	}

	if err := r.reconcileV1RestoreJob(ctx, req, cluster); err != nil {
		return ctrl.Result{}, err
	}

MOCOのクラスタリング処理

スライドでは、MOCOのクラスタリング処理を省略して記述していたため、ここでは全文記載します。 まず最初にMySQLのクラスタの状態を収集した後、ステータスを更新し、最後にMySQLの操作をおこなっています。

https://github.com/cybozu-go/moco/blob/f638af0dfa46ee8c8e770f6744e18dd6543d2cc0/clustering/process.go#L147-L209

func (p *managerProcess) do(ctx context.Context) (bool, error) {
	ss, err := p.GatherStatus(ctx)
	if err != nil {
		return false, err
	}
	defer ss.Close()

	if err := p.updateStatus(ctx, ss); err != nil {
		return false, fmt.Errorf("failed to update status fields in MySQLCluster: %w", err)
	}

	p.log.Info("cluster state is " + ss.State.String())
	switch ss.State {
	case StateCloning:
		if p.isCloning(ctx, ss) {
			return false, nil
		}

		redo, err := p.clone(ctx, ss)
		if err != nil {
			event.InitCloneFailed.Emit(ss.Cluster, p.recorder, err)
			return false, fmt.Errorf("failed to clone data: %w", err)
		}
		event.InitCloneSucceeded.Emit(ss.Cluster, p.recorder)
		return redo, nil

	case StateRestoring:
		return false, nil

	case StateHealthy, StateDegraded:
		if ss.NeedSwitch {
			if err := p.switchover(ctx, ss); err != nil {
				event.SwitchOverFailed.Emit(ss.Cluster, p.recorder, err)
				return false, fmt.Errorf("failed to switchover: %w", err)
			}
			event.SwitchOverSucceeded.Emit(ss.Cluster, p.recorder, ss.Candidate)
			// do not configure the cluster after a switchover.
			return true, nil
		}
		if ss.State == StateDegraded {
			return p.configure(ctx, ss)
		}
		return false, nil

	case StateFailed:
		// in this case, only applicable operation is a failover.
		if err := p.failover(ctx, ss); err != nil {
			event.FailOverFailed.Emit(ss.Cluster, p.recorder, err)
			return false, fmt.Errorf("failed to failover: %w", err)
		}
		event.FailOverSucceeded.Emit(ss.Cluster, p.recorder, ss.Candidate)
		return true, nil

	case StateLost:
		// nothing can be done
		return false, nil

	case StateIncomplete:
		return p.configure(ctx, ss)
	}

	return false, nil
}

上記のようにステータスの更新処理をおこなった後にMySQLの操作をおこなうと、その操作の結果によりクラスタの状態が変化した場合、カスタムリソースのステータスと実際のクラスタの状態にずれが生じる場合があります。 そこで上記の関数ではredoフラグを返すようにして、redoがtrueの場合は以下のように即座に再度更新処理を実行するようにしています。

https://github.com/cybozu-go/moco/blob/f638af0dfa46ee8c8e770f6744e18dd6543d2cc0/clustering/process.go#L123-L144

	for {
		select {
		case <-p.ch:
		case <-tick.C:
		case <-ctx.Done():
			p.log.Info("quit")
			return
		}

		p.metrics.checkCount.Inc()
		redo, err := p.do(ctx)
		if err != nil {
			p.metrics.errorCount.Inc()
			p.log.Error(err, "error")
			continue
		}

		if redo {
			// to update status quickly
			p.Update()
		}
	}

非同期処理やキャッシュを活用しよう

非同期処理の例

meowsではReconcile関数内で空のSecretリソースを作成し、GitHubから取得したトークンをSecretリソースに埋め込む処理を別途goroutineで実行しています。 以下のように、Secretリソースにトークンが埋め込まれていない場合は、RequeueAfter: 10 * time.Secondを即座に返し、10秒後にReconcileが実行されるようにしています。

https://github.com/cybozu-go/meows/blob/c734b64701e7529bb42b089f16c98e5598cb6804/controllers/runnerpool_controller.go#L108-L123

	isContinuation, err := r.reconcileSecret(ctx, log, rp)
	if err != nil {
		log.Error(err, "failed to reconcile secret")
		return ctrl.Result{}, err
	}
	if err := r.secretUpdater.start(ctx, rp); err != nil {
		log.Error(err, "failed to start secret updater")
		return ctrl.Result{}, err
	}
	if !isContinuation {
		log.Info("wait for the secret to be issued by secret updater")
		return ctrl.Result{
			Requeue:      true,
			RequeueAfter: 10 * time.Second,
		}, nil
	}

https://github.com/cybozu-go/meows/blob/427cafc979213abea725513fb2b01c7fae64ecd8/controllers/secret_updater.go#L159-L186

func (p *updateProcess) updateSecret(ctx context.Context) error {
	s, err := p.getSecret(ctx)
	if err != nil {
		return err
	}

	runnerToken, err := p.githubClient.CreateRegistrationToken(ctx, p.repositoryName)
	if err != nil {
		p.log.Error(err, "failed to create actions registration token", "repository", p.repositoryName)
		return err
	}

	newS := s.DeepCopy()
	newS.Annotations = mergeMap(s.Annotations, map[string]string{
		constants.RunnerSecretExpiresAtAnnotationKey: runnerToken.GetExpiresAt().Format(time.RFC3339),
	})
	newS.StringData = map[string]string{
		"runnertoken": runnerToken.GetToken(),
	}
	patch := client.MergeFrom(s)

	err = p.client.Patch(ctx, newS, patch)
	if err != nil {
		p.log.Error(err, "failed to patch secret")
		return err
	}
	return nil
}

MOCOの非同期処理

MOCOでは実行に時間のかかるMySQLの操作処理は、goroutineを立ち上げて処理しています。 Reconcile関数から、下記のclusterManagerupdate関数を呼び出し、MySQLクラスタごとにgoroutineを立ち上げています。

https://github.com/cybozu-go/moco/blob/f638af0dfa46ee8c8e770f6744e18dd6543d2cc0/clustering/manager.go#L70-L94

func (m *clusterManager) update(ctx context.Context, name types.NamespacedName, noStart bool) {
	m.mu.Lock()
	defer m.mu.Unlock()

	key := name.String()
	p, ok := m.processes[key]
	if ok {
		p.Update()
		return
	}
	if noStart {
		return
	}

	ctx, cancel := context.WithCancel(ctx)

	p = newManagerProcess(m.client, m.reader, m.recorder, m.dbf, m.agentf, name, m.log.WithName(key), cancel)
	m.wg.Add(1)
	go func() {
		p.Start(ctx, m.interval)
		m.wg.Done()
	}()
	m.processes[key] = p
	p.Update()
}

HNCにおけるキャッシュ処理

HNC(Hierarchical Namespace Controller)では、大量のリソースを扱えるようにするために、リソースのツリー構造をインメモリで自前管理しています。 一方、Accurateはclient-go/controller-runtimeが提供するキャッシュ機能のみを利用しています。

HNCとAccurateの性能比較をおこなってみましたが、client-go/controller-runtimeが提供するキャッシュ機能のみでも十分な性能を得ることはできそうです。 詳しくは以下の記事をご覧ください。 HNCとAccurateのパフォーマンス比較

不要になったリソースは片付けよう

Owner Reference

MOCOのOwner Referenceの例です。

https://github.com/cybozu-go/moco/blob/f638af0dfa46ee8c8e770f6744e18dd6543d2cc0/controllers/mysqlcluster_controller.go#L465-L468

  result, err := ctrl.CreateOrUpdate(ctx, r.Client, sa, func() error {
		sa.Labels = mergeMap(sa.Labels, labelSet(cluster, false))
		return ctrl.SetControllerReference(cluster, sa, r.Scheme)
	})

Finalizer(Kubernetes外のリソースの削除)

meowsのFinalizerの実装例です。 Kubernetesのリソースではなく、GitHub APIで登録された情報を削除するためにFinalizerを利用しています。

https://github.com/cybozu-go/meows/blob/c734b64701e7529bb42b089f16c98e5598cb6804/controllers/runnerpool_controller.go#L76-L101

   if rp.ObjectMeta.DeletionTimestamp != nil {
		if !controllerutil.ContainsFinalizer(rp, constants.RunnerPoolFinalizer) {
			return ctrl.Result{}, nil
		}

		log.Info("start finalizing RunnerPool")

		if err := r.runnerManager.Stop(ctx, rp); err != nil {
			log.Error(err, "failed to stop runner manager")
			return ctrl.Result{}, err
		}

		if err := r.secretUpdater.stop(ctx, rp); err != nil {
			log.Error(err, "failed to stop secret updater")
			return ctrl.Result{}, err
		}

		controllerutil.RemoveFinalizer(rp, constants.RunnerPoolFinalizer)
		if err := r.Update(ctx, rp); err != nil {
			log.Error(err, "failed to remove finalizer")
			return ctrl.Result{}, err
		}

		log.Info("finalizing RunnerPool is completed")
		return ctrl.Result{}, nil
	}

https://github.com/cybozu-go/meows/blob/c734b64701e7529bb42b089f16c98e5598cb6804/controllers/runnermanager.go#L95

func (m *RunnerManagerImpl) Stop(ctx context.Context, rp *meowsv1alpha1.RunnerPool) error {
	rpNamespacedName := namespacedName(rp.Namespace, rp.Name)
	if loop, ok := m.loops[rpNamespacedName]; ok {
		if err := loop.stop(ctx); err != nil {
			return err
		}
		delete(m.loops, rpNamespacedName)
	}

	runnerList, err := m.githubClient.ListRunners(ctx, rp.Spec.RepositoryName, []string{rpNamespacedName})
	if err != nil {
		m.log.Error(err, "failed to list runners")
		return err
	}
	for _, runner := range runnerList {
		err := m.githubClient.RemoveRunner(ctx, rp.Spec.RepositoryName, runner.ID)
		if err != nil {
			m.log.Error(err, "failed to remove runner", "runner", runner.Name, "runner_id", runner.ID)
			return err
		}
		m.log.Info("removed runner", "runner", runner.Name, "runner_id", runner.ID)
	}
	return nil
}

Finalizer(スコープの異なるリソースの削除)

Accurateでは、親と子でリソースのスコープが異なる(SubNamespaceはnamespacedリソース、Namespaceはcluster-wideリソース)ため、Finalzierを利用してリソースの削除をおこなっています。

https://github.com/cybozu-go/accurate/blob/a8bdffc1fd52e06bc8073bdc7686f277bb33729c/controllers/subnamespace_controller.go#L58-L91

func (r *SubNamespaceReconciler) finalize(ctx context.Context, sn *accuratev1.SubNamespace) error {
	if !controllerutil.ContainsFinalizer(sn, constants.Finalizer) {
		return nil
	}

	logger := log.FromContext(ctx)

	ns := &corev1.Namespace{}
	if err := r.Get(ctx, types.NamespacedName{Name: sn.Name}, ns); err != nil {
		if !apierrors.IsNotFound(err) {
			return err
		}
		goto DELETE
	}

	if ns.DeletionTimestamp != nil {
		goto DELETE
	}

	if parent := ns.Labels[constants.LabelParent]; parent != sn.Namespace {
		logger.Info("finalization: ignored non-child namespace", "parent", parent)
		goto DELETE
	}

	if err := r.Delete(ctx, ns); err != nil {
		return fmt.Errorf("failed to delete namespace %s: %w", sn.Name, err)
	}

	logger.Info("deleted namespace", "name", sn.Name)

DELETE:
	controllerutil.RemoveFinalizer(sn, constants.Finalizer)
	return r.Update(ctx, sn)
}

定期的な削除処理

coilでは、結びつくノードが存在しないアドレスブロックを定期的に削除しています。

https://github.com/cybozu-go/coil/blob/52754691fa2a88f208b0cfddc9b3cb9bb3897d69/v2/runners/garbage_collector.go#L63-L96

func (gc *garbageCollector) do(ctx context.Context) error {
	gc.log.Info("start garbage collection")

	blocks := &coilv2.AddressBlockList{}
	if err := gc.Client.List(ctx, blocks); err != nil {
		return fmt.Errorf("failed to list address blocks: %w", err)
	}

	nodes := &corev1.NodeList{}
	if err := gc.apiReader.List(ctx, nodes); err != nil {
		return fmt.Errorf("failed to list nodes: %w", err)
	}

	nodeNames := make(map[string]bool)
	for _, n := range nodes.Items {
		nodeNames[n.Name] = true
	}

	for _, b := range blocks.Items {
		n := b.Labels[constants.LabelNode]
		if nodeNames[n] {
			continue
		}

		err := gc.deleteBlock(ctx, b.Name)
		if err != nil {
			return fmt.Errorf("failed to delete a block: %w", err)
		}

		gc.log.Info("deleted an orphan block", "block", b.Name, "node", n)
	}

	return nil
}

なお、Nodeリソースのコントローラを実装すればFinalizerでアドレスブロックの削除をおこなうことも可能です。 ただし、Nodeリソースは非常に頻繁に更新がおこなわれるため、コントローラの負荷が高くなりやすく、coilではFinalzierではなく定期的なチェックをおこなうようにしています。

Server-Side Apply(SSA)を活用しよう

SSAを利用しない例

SSAを利用せずに丁寧にフィールドの差分チェックをおこなうのは非常に手間がかかります。 例えばmocoのServiceリソースのReconcile処理では、以下のように各フィールドごとに値のチェックをして代入しています。

https://github.com/cybozu-go/moco/blob/c9013554e92fbc97f4d29b0beb67413b16484ab9/controllers/mysqlcluster_controller.go#L498-L598

func (r *MySQLClusterReconciler) reconcileV1Service1(ctx context.Context, cluster *mocov1beta1.MySQLCluster, name string, headless bool, selector map[string]string) error {
	log := crlog.FromContext(ctx)

	svc := &corev1.Service{}
	svc.Namespace = cluster.Namespace
	svc.Name = name
	var orig, updated *corev1.ServiceSpec
	result, err := ctrl.CreateOrUpdate(ctx, r.Client, svc, func() error {
		if debugController {
			orig = svc.Spec.DeepCopy()
		}

		saSpec := &corev1.ServiceSpec{}
		tmpl := cluster.Spec.ServiceTemplate
		if !headless && tmpl != nil {
			svc.Annotations = mergeMap(svc.Annotations, tmpl.Annotations)
			svc.Labels = mergeMap(svc.Labels, tmpl.Labels)
			svc.Labels = mergeMap(svc.Labels, labelSet(cluster, false))

			if tmpl.Spec != nil {
				tmpl.Spec.DeepCopyInto(saSpec)
			}
		} else {
			svc.Labels = mergeMap(svc.Labels, labelSet(cluster, false))
		}

		if headless {
			saSpec.ClusterIP = corev1.ClusterIPNone
			saSpec.ClusterIPs = svc.Spec.ClusterIPs
			saSpec.Type = corev1.ServiceTypeClusterIP
			saSpec.PublishNotReadyAddresses = true
		} else {
			saSpec.ClusterIP = svc.Spec.ClusterIP
			saSpec.ClusterIPs = svc.Spec.ClusterIPs
			if len(saSpec.Type) == 0 {
				saSpec.Type = svc.Spec.Type
			}
		}
		if len(saSpec.SessionAffinity) == 0 {
			saSpec.SessionAffinity = svc.Spec.SessionAffinity
		}
		if len(saSpec.ExternalTrafficPolicy) == 0 {
			saSpec.ExternalTrafficPolicy = svc.Spec.ExternalTrafficPolicy
		}
		if saSpec.HealthCheckNodePort == 0 {
			saSpec.HealthCheckNodePort = svc.Spec.HealthCheckNodePort
		}
		if saSpec.IPFamilies == nil {
			saSpec.IPFamilies = svc.Spec.IPFamilies
		}
		if saSpec.IPFamilyPolicy == nil {
			saSpec.IPFamilyPolicy = svc.Spec.IPFamilyPolicy
		}
		saSpec.Selector = selector

		var mysqlNodePort, mysqlXNodePort int32
		for _, p := range svc.Spec.Ports {
			switch p.Name {
			case constants.MySQLPortName:
				mysqlNodePort = p.NodePort
			case constants.MySQLXPortName:
				mysqlXNodePort = p.NodePort
			}
		}
		saSpec.Ports = []corev1.ServicePort{
			{
				Name:       constants.MySQLPortName,
				Protocol:   corev1.ProtocolTCP,
				Port:       constants.MySQLPort,
				TargetPort: intstr.FromString(constants.MySQLPortName),
				NodePort:   mysqlNodePort,
			},
			{
				Name:       constants.MySQLXPortName,
				Protocol:   corev1.ProtocolTCP,
				Port:       constants.MySQLXPort,
				TargetPort: intstr.FromString(constants.MySQLXPortName),
				NodePort:   mysqlXNodePort,
			},
		}

		saSpec.DeepCopyInto(&svc.Spec)

		if debugController {
			updated = svc.Spec.DeepCopy()
		}

		return ctrl.SetControllerReference(cluster, svc, r.Scheme)
	})
	if err != nil {
		return fmt.Errorf("failed to reconcile %s service: %w", name, err)
	}
	if result != controllerutil.OperationResultNone {
		log.Info("reconciled service", "name", name, "operation", string(result))
	}
	if result == controllerutil.OperationResultUpdated && debugController {
		fmt.Println(cmp.Diff(orig, updated))
	}

	return nil
}

SSAの例

SSAを利用したServiceリソースの更新処理の例です。

https://github.com/zoetrope/kubebuilder-training/blob/9194c1193c4dd0048a109f6d46bcc52f367604bc/codes/markdown-view/controllers/markdownview_controller.go#L238-L294

func (r *MarkdownViewReconciler) reconcileService(ctx context.Context, mdView viewv1.MarkdownView) error {
	logger := log.FromContext(ctx)
	svcName := "viewer-" + mdView.Name

	owner, err := ownerRef(mdView, r.Scheme)
	if err != nil {
		return err
	}

	svc := corev1apply.Service(svcName, mdView.Namespace).
		WithLabels(labelSet(mdView)).
		WithOwnerReferences(owner).
		WithSpec(corev1apply.ServiceSpec().
			WithSelector(labelSet(mdView)).
			WithType(corev1.ServiceTypeClusterIP).
			WithPorts(corev1apply.ServicePort().
				WithProtocol(corev1.ProtocolTCP).
				WithPort(80).
				WithTargetPort(intstr.FromInt(3000)),
			),
		)

	obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(svc)
	if err != nil {
		return err
	}
	patch := &unstructured.Unstructured{
		Object: obj,
	}

	var current corev1.Service
	err = r.Get(ctx, client.ObjectKey{Namespace: mdView.Namespace, Name: svcName}, &current)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}

	currApplyConfig, err := corev1apply.ExtractService(&current, constants.ControllerName)
	if err != nil {
		return err
	}

	if equality.Semantic.DeepEqual(svc, currApplyConfig) {
		return nil
	}

	err = r.Patch(ctx, patch, client.Apply, &client.PatchOptions{
		FieldManager: constants.ControllerName,
		Force: pointer.Bool(true),
	})
	if err != nil {
		logger.Error(err, "unable to create or update Service")
		return err
	}

	logger.Info("reconcile Service successfully", "name", mdView.Name)
	return nil
}

適切に情報を出力しよう

MOCOでは、以下のようにMySQLクラスタの状態に応じて、適切にカスタムリソースのステータスをセットし、必要なメトリクスを出力しています。

https://github.com/cybozu-go/moco/blob/f638af0dfa46ee8c8e770f6744e18dd6543d2cc0/clustering/process.go#L211-L312

func (p *managerProcess) updateStatus(ctx context.Context, ss *StatusSet) error {
	bs := &ss.Cluster.Status.Backup
	if !bs.Time.IsZero() {
		p.metrics.backupTimestamp.Set(float64(bs.Time.Unix()))
		p.metrics.backupElapsed.Set(bs.Elapsed.Seconds())
		p.metrics.backupDumpSize.Set(float64(bs.DumpSize))
		p.metrics.backupBinlogSize.Set(float64(bs.BinlogSize))
		p.metrics.backupWorkDirUsage.Set(float64(bs.WorkDirUsage))
		p.metrics.backupWarnings.Set(float64(len(bs.Warnings)))
	}

	now := metav1.Now()
	ststr := ss.State.String()
	updateCond := func(typ mocov1beta1.MySQLClusterConditionType, val corev1.ConditionStatus, current []mocov1beta1.MySQLClusterCondition) mocov1beta1.MySQLClusterCondition {
		updated := mocov1beta1.MySQLClusterCondition{
			Type:               typ,
			Status:             val,
			Reason:             ststr,
			Message:            "the current state is " + ststr,
			LastTransitionTime: now,
		}

		for _, cond := range current {
			if cond.Type != typ {
				continue
			}
			if cond.Status == val {
				updated.LastTransitionTime = cond.LastTransitionTime
			}
			break
		}
		return updated
	}

	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		cluster := &mocov1beta1.MySQLCluster{}
		if err := p.reader.Get(ctx, p.name, cluster); err != nil {
			return err
		}
		orig := cluster.DeepCopy()

		initialized := corev1.ConditionTrue
		available := corev1.ConditionFalse
		healthy := corev1.ConditionFalse
		switch ss.State {
		case StateCloning, StateRestoring:
			initialized = corev1.ConditionFalse
		case StateHealthy:
			available = corev1.ConditionTrue
			healthy = corev1.ConditionTrue
		case StateDegraded:
			available = corev1.ConditionTrue
		case StateFailed:
		case StateLost:
		case StateIncomplete:
		}
		conditions := []mocov1beta1.MySQLClusterCondition{
			updateCond(mocov1beta1.ConditionInitialized, initialized, cluster.Status.Conditions),
			updateCond(mocov1beta1.ConditionAvailable, available, cluster.Status.Conditions),
			updateCond(mocov1beta1.ConditionHealthy, healthy, cluster.Status.Conditions),
		}
		cluster.Status.Conditions = conditions
		if available == corev1.ConditionTrue {
			p.metrics.available.Set(1)
		} else {
			p.metrics.available.Set(0)
		}
		if healthy == corev1.ConditionTrue {
			p.metrics.healthy.Set(1)
		} else {
			p.metrics.healthy.Set(0)
		}

		var syncedReplicas int
		for _, pod := range ss.Pods {
			if isPodReady(pod) {
				syncedReplicas++
			}
		}
		cluster.Status.SyncedReplicas = syncedReplicas
		cluster.Status.ErrantReplicas = len(ss.Errants)
		cluster.Status.ErrantReplicaList = ss.Errants
		p.metrics.replicas.Set(float64(len(ss.Pods)))
		p.metrics.readyReplicas.Set(float64(syncedReplicas))
		p.metrics.errantReplicas.Set(float64(len(ss.Errants)))

		// the completion of initial cloning is recorded in the status
		// to make it possible to determine the cloning status even while
		// the primary instance is down.
		if cluster.Spec.ReplicationSourceSecretName != nil && ss.State != StateCloning {
			cluster.Status.Cloned = true
		}

		// if nothing has changed, skip updating.
		if equality.Semantic.DeepEqual(orig, cluster) {
			return nil
		}

		p.log.Info("update the status information")
		return p.client.Status().Update(ctx, cluster)
	})
}

また、以下のようにFailOverに成功した場合や失敗した場合にはKubernetesのEventリソースを作成しています。

https://github.com/cybozu-go/moco/blob/f638af0dfa46ee8c8e770f6744e18dd6543d2cc0/clustering/process.go#L191-L198

  case StateFailed:
		// in this case, only applicable operation is a failover.
		if err := p.failover(ctx, ss); err != nil {
			event.FailOverFailed.Emit(ss.Cluster, p.recorder, err)
			return false, fmt.Errorf("failed to failover: %w", err)
		}
		event.FailOverSucceeded.Emit(ss.Cluster, p.recorder, ss.Candidate)
		return true, nil

controller-runtimeは、カスタムコントローラを実装する際に便利なロギング機能を提供しています。以下の記事も参考にしてみてください。

controller-runtimeのロギング機能

様々な障害を考慮したテストを書こう

MOCOでは様々な障害を想定したテストを書いています。 以下のテストでは、Primaryインスタンスのデータ(PVC)を削除し、そのときにFailOverが実行され別のReplicaがPrimaryに昇格し、クラスタが正常に復旧することを確認しています。

https://github.com/cybozu-go/moco/blob/26dad4899222ca2666e468a779e42cd271e4484e/e2e/replication_test.go#L198-L254

	It("should do a failover if the primary lost data", func() {
		cluster, err := getCluster("repl", "test")
		Expect(err).NotTo(HaveOccurred())
		primary := cluster.Status.CurrentPrimaryIndex

		kubectlSafe(nil, "delete", "-n", "repl", "--wait=false", "pvc", fmt.Sprintf("mysql-data-moco-test-%d", primary))
		kubectlSafe(nil, "delete", "-n", "repl", "--grace-period=1", "pod", cluster.PodName(primary))

		Eventually(func() error {
			out, err := kubectl(nil, "-n", "repl", "get", "pod", cluster.PodName(primary), "-o", "json")
			if err != nil {
				return err
			}
			pod := &corev1.Pod{}
			err = json.Unmarshal(out, pod)
			if err != nil {
				return err
			}
			for _, cond := range pod.Status.Conditions {
				if cond.Type != corev1.PodScheduled {
					continue
				}
				if cond.Reason == "Unschedulable" {
					fmt.Println("re-deleting pending pod...")
					_, err := kubectl(nil, "delete", "-n", "repl", "--grace-period=1", "pod", cluster.PodName(primary))
					if err != nil {
						return fmt.Errorf("failed to delete pod: %w", err)
					}
					return errors.New("pod is unschedulable")
				}
				if cond.Status == corev1.ConditionTrue {
					return nil
				}
			}
			return errors.New("no pod scheduled status")
		}).Should(Succeed())

		Eventually(func() error {
			cluster, err := getCluster("repl", "test")
			if err != nil {
				return err
			}
			if cluster.Status.CurrentPrimaryIndex == primary {
				return fmt.Errorf("primary is not changed from %d", primary)
			}
			for _, cond := range cluster.Status.Conditions {
				if cond.Type != mocov1beta1.ConditionAvailable {
					continue
				}
				if cond.Status == corev1.ConditionTrue {
					return nil
				}
				return fmt.Errorf("cluster is not available: %s", cond.Status)
			}
			return errors.New("no available condition")
		}).Should(Succeed())
	})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment