golang

Выходим за рамки: создание оператора для наблюдения за внешними ресурсами в Kubernetes

  • пятница, 7 марта 2025 г. в 00:00:14
https://habr.com/ru/companies/flant/articles/884566/

Привет! Я Александр Зверев, инженер архитектурных решений в компании «Флант». Сегодня поговорим про всеми любимый Kubernetes. Этот оркестратор стал таким популярным не только потому что он эффективно управляет контейнерами. Ещё его можно прокачать под себя. И для этого у него есть все необходимые инструменты. Новые возможности можно добавить с помощью контроллеров, операторов и создания новых типов ресурсов.

При этом уже есть проекты, которые пошли дальше и научились управлять ресурсами за пределами кластера. Например, Deckhouse Commander, Argo CD, Crossplane. Они позволяют описывать ресурсы в кластере, а операторы «идут» во внешний мир и развёртывают новые кластеры или приложения.

В статье попробуем научить контроллер взаимодействовать с внешним миром. Мы создадим простой оператор, который будет следить за доступностью HTTP-серверов вне кластера, используя kubebuilder. Статья будет полезна тем, кто уже пробовал писать свои контроллеры Kubernetes и хочет расширить их возможности.

Анатомия контроллера: знакомство с kubebuilder

В своё время для написания контролеров появилось много подходов, библиотек и фреймворков. Среди популярных библиотек выделю controller-runtime, а у фреймворков можно выделить kubebuilder и operator SDK, созданные на базе controller-runtime.

Давайте ближе познакомимся с kubebuilder. Верхнеуровнево созданный на нём контроллер включает в себя:

  • менеджер, который организовывает работу других компонентов и кэширование, осуществляет leader-election, если в нём есть необходимость, и отвечает за обработку сигналов от ОС;

  • вебхуки, являющиеся опциональными, отвечают за установку пропущенных полей в спецификации и валидацию ресурсов. Они останутся за рамками данной статьи;

  • один и более контроллеров, которые «следят» за ресурсами определённого типа в кластере и дочерними ресурсами.

Про контроллеры поговорим подробнее. Их основная задача — получать события, связанные с нужными ресурсами: создание, удаление, изменения. Дальше осуществляется фильтрация этих событий. И затем событие передаётся reconciler'у, который сравнивает желаемое состояние ресурса с существующим в кластере и пытается привести к желаемому, если они расходятся. 

Когда контроллер реагирует на изменения внутри кластера, всё просто. У него есть методы For и Owns, которые за это отвечают. А что касается кейса, когда контроллер реагирует на изменения вне кластера, тут информации мало. Этот пробел я и постараюсь исправить.

Задача: мониторинг HTTP-серверов кластера

Чтобы показать, как контроллеры управляют внешними ресурсами, я придумал простую задачку. Главное — понять принцип работы. Представим, что у нас есть ресурсы, которые описывают HTTP-серверы вне кластера. Контроллер постоянно проверяет, доступны ли эти серверы, и обновляет статус соответствующих ресурсов в кластере.

Процесс инсталляции kubebuilder под различные платформы описан в официальной документации.

Инициализируем проект в kubebuilder:

mkdir probes && cd probes
kubebuilder init --domain network.io --repo probes

Создаём наш ресурс:

kubebuilder create api --group test --version v1alpha1 --kind WebChecker --namespaced=true

Система задаст два вопроса про создание ресурса и контроллера. Отвечаем утвердительно.

На выходе получаем сгенерированный проект на Go со всей необходимой структурой и Makefile. Открываем проект в любимом редакторе и находим файл probes/api/v1alpha1/webchecker_types.go. В нём мы опишем структуру нашего ресурса, а затем сгенерируем нужные методы, CustomResourceDefinitions и манифесты.

Отредактируем структуры, описывающие спецификации и статус ресурса, следующим образом:

type WebCheckerSpec struct {
	Host string `json:"host"`
	// +kubebuilder:default:="/"
	Path   string `json:"path,omitempty"`
}

type WebCheckerStatus struct {
	Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}

type WebChecker struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   WebCheckerSpec   `json:"spec,omitempty"`
	Status WebCheckerStatus `json:"status,omitempty"`
}

Запустим кодогенерацию:

make generate && make manifests

В файле config/crd/bases/test.network.io_webcheckers.yaml сгенерируется CustomResourceDefinintion. Также в файле api/v1alpha1/zz_generated.deepcopy.go будут сгенерированы методы, реализующие DeepCopy.

Файл main.go мы редактировать не будем. Предлагаю познакомиться с его содержимым самостоятельно. В нём считываются ключи запуска, закодирован выбор лидера, запускается единственный наш контроллер и регистрируются ответные действия на сигналы от ОС.

Сердце оператора

А мы переходим к файлу internal/controller/webchecker_controller.go, так как там находится всё, что интересует нас сейчас.

Определяем типы и константы:

package controller

...

const (
	WebCheckerWorkersCount           = 2
	WebCheckerProbeTaskChannelSize   = 100
	WebCheckerProbeResultChannelSize = 100
)

type WebCheckerState struct {
	Host          string
	Path          string
	LastCheckTime time.Time
	IsSuccessful  bool
	LastError     string
}

type WebCheckerProbeTask struct {
	NamespacedName types.NamespacedName
	Host           string
	Path           string
}

type WebCheckerProbeResult struct {
	NamespacedName types.NamespacedName
	IsSuccessful   bool
	LastError      string
}

Наш основной тип данных, который содержит все необходимые поля и определяет методы для reconcile:

// WebCheckerReconciler reconciles a WebChecker object
type WebCheckerReconciler struct {
	client.Client
	Scheme            *runtime.Scheme
	events            chan event.GenericEvent
	mu                sync.RWMutex
	webCheckersStates map[types.NamespacedName]WebCheckerState
	// канал для задач на проверку
	tasks chan WebCheckerProbeTask
	// канал для результатов проверки
	tasksResults chan WebCheckerProbeResult
	cancelFunc   context.CancelFunc
}

Ниже основной метод для нашего контроллера. В нём определено, как реагировать на добавление, удаление или изменение ресурсов в кластере. В нашем примере этот метод ещё и обрабатывает запросы от внешних раздражителей:

func (r *WebCheckerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Reconciling WebChecker", "request", req)

	var reconciledResource testv1alpha1.WebChecker
	// Вместо ресурса в метод Reconcile приходит запрос с указанием пространства имён / имени ресурса,
	// поэтому нужно запросить ресурс из API кластера.
	if err := r.Get(ctx, req.NamespacedName, &reconciledResource); err != nil {
		logger.Error(err, "Failed to get WebChecker resource")
		if errors.IsNotFound(err) {
			r.mu.Lock()
			delete(r.webCheckersStates, req.NamespacedName)
			r.mu.Unlock()
		}
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// Если ресурс помечен на удаление, то удаляем запись о нём из контроллера,
	// чтобы по нему не формировались задачи на проверку.
	if reconciledResource.DeletionTimestamp != nil {
		logger.Info("WebChecker resource is being deleted")
		r.mu.Lock()
		delete(r.webCheckersStates, req.NamespacedName)
		r.mu.Unlock()
		return ctrl.Result{}, nil
	}

	// Если ресурс создан в кластере, добавляем его в список проверяемых ресурсов.
	r.mu.Lock()
	defer r.mu.Unlock()
	var cachedState WebCheckerState
	var ok bool
	if cachedState, ok = r.webCheckersStates[req.NamespacedName]; !ok {
		r.webCheckersStates[req.NamespacedName] = WebCheckerState{
			Host: reconciledResource.Spec.Host,
			Path: reconciledResource.Spec.Path,
		}
		return ctrl.Result{}, nil
	}

	if cachedState.Host != reconciledResource.Spec.Host || cachedState.Path != reconciledResource.Spec.Path {
		r.webCheckersStates[req.NamespacedName] = WebCheckerState{
			Host: reconciledResource.Spec.Host,
			Path: reconciledResource.Spec.Path,
		}
		return ctrl.Result{}, nil
	}

	// Если запрос пришёл из канала событий, необходимо обновить статус ресурса.
	webCheckerStatus := testv1alpha1.WebCheckerStatus{
		Conditions: []metav1.Condition{
			statusFromWebCheckerState(cachedState),
		},
	}

	reconciledResource.Status = webCheckerStatus
	err := r.Status().Update(ctx, &reconciledResource)
	if err != nil {
		logger.Error(err, "Failed to update WebChecker status")
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil

}

Небольшая вспомогательная функция, которая формирует статус ресурса:

func statusFromWebCheckerState(webCheckerState WebCheckerState) metav1.Condition {
	cond := metav1.Condition{
		LastTransitionTime: metav1.Now(),
		Type:               "Ready",
		Reason:             "WebResourceReady",
	}
	if webCheckerState.IsSuccessful {
		cond.Status = metav1.ConditionTrue
		cond.Message = "WebChecker is ready"
	} else {
		cond.Status = metav1.ConditionFalse
		cond.Message = webCheckerState.LastError
	}
	return cond
}

Ниже метод, который реализован в виде удобного билдера. Позволяет указать, какие ресурсы будут обрабатываться в кластере. В нашем случае, помимо обычных операций добавления и удаления, мы добавляем дополнительный канал для событий, в который собираются данные от проверок внешних серверов:

func (r *WebCheckerReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&testv1alpha1.WebChecker{}).
		Named("webchecker").
		// Именно WatchesRawSource отвечает за обработку "сторонних событий".
		WatchesRawSource(source.Channel(r.events, &handler.EnqueueRequestForObject{})).
		Complete(r)
}

Воркер, который выполняет проверку внешних серверов:

func (r *WebCheckerReconciler) RunTasksWorker(ctx context.Context) {
	logger := log.FromContext(ctx)
	logger.Info("Starting tasks worker")
	for task := range r.tasks {
		logger.Info("Processing task", "task", task)
		checker := WebCheckerProbe{
			NamespacedName: task.NamespacedName,
			Host:           task.Host,
			Path:           task.Path,
		}
		taskResult := checker.PerformCheck(ctx)
		r.tasksResults <- taskResult
	}
}

Запускаем все горутины, которые будут формировать задачи на проверку, выполнять проверку и обрабатывать результаты:

func (r *WebCheckerReconciler) RunWorkers(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	r.cancelFunc = cancel

	go r.runTasksScheduler(ctx)

	for i := 0; i < WebCheckerWorkersCount; i++ {
		go r.RunTasksWorker(ctx)
	}

	go r.runTaskResultsAnalyzer(ctx)
}

Данный метод анализирует результаты проверки и записывает их в канал событий:

func (r *WebCheckerReconciler) runTaskResultsAnalyzer(ctx context.Context) {
	logger := log.FromContext(ctx)
	logger.Info("Starting task results analyzer")
	for result := range r.tasksResults {
		r.mu.Lock()
		if resultDetail, ok := r.webCheckersStates[result.NamespacedName]; !ok {
			logger.Info("WebChecker resource not found", "namespacedName", result.NamespacedName)
		} else {
			resultDetail.LastCheckTime = time.Now()
			resultDetail.IsSuccessful = result.IsSuccessful
			resultDetail.LastError = result.LastError
			r.webCheckersStates[result.NamespacedName] = resultDetail
			// Отправляем событие в канал, чтобы контроллер в reconcile обновил статус ресурса.
			r.events <- event.GenericEvent{
				Object: &testv1alpha1.WebChecker{
					ObjectMeta: metav1.ObjectMeta{
						Namespace: result.NamespacedName.Namespace,
						Name:      result.NamespacedName.Name,
					},
				},
			}
		}
		r.mu.Unlock()
	}
}

Метод, который по заданному временному интервалу в 10 секунд определяет, нужно ли запустить очередную проверку для внешних серверов:

func (r *WebCheckerReconciler) runTasksScheduler(ctx context.Context) {
	logger := log.FromContext(ctx)
	logger.Info("Starting tasks scheduler")
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			r.mu.RLock()
			for namespacedName, state := range r.webCheckersStates {
				now := time.Now()
				// Если ресурс не проверялся более 10 секунд, формируем задачу на проверку.
				if now.Sub(state.LastCheckTime) > 10*time.Second {
					r.tasks <- WebCheckerProbeTask{
						NamespacedName: namespacedName,
						Host:           state.Host,
						Path:           state.Path,
					}
				}
			}
			r.mu.RUnlock()
		case <-ctx.Done():
			return
		}
	}
}

Код, который проверяет доступность хоста по HTTP, вынесем в отдельный файл — internal/controller/types.go. Я его максимально упростил. В реальной жизни ресурс в кластере мог бы символизировать что-то другое. Например, мы могли бы обращаться к API внешних сервисов, получать от них данные и анализировать их.

package controller

import (
	"context"
	"fmt"
	"net/http"
	"slices"
	"time"

	"k8s.io/apimachinery/pkg/types"
)

var (
	successfulStatuses = []int{200, 300, 301, 302, 303}
)

type WebCheckerProbe struct {
	NamespacedName types.NamespacedName
	Host           string
	Path           string
}

func (p *WebCheckerProbe) PerformCheck(ctx context.Context) WebCheckerProbeResult {
	c := http.Client{
		Timeout: 1 * time.Second,
	}

	req, err := http.NewRequest("GET", p.Host+p.Path, nil)
	if err != nil {
		return WebCheckerProbeResult{
			NamespacedName: p.NamespacedName,
			IsSuccessful:   false,
			LastError:      err.Error(),
		}
	}

	resp, err := c.Do(req)
	if err != nil {
		return WebCheckerProbeResult{
			NamespacedName: p.NamespacedName,
			IsSuccessful:   false,
			LastError:      err.Error(),
		}
	}

	defer resp.Body.Close()

	if slices.Contains(successfulStatuses, resp.StatusCode) {
		return WebCheckerProbeResult{
			NamespacedName: p.NamespacedName,
			IsSuccessful:   true,
			LastError:      "",
		}
	}
	return WebCheckerProbeResult{
		NamespacedName: p.NamespacedName,
		IsSuccessful:   false,
		LastError:      fmt.Sprintf("Status code: %d", resp.StatusCode),
	}
}

Я попытался сделать проверку максимально простой. Не стал заморачиваться, например, с sync.Pool и повторным использованием соединений.

Запуск контроллера

В файле cmd/main.go немного изменим вызов SetupWithManager, чтобы он выглядел так:

webcheckerController := controller.NewWebChecker(mgr.GetClient(), mgr.GetScheme())
	if err = webcheckerController.SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "WebChecker")
		os.Exit(1)
	}
	webcheckerController.RunWorkers(context.Background())

Теперь можно посмотреть, как наш контроллер будет работать в кластере. Используем kind и создадим локальный кластер:

kind create cluster

Установим наш CustomResourceDefinition в кластер:

make install

Запустим наш контроллер:

make build && make run

После запуска контроллера сможем наблюдать его логи. Запустим ещё один терминал и задеплоим в кластер два ресурса. Раз:

apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
  name: flant
spec:
  host: http://flant.ru

Два:

apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
  name: badnews
spec:
  host: http://badnews.me

Теперь можно посмотреть статус этих ресурсов. Раз:

kubectl get webcheckers.test.network.io badnews -o yaml
apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
  annotations:
  creationTimestamp: "2025-02-04T12:32:55Z"
  generation: 1
  name: badnews
  namespace: default
  resourceVersion: "3118"
  uid: 942765a7-e3d1-4f2a-b73f-f11377e1c72d
spec:
  host: http://badnews.me
  path: /
status:
  conditions:
  - lastTransitionTime: "2025-02-04T12:33:07Z"
    message: 'Get "http://badnews.me/": dial tcp: lookup badnews.me: no such host'
    reason: WebResourceReady
    status: "False"
    type: Ready

Два:

kubectl get webcheckers.test.network.io flant -o yaml
apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
  annotations:
  creationTimestamp: "2025-02-04T12:32:51Z"
  generation: 1
  name: flant
  namespace: default
  resourceVersion: "3112"
  uid: 4dc3f052-4893-4089-8e6d-ec98fad393f5
spec:
  host: http://flant.ru
  path: /
status:
  conditions:
  - lastTransitionTime: "2025-02-04T12:33:02Z"
    message: WebChecker is ready
    reason: WebResourceReady
    status: "True"
    type: Ready

Итоги

В этой статье мы рассмотрели пример создания Kubernetes-оператора, который наблюдает за внешними HTTP-серверами. Мы использовали kubebuilder для создания базовой структуры контроллера, определили CustomResourceDefinition для описания внешних ресурсов и реализовали логику для проверки доступности этих ресурсов.

Не стоит рассматривать данный пример как рабочий контроллер. Он создан исключительно в ознакомительных целях. Исходный код можно найти здесь. Но его можно расширить для управления более сложными внешними ресурсами, такими как базы данных или облачные сервисы.

P. S.

Читайте также в нашем блоге: