Выходим за рамки: создание оператора для наблюдения за внешними ресурсами в Kubernetes
- пятница, 7 марта 2025 г. в 00:00:14
Привет! Я Александр Зверев, инженер архитектурных решений в компании «Флант». Сегодня поговорим про всеми любимый Kubernetes. Этот оркестратор стал таким популярным не только потому что он эффективно управляет контейнерами. Ещё его можно прокачать под себя. И для этого у него есть все необходимые инструменты. Новые возможности можно добавить с помощью контроллеров, операторов и создания новых типов ресурсов.
При этом уже есть проекты, которые пошли дальше и научились управлять ресурсами за пределами кластера. Например, Deckhouse Commander, Argo CD, Crossplane. Они позволяют описывать ресурсы в кластере, а операторы «идут» во внешний мир и развёртывают новые кластеры или приложения.
В статье попробуем научить контроллер взаимодействовать с внешним миром. Мы создадим простой оператор, который будет следить за доступностью HTTP-серверов вне кластера, используя kubebuilder. Статья будет полезна тем, кто уже пробовал писать свои контроллеры Kubernetes и хочет расширить их возможности.
В своё время для написания контролеров появилось много подходов, библиотек и фреймворков. Среди популярных библиотек выделю controller-runtime, а у фреймворков можно выделить kubebuilder и operator SDK, созданные на базе controller-runtime.
Давайте ближе познакомимся с kubebuilder. Верхнеуровнево созданный на нём контроллер включает в себя:
менеджер, который организовывает работу других компонентов и кэширование, осуществляет leader-election, если в нём есть необходимость, и отвечает за обработку сигналов от ОС;
вебхуки, являющиеся опциональными, отвечают за установку пропущенных полей в спецификации и валидацию ресурсов. Они останутся за рамками данной статьи;
один и более контроллеров, которые «следят» за ресурсами определённого типа в кластере и дочерними ресурсами.
Про контроллеры поговорим подробнее. Их основная задача — получать события, связанные с нужными ресурсами: создание, удаление, изменения. Дальше осуществляется фильтрация этих событий. И затем событие передаётся reconciler'у, который сравнивает желаемое состояние ресурса с существующим в кластере и пытается привести к желаемому, если они расходятся.
Когда контроллер реагирует на изменения внутри кластера, всё просто. У него есть методы For и Owns, которые за это отвечают. А что касается кейса, когда контроллер реагирует на изменения вне кластера, тут информации мало. Этот пробел я и постараюсь исправить.
Чтобы показать, как контроллеры управляют внешними ресурсами, я придумал простую задачку. Главное — понять принцип работы. Представим, что у нас есть ресурсы, которые описывают HTTP-серверы вне кластера. Контроллер постоянно проверяет, доступны ли эти серверы, и обновляет статус соответствующих ресурсов в кластере.
Процесс инсталляции kubebuilder под различные платформы описан в официальной документации.
Инициализируем проект в kubebuilder:
mkdir probes && cd probes
kubebuilder init --domain network.io --repo probes
Создаём наш ресурс:
kubebuilder create api --group test --version v1alpha1 --kind WebChecker --namespaced=true
Система задаст два вопроса про создание ресурса и контроллера. Отвечаем утвердительно.
На выходе получаем сгенерированный проект на Go со всей необходимой структурой и Makefile. Открываем проект в любимом редакторе и находим файл probes/api/v1alpha1/webchecker_types.go
. В нём мы опишем структуру нашего ресурса, а затем сгенерируем нужные методы, CustomResourceDefinitions и манифесты.
Отредактируем структуры, описывающие спецификации и статус ресурса, следующим образом:
type WebCheckerSpec struct {
Host string `json:"host"`
// +kubebuilder:default:="/"
Path string `json:"path,omitempty"`
}
type WebCheckerStatus struct {
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}
type WebChecker struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec WebCheckerSpec `json:"spec,omitempty"`
Status WebCheckerStatus `json:"status,omitempty"`
}
Запустим кодогенерацию:
make generate && make manifests
В файле config/crd/bases/test.network.io_webcheckers.yaml
сгенерируется CustomResourceDefinintion. Также в файле api/v1alpha1/zz_generated.deepcopy.go
будут сгенерированы методы, реализующие DeepCopy.
Файл main.go
мы редактировать не будем. Предлагаю познакомиться с его содержимым самостоятельно. В нём считываются ключи запуска, закодирован выбор лидера, запускается единственный наш контроллер и регистрируются ответные действия на сигналы от ОС.
А мы переходим к файлу internal/controller/webchecker_controller.go
, так как там находится всё, что интересует нас сейчас.
Определяем типы и константы:
package controller
...
const (
WebCheckerWorkersCount = 2
WebCheckerProbeTaskChannelSize = 100
WebCheckerProbeResultChannelSize = 100
)
type WebCheckerState struct {
Host string
Path string
LastCheckTime time.Time
IsSuccessful bool
LastError string
}
type WebCheckerProbeTask struct {
NamespacedName types.NamespacedName
Host string
Path string
}
type WebCheckerProbeResult struct {
NamespacedName types.NamespacedName
IsSuccessful bool
LastError string
}
Наш основной тип данных, который содержит все необходимые поля и определяет методы для reconcile:
// WebCheckerReconciler reconciles a WebChecker object
type WebCheckerReconciler struct {
client.Client
Scheme *runtime.Scheme
events chan event.GenericEvent
mu sync.RWMutex
webCheckersStates map[types.NamespacedName]WebCheckerState
// канал для задач на проверку
tasks chan WebCheckerProbeTask
// канал для результатов проверки
tasksResults chan WebCheckerProbeResult
cancelFunc context.CancelFunc
}
Ниже основной метод для нашего контроллера. В нём определено, как реагировать на добавление, удаление или изменение ресурсов в кластере. В нашем примере этот метод ещё и обрабатывает запросы от внешних раздражителей:
func (r *WebCheckerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Reconciling WebChecker", "request", req)
var reconciledResource testv1alpha1.WebChecker
// Вместо ресурса в метод Reconcile приходит запрос с указанием пространства имён / имени ресурса,
// поэтому нужно запросить ресурс из API кластера.
if err := r.Get(ctx, req.NamespacedName, &reconciledResource); err != nil {
logger.Error(err, "Failed to get WebChecker resource")
if errors.IsNotFound(err) {
r.mu.Lock()
delete(r.webCheckersStates, req.NamespacedName)
r.mu.Unlock()
}
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Если ресурс помечен на удаление, то удаляем запись о нём из контроллера,
// чтобы по нему не формировались задачи на проверку.
if reconciledResource.DeletionTimestamp != nil {
logger.Info("WebChecker resource is being deleted")
r.mu.Lock()
delete(r.webCheckersStates, req.NamespacedName)
r.mu.Unlock()
return ctrl.Result{}, nil
}
// Если ресурс создан в кластере, добавляем его в список проверяемых ресурсов.
r.mu.Lock()
defer r.mu.Unlock()
var cachedState WebCheckerState
var ok bool
if cachedState, ok = r.webCheckersStates[req.NamespacedName]; !ok {
r.webCheckersStates[req.NamespacedName] = WebCheckerState{
Host: reconciledResource.Spec.Host,
Path: reconciledResource.Spec.Path,
}
return ctrl.Result{}, nil
}
if cachedState.Host != reconciledResource.Spec.Host || cachedState.Path != reconciledResource.Spec.Path {
r.webCheckersStates[req.NamespacedName] = WebCheckerState{
Host: reconciledResource.Spec.Host,
Path: reconciledResource.Spec.Path,
}
return ctrl.Result{}, nil
}
// Если запрос пришёл из канала событий, необходимо обновить статус ресурса.
webCheckerStatus := testv1alpha1.WebCheckerStatus{
Conditions: []metav1.Condition{
statusFromWebCheckerState(cachedState),
},
}
reconciledResource.Status = webCheckerStatus
err := r.Status().Update(ctx, &reconciledResource)
if err != nil {
logger.Error(err, "Failed to update WebChecker status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
Небольшая вспомогательная функция, которая формирует статус ресурса:
func statusFromWebCheckerState(webCheckerState WebCheckerState) metav1.Condition {
cond := metav1.Condition{
LastTransitionTime: metav1.Now(),
Type: "Ready",
Reason: "WebResourceReady",
}
if webCheckerState.IsSuccessful {
cond.Status = metav1.ConditionTrue
cond.Message = "WebChecker is ready"
} else {
cond.Status = metav1.ConditionFalse
cond.Message = webCheckerState.LastError
}
return cond
}
Ниже метод, который реализован в виде удобного билдера. Позволяет указать, какие ресурсы будут обрабатываться в кластере. В нашем случае, помимо обычных операций добавления и удаления, мы добавляем дополнительный канал для событий, в который собираются данные от проверок внешних серверов:
func (r *WebCheckerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&testv1alpha1.WebChecker{}).
Named("webchecker").
// Именно WatchesRawSource отвечает за обработку "сторонних событий".
WatchesRawSource(source.Channel(r.events, &handler.EnqueueRequestForObject{})).
Complete(r)
}
Воркер, который выполняет проверку внешних серверов:
func (r *WebCheckerReconciler) RunTasksWorker(ctx context.Context) {
logger := log.FromContext(ctx)
logger.Info("Starting tasks worker")
for task := range r.tasks {
logger.Info("Processing task", "task", task)
checker := WebCheckerProbe{
NamespacedName: task.NamespacedName,
Host: task.Host,
Path: task.Path,
}
taskResult := checker.PerformCheck(ctx)
r.tasksResults <- taskResult
}
}
Запускаем все горутины, которые будут формировать задачи на проверку, выполнять проверку и обрабатывать результаты:
func (r *WebCheckerReconciler) RunWorkers(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancelFunc = cancel
go r.runTasksScheduler(ctx)
for i := 0; i < WebCheckerWorkersCount; i++ {
go r.RunTasksWorker(ctx)
}
go r.runTaskResultsAnalyzer(ctx)
}
Данный метод анализирует результаты проверки и записывает их в канал событий:
func (r *WebCheckerReconciler) runTaskResultsAnalyzer(ctx context.Context) {
logger := log.FromContext(ctx)
logger.Info("Starting task results analyzer")
for result := range r.tasksResults {
r.mu.Lock()
if resultDetail, ok := r.webCheckersStates[result.NamespacedName]; !ok {
logger.Info("WebChecker resource not found", "namespacedName", result.NamespacedName)
} else {
resultDetail.LastCheckTime = time.Now()
resultDetail.IsSuccessful = result.IsSuccessful
resultDetail.LastError = result.LastError
r.webCheckersStates[result.NamespacedName] = resultDetail
// Отправляем событие в канал, чтобы контроллер в reconcile обновил статус ресурса.
r.events <- event.GenericEvent{
Object: &testv1alpha1.WebChecker{
ObjectMeta: metav1.ObjectMeta{
Namespace: result.NamespacedName.Namespace,
Name: result.NamespacedName.Name,
},
},
}
}
r.mu.Unlock()
}
}
Метод, который по заданному временному интервалу в 10 секунд определяет, нужно ли запустить очередную проверку для внешних серверов:
func (r *WebCheckerReconciler) runTasksScheduler(ctx context.Context) {
logger := log.FromContext(ctx)
logger.Info("Starting tasks scheduler")
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.mu.RLock()
for namespacedName, state := range r.webCheckersStates {
now := time.Now()
// Если ресурс не проверялся более 10 секунд, формируем задачу на проверку.
if now.Sub(state.LastCheckTime) > 10*time.Second {
r.tasks <- WebCheckerProbeTask{
NamespacedName: namespacedName,
Host: state.Host,
Path: state.Path,
}
}
}
r.mu.RUnlock()
case <-ctx.Done():
return
}
}
}
Код, который проверяет доступность хоста по HTTP, вынесем в отдельный файл — internal/controller/types.go
. Я его максимально упростил. В реальной жизни ресурс в кластере мог бы символизировать что-то другое. Например, мы могли бы обращаться к API внешних сервисов, получать от них данные и анализировать их.
package controller
import (
"context"
"fmt"
"net/http"
"slices"
"time"
"k8s.io/apimachinery/pkg/types"
)
var (
successfulStatuses = []int{200, 300, 301, 302, 303}
)
type WebCheckerProbe struct {
NamespacedName types.NamespacedName
Host string
Path string
}
func (p *WebCheckerProbe) PerformCheck(ctx context.Context) WebCheckerProbeResult {
c := http.Client{
Timeout: 1 * time.Second,
}
req, err := http.NewRequest("GET", p.Host+p.Path, nil)
if err != nil {
return WebCheckerProbeResult{
NamespacedName: p.NamespacedName,
IsSuccessful: false,
LastError: err.Error(),
}
}
resp, err := c.Do(req)
if err != nil {
return WebCheckerProbeResult{
NamespacedName: p.NamespacedName,
IsSuccessful: false,
LastError: err.Error(),
}
}
defer resp.Body.Close()
if slices.Contains(successfulStatuses, resp.StatusCode) {
return WebCheckerProbeResult{
NamespacedName: p.NamespacedName,
IsSuccessful: true,
LastError: "",
}
}
return WebCheckerProbeResult{
NamespacedName: p.NamespacedName,
IsSuccessful: false,
LastError: fmt.Sprintf("Status code: %d", resp.StatusCode),
}
}
Я попытался сделать проверку максимально простой. Не стал заморачиваться, например, с sync.Pool
и повторным использованием соединений.
В файле cmd/main.go
немного изменим вызов SetupWithManager
, чтобы он выглядел так:
webcheckerController := controller.NewWebChecker(mgr.GetClient(), mgr.GetScheme())
if err = webcheckerController.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "WebChecker")
os.Exit(1)
}
webcheckerController.RunWorkers(context.Background())
Теперь можно посмотреть, как наш контроллер будет работать в кластере. Используем kind и создадим локальный кластер:
kind create cluster
Установим наш CustomResourceDefinition
в кластер:
make install
Запустим наш контроллер:
make build && make run
После запуска контроллера сможем наблюдать его логи. Запустим ещё один терминал и задеплоим в кластер два ресурса. Раз:
apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
name: flant
spec:
host: http://flant.ru
Два:
apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
name: badnews
spec:
host: http://badnews.me
Теперь можно посмотреть статус этих ресурсов. Раз:
kubectl get webcheckers.test.network.io badnews -o yaml
apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
annotations:
creationTimestamp: "2025-02-04T12:32:55Z"
generation: 1
name: badnews
namespace: default
resourceVersion: "3118"
uid: 942765a7-e3d1-4f2a-b73f-f11377e1c72d
spec:
host: http://badnews.me
path: /
status:
conditions:
- lastTransitionTime: "2025-02-04T12:33:07Z"
message: 'Get "http://badnews.me/": dial tcp: lookup badnews.me: no such host'
reason: WebResourceReady
status: "False"
type: Ready
Два:
kubectl get webcheckers.test.network.io flant -o yaml
apiVersion: test.network.io/v1alpha1
kind: WebChecker
metadata:
annotations:
creationTimestamp: "2025-02-04T12:32:51Z"
generation: 1
name: flant
namespace: default
resourceVersion: "3112"
uid: 4dc3f052-4893-4089-8e6d-ec98fad393f5
spec:
host: http://flant.ru
path: /
status:
conditions:
- lastTransitionTime: "2025-02-04T12:33:02Z"
message: WebChecker is ready
reason: WebResourceReady
status: "True"
type: Ready
В этой статье мы рассмотрели пример создания Kubernetes-оператора, который наблюдает за внешними HTTP-серверами. Мы использовали kubebuilder для создания базовой структуры контроллера, определили CustomResourceDefinition для описания внешних ресурсов и реализовали логику для проверки доступности этих ресурсов.
Не стоит рассматривать данный пример как рабочий контроллер. Он создан исключительно в ознакомительных целях. Исходный код можно найти здесь. Но его можно расширить для управления более сложными внешними ресурсами, такими как базы данных или облачные сервисы.
Читайте также в нашем блоге: