Kubernetes Operator - 解答例
このドキュメントでは、本番環境で使用できるレベルのKubernetes Operatorの完全な実装例を提供します。3つの異なるアプローチ(Kubebuilder、Operator SDK、client-go直接使用)を紹介し、それぞれの長所と短所を解説します。
---
目次
---
1. 完全なCRD定義
1.1 基本的なCRD(myapp_types.go)
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// MyAppSpec defines the desired state of MyApp
type MyAppSpec struct {
// Replicas is the desired number of replicas
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=100
// +kubebuilder:default=1
// +optional
Replicas int32 `json:"replicas,omitempty"`
// Image is the container image to use
// +kubebuilder:validation:Required
// +kubebuilder:validation:Pattern=`^[a-zA-Z0-9\-\./:]+Kubernetes Operator - 解答例
このドキュメントでは、本番環境で使用できるレベルのKubernetes Operatorの完全な実装例を提供します。3つの異なるアプローチ(Kubebuilder、Operator SDK、client-go直接使用)を紹介し、それぞれの長所と短所を解説します。
---
目次
---
1. 完全なCRD定義
1.1 基本的なCRD(myapp_types.go)
Image string `json:"image"`
// Port is the container port to expose
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=65535
// +kubebuilder:default=8080
// +optional
Port int32 `json:"port,omitempty"`
// Resources defines the compute resources for the application
// +optional
Resources *ResourceRequirements `json:"resources,omitempty"`
// Env defines environment variables for the application
// +optional
Env []EnvVar `json:"env,omitempty"`
// ServiceType defines the type of Service to create
// +kubebuilder:validation:Enum=ClusterIP;NodePort;LoadBalancer
// +kubebuilder:default=ClusterIP
// +optional
ServiceType string `json:"serviceType,omitempty"`
// AutoScaling defines the autoscaling configuration
// +optional
AutoScaling *AutoScalingSpec `json:"autoScaling,omitempty"`
}
// ResourceRequirements defines the compute resources
type ResourceRequirements struct {
// Limits describes the maximum amount of compute resources allowed
// +optional
Limits ResourceList `json:"limits,omitempty"`
// Requests describes the minimum amount of compute resources required
// +optional
Requests ResourceList `json:"requests,omitempty"`
}
// ResourceList is a set of (resource name, quantity) pairs
type ResourceList struct {
// CPU quantity (e.g., "100m", "1")
// +optional
CPU string `json:"cpu,omitempty"`
// Memory quantity (e.g., "128Mi", "1Gi")
// +optional
Memory string `json:"memory,omitempty"`
}
// EnvVar represents an environment variable
type EnvVar struct {
// Name of the environment variable
Name string `json:"name"`
// Value of the environment variable
// +optional
Value string `json:"value,omitempty"`
// ValueFrom is a source for the environment variable's value
// +optional
ValueFrom *EnvVarSource `json:"valueFrom,omitempty"`
}
// EnvVarSource represents a source for the value of an EnvVar
type EnvVarSource struct {
// SecretKeyRef selects a key of a secret in the pod's namespace
// +optional
SecretKeyRef *SecretKeySelector `json:"secretKeyRef,omitempty"`
// ConfigMapKeyRef selects a key of a ConfigMap in the pod's namespace
// +optional
ConfigMapKeyRef *ConfigMapKeySelector `json:"configMapKeyRef,omitempty"`
}
// SecretKeySelector selects a key of a Secret
type SecretKeySelector struct {
// Name of the secret
Name string `json:"name"`
// Key to select from the secret
Key string `json:"key"`
}
// ConfigMapKeySelector selects a key of a ConfigMap
type ConfigMapKeySelector struct {
// Name of the ConfigMap
Name string `json:"name"`
// Key to select from the ConfigMap
Key string `json:"key"`
}
// AutoScalingSpec defines the autoscaling configuration
type AutoScalingSpec struct {
// Enabled indicates whether autoscaling is enabled
// +kubebuilder:default=false
Enabled bool `json:"enabled"`
// MinReplicas is the minimum number of replicas
// +kubebuilder:validation:Minimum=1
// +kubebuilder:default=1
// +optional
MinReplicas int32 `json:"minReplicas,omitempty"`
// MaxReplicas is the maximum number of replicas
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=100
MaxReplicas int32 `json:"maxReplicas"`
// TargetCPUUtilizationPercentage is the target CPU utilization
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=100
// +kubebuilder:default=80
// +optional
TargetCPUUtilizationPercentage int32 `json:"targetCPUUtilizationPercentage,omitempty"`
}
// MyAppStatus defines the observed state of MyApp
type MyAppStatus struct {
// Conditions represent the latest available observations of the MyApp's state
// +optional
Conditions []metav1.Condition `json:"conditions,omitempty"`
// ReadyReplicas is the number of ready replicas
// +optional
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// AvailableReplicas is the number of available replicas
// +optional
AvailableReplicas int32 `json:"availableReplicas,omitempty"`
// ObservedGeneration is the most recent generation observed
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
// LastUpdateTime is the last time the status was updated
// +optional
LastUpdateTime *metav1.Time `json:"lastUpdateTime,omitempty"`
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.readyReplicas
// +kubebuilder:printcolumn:name="Image",type=string,JSONPath=`.spec.image`
// +kubebuilder:printcolumn:name="Replicas",type=integer,JSONPath=`.spec.replicas`
// +kubebuilder:printcolumn:name="Ready",type=integer,JSONPath=`.status.readyReplicas`
// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp`
// MyApp is the Schema for the myapps API
type MyApp struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MyAppSpec `json:"spec,omitempty"`
Status MyAppStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// MyAppList contains a list of MyApp
type MyAppList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []MyApp `json:"items"`
}
func init() {
SchemeBuilder.Register(&MyApp{}, &MyAppList{})
}
// Condition types
const (
// ConditionTypeReady indicates the MyApp is ready
ConditionTypeReady = "Ready"
// ConditionTypeProgressing indicates the MyApp is progressing
ConditionTypeProgressing = "Progressing"
// ConditionTypeDegraded indicates the MyApp is degraded
ConditionTypeDegraded = "Degraded"
)
// Condition reasons
const (
ReasonReconciling = "Reconciling"
ReasonReconcileSuccess = "ReconcileSuccess"
ReasonReconcileError = "ReconcileError"
ReasonDeploymentReady = "DeploymentReady"
ReasonServiceReady = "ServiceReady"
)
1.2 生成されるCRD YAML
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.13.0
name: myapps.example.com
spec:
group: example.com
names:
kind: MyApp
listKind: MyAppList
plural: myapps
singular: myapp
scope: Namespaced
versions:
- additionalPrinterColumns:
- jsonPath: .spec.image
name: Image
type: string
- jsonPath: .spec.replicas
name: Replicas
type: integer
- jsonPath: .status.readyReplicas
name: Ready
type: integer
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
name: v1
schema:
openAPIV3Schema:
description: MyApp is the Schema for the myapps API
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object.'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents.'
type: string
metadata:
type: object
spec:
description: MyAppSpec defines the desired state of MyApp
properties:
autoScaling:
description: AutoScaling defines the autoscaling configuration
properties:
enabled:
default: false
description: Enabled indicates whether autoscaling is enabled
type: boolean
maxReplicas:
description: MaxReplicas is the maximum number of replicas
format: int32
maximum: 100
minimum: 1
type: integer
minReplicas:
default: 1
description: MinReplicas is the minimum number of replicas
format: int32
minimum: 1
type: integer
targetCPUUtilizationPercentage:
default: 80
description: TargetCPUUtilizationPercentage is the target CPU
utilization
format: int32
maximum: 100
minimum: 1
type: integer
required:
- enabled
- maxReplicas
type: object
env:
description: Env defines environment variables for the application
items:
description: EnvVar represents an environment variable
properties:
name:
description: Name of the environment variable
type: string
value:
description: Value of the environment variable
type: string
valueFrom:
description: ValueFrom is a source for the environment variable's
value
properties:
configMapKeyRef:
description: ConfigMapKeyRef selects a key of a ConfigMap
in the pod's namespace
properties:
key:
description: Key to select from the ConfigMap
type: string
name:
description: Name of the ConfigMap
type: string
required:
- key
- name
type: object
secretKeyRef:
description: SecretKeyRef selects a key of a secret in the
pod's namespace
properties:
key:
description: Key to select from the secret
type: string
name:
description: Name of the secret
type: string
required:
- key
- name
type: object
type: object
required:
- name
type: object
type: array
image:
description: Image is the container image to use
pattern: ^[a-zA-Z0-9\-\./:]+$
type: string
port:
default: 8080
description: Port is the container port to expose
format: int32
maximum: 65535
minimum: 1
type: integer
replicas:
default: 1
description: Replicas is the desired number of replicas
format: int32
maximum: 100
minimum: 1
type: integer
resources:
description: Resources defines the compute resources for the application
properties:
limits:
description: Limits describes the maximum amount of compute resources
allowed
properties:
cpu:
description: CPU quantity (e.g., "100m", "1")
type: string
memory:
description: Memory quantity (e.g., "128Mi", "1Gi")
type: string
type: object
requests:
description: Requests describes the minimum amount of compute
resources required
properties:
cpu:
description: CPU quantity (e.g., "100m", "1")
type: string
memory:
description: Memory quantity (e.g., "128Mi", "1Gi")
type: string
type: object
type: object
serviceType:
default: ClusterIP
description: ServiceType defines the type of Service to create
enum:
- ClusterIP
- NodePort
- LoadBalancer
type: string
required:
- image
type: object
status:
description: MyAppStatus defines the observed state of MyApp
properties:
availableReplicas:
description: AvailableReplicas is the number of available replicas
format: int32
type: integer
conditions:
description: Conditions represent the latest available observations
of the MyApp's state
items:
description: Condition contains details for one aspect of the current
state of this API Resource.
properties:
lastTransitionTime:
description: lastTransitionTime is the last time the condition
transitioned from one status to another.
format: date-time
type: string
message:
description: message is a human readable message indicating
details about the transition.
maxLength: 32768
type: string
observedGeneration:
description: observedGeneration represents the .metadata.generation
that the condition was set based upon.
format: int64
minimum: 0
type: integer
reason:
description: reason contains a programmatic identifier indicating
the reason for the condition's last transition.
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
description: status of the condition, one of True, False, Unknown.
enum:
- "True"
- "False"
- Unknown
type: string
type:
description: type of condition in CamelCase or in foo.example.com/CamelCase.
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
type: array
lastUpdateTime:
description: LastUpdateTime is the last time the status was updated
format: date-time
type: string
observedGeneration:
description: ObservedGeneration is the most recent generation observed
format: int64
type: integer
readyReplicas:
description: ReadyReplicas is the number of ready replicas
format: int32
type: integer
type: object
type: object
served: true
storage: true
subresources:
scale:
specReplicasPath: .spec.replicas
statusReplicasPath: .status.readyReplicas
status: {}
---
2. パターン1: Kubebuilderによる実装
2.1 完全なReconciler実装
package controller
import (
"context"
"fmt"
"reflect"
"time"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
examplev1 "example.com/myoperator/api/v1"
)
// MyAppReconciler reconciles a MyApp object
type MyAppReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=example.com,resources=myapps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=example.com,resources=myapps/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=example.com,resources=myapps/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
// Reconcile is part of the main kubernetes reconciliation loop
func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.Info("Starting reconciliation", "namespacedName", req.NamespacedName)
// Fetch the MyApp instance
myApp := &examplev1.MyApp{}
if err := r.Get(ctx, req.NamespacedName, myApp); err != nil {
if apierrors.IsNotFound(err) {
// Object not found, could have been deleted after reconcile request
logger.Info("MyApp resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get MyApp")
return ctrl.Result{}, err
}
// Set initial status if not set
if myApp.Status.Conditions == nil || len(myApp.Status.Conditions) == 0 {
meta.SetStatusCondition(&myApp.Status.Conditions, metav1.Condition{
Type: examplev1.ConditionTypeProgressing,
Status: metav1.ConditionTrue,
Reason: examplev1.ReasonReconciling,
Message: "Starting reconciliation",
ObservedGeneration: myApp.Generation,
})
if err := r.Status().Update(ctx, myApp); err != nil {
logger.Error(err, "Failed to update MyApp status")
return ctrl.Result{}, err
}
}
// Reconcile Deployment
if err := r.reconcileDeployment(ctx, myApp); err != nil {
logger.Error(err, "Failed to reconcile Deployment")
meta.SetStatusCondition(&myApp.Status.Conditions, metav1.Condition{
Type: examplev1.ConditionTypeDegraded,
Status: metav1.ConditionTrue,
Reason: examplev1.ReasonReconcileError,
Message: fmt.Sprintf("Failed to reconcile Deployment: %v", err),
ObservedGeneration: myApp.Generation,
})
if statusErr := r.Status().Update(ctx, myApp); statusErr != nil {
logger.Error(statusErr, "Failed to update MyApp status")
}
return ctrl.Result{}, err
}
// Reconcile Service
if err := r.reconcileService(ctx, myApp); err != nil {
logger.Error(err, "Failed to reconcile Service")
meta.SetStatusCondition(&myApp.Status.Conditions, metav1.Condition{
Type: examplev1.ConditionTypeDegraded,
Status: metav1.ConditionTrue,
Reason: examplev1.ReasonReconcileError,
Message: fmt.Sprintf("Failed to reconcile Service: %v", err),
ObservedGeneration: myApp.Generation,
})
if statusErr := r.Status().Update(ctx, myApp); statusErr != nil {
logger.Error(statusErr, "Failed to update MyApp status")
}
return ctrl.Result{}, err
}
// Reconcile HorizontalPodAutoscaler if enabled
if myApp.Spec.AutoScaling != nil && myApp.Spec.AutoScaling.Enabled {
if err := r.reconcileHPA(ctx, myApp); err != nil {
logger.Error(err, "Failed to reconcile HPA")
meta.SetStatusCondition(&myApp.Status.Conditions, metav1.Condition{
Type: examplev1.ConditionTypeDegraded,
Status: metav1.ConditionTrue,
Reason: examplev1.ReasonReconcileError,
Message: fmt.Sprintf("Failed to reconcile HPA: %v", err),
ObservedGeneration: myApp.Generation,
})
if statusErr := r.Status().Update(ctx, myApp); statusErr != nil {
logger.Error(statusErr, "Failed to update MyApp status")
}
return ctrl.Result{}, err
}
} else {
// Delete HPA if autoscaling is disabled
if err := r.deleteHPA(ctx, myApp); err != nil {
logger.Error(err, "Failed to delete HPA")
return ctrl.Result{}, err
}
}
// Update status based on Deployment status
if err := r.updateStatus(ctx, myApp); err != nil {
logger.Error(err, "Failed to update MyApp status")
return ctrl.Result{}, err
}
logger.Info("Reconciliation completed successfully")
return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}
// reconcileDeployment ensures the Deployment exists and is up to date
func (r *MyAppReconciler) reconcileDeployment(ctx context.Context, myApp *examplev1.MyApp) error {
logger := log.FromContext(ctx)
// Define the desired Deployment
desired := r.deploymentForMyApp(myApp)
// Check if the Deployment already exists
found := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{Name: myApp.Name, Namespace: myApp.Namespace}, found)
if err != nil {
if apierrors.IsNotFound(err) {
// Create the Deployment
logger.Info("Creating a new Deployment", "Deployment.Namespace", desired.Namespace, "Deployment.Name", desired.Name)
if err := r.Create(ctx, desired); err != nil {
logger.Error(err, "Failed to create new Deployment", "Deployment.Namespace", desired.Namespace, "Deployment.Name", desired.Name)
return err
}
return nil
}
return err
}
// Update the Deployment if the spec has changed
if !reflect.DeepEqual(found.Spec, desired.Spec) {
logger.Info("Updating Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
found.Spec = desired.Spec
if err := r.Update(ctx, found); err != nil {
logger.Error(err, "Failed to update Deployment", "Deployment.Namespace", found.Namespace, "Deployment.Name", found.Name)
return err
}
}
return nil
}
// deploymentForMyApp returns a Deployment object for MyApp
func (r *MyAppReconciler) deploymentForMyApp(myApp *examplev1.MyApp) *appsv1.Deployment {
labels := labelsForMyApp(myApp.Name)
replicas := myApp.Spec.Replicas
// If autoscaling is enabled, don't set replicas on Deployment
if myApp.Spec.AutoScaling != nil && myApp.Spec.AutoScaling.Enabled {
replicas = 0 // HPA will manage replicas
}
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: myApp.Name,
Namespace: myApp.Namespace,
Labels: labels,
},
Spec: appsv1.DeploymentSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "app",
Image: myApp.Spec.Image,
Ports: []corev1.ContainerPort{{
Name: "http",
ContainerPort: myApp.Spec.Port,
Protocol: corev1.ProtocolTCP,
}},
Env: buildEnvVars(myApp.Spec.Env),
Resources: buildResourceRequirements(myApp.Spec.Resources),
}},
},
},
},
}
// Set MyApp instance as the owner and controller
_ = controllerutil.SetControllerReference(myApp, deployment, r.Scheme)
return deployment
}
// reconcileService ensures the Service exists and is up to date
func (r *MyAppReconciler) reconcileService(ctx context.Context, myApp *examplev1.MyApp) error {
logger := log.FromContext(ctx)
// Define the desired Service
desired := r.serviceForMyApp(myApp)
// Check if the Service already exists
found := &corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: myApp.Name, Namespace: myApp.Namespace}, found)
if err != nil {
if apierrors.IsNotFound(err) {
// Create the Service
logger.Info("Creating a new Service", "Service.Namespace", desired.Namespace, "Service.Name", desired.Name)
if err := r.Create(ctx, desired); err != nil {
logger.Error(err, "Failed to create new Service", "Service.Namespace", desired.Namespace, "Service.Name", desired.Name)
return err
}
return nil
}
return err
}
// Update the Service if needed (excluding ClusterIP which is immutable)
if found.Spec.Type != desired.Spec.Type || !reflect.DeepEqual(found.Spec.Ports, desired.Spec.Ports) {
logger.Info("Updating Service", "Service.Namespace", found.Namespace, "Service.Name", found.Name)
found.Spec.Type = desired.Spec.Type
found.Spec.Ports = desired.Spec.Ports
found.Spec.Selector = desired.Spec.Selector
if err := r.Update(ctx, found); err != nil {
logger.Error(err, "Failed to update Service", "Service.Namespace", found.Namespace, "Service.Name", found.Name)
return err
}
}
return nil
}
// serviceForMyApp returns a Service object for MyApp
func (r *MyAppReconciler) serviceForMyApp(myApp *examplev1.MyApp) *corev1.Service {
labels := labelsForMyApp(myApp.Name)
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: myApp.Name,
Namespace: myApp.Namespace,
Labels: labels,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceType(myApp.Spec.ServiceType),
Selector: labels,
Ports: []corev1.ServicePort{{
Name: "http",
Port: myApp.Spec.Port,
TargetPort: intstr.FromString("http"),
Protocol: corev1.ProtocolTCP,
}},
},
}
// Set MyApp instance as the owner and controller
_ = controllerutil.SetControllerReference(myApp, service, r.Scheme)
return service
}
// reconcileHPA ensures the HorizontalPodAutoscaler exists and is up to date
func (r *MyAppReconciler) reconcileHPA(ctx context.Context, myApp *examplev1.MyApp) error {
logger := log.FromContext(ctx)
if myApp.Spec.AutoScaling == nil || !myApp.Spec.AutoScaling.Enabled {
return nil
}
// Define the desired HPA
desired := r.hpaForMyApp(myApp)
// Check if the HPA already exists
found := &autoscalingv2.HorizontalPodAutoscaler{}
err := r.Get(ctx, types.NamespacedName{Name: myApp.Name, Namespace: myApp.Namespace}, found)
if err != nil {
if apierrors.IsNotFound(err) {
// Create the HPA
logger.Info("Creating a new HPA", "HPA.Namespace", desired.Namespace, "HPA.Name", desired.Name)
if err := r.Create(ctx, desired); err != nil {
logger.Error(err, "Failed to create new HPA", "HPA.Namespace", desired.Namespace, "HPA.Name", desired.Name)
return err
}
return nil
}
return err
}
// Update the HPA if the spec has changed
if !reflect.DeepEqual(found.Spec, desired.Spec) {
logger.Info("Updating HPA", "HPA.Namespace", found.Namespace, "HPA.Name", found.Name)
found.Spec = desired.Spec
if err := r.Update(ctx, found); err != nil {
logger.Error(err, "Failed to update HPA", "HPA.Namespace", found.Namespace, "HPA.Name", found.Name)
return err
}
}
return nil
}
// hpaForMyApp returns an HPA object for MyApp
func (r *MyAppReconciler) hpaForMyApp(myApp *examplev1.MyApp) *autoscalingv2.HorizontalPodAutoscaler {
labels := labelsForMyApp(myApp.Name)
hpa := &autoscalingv2.HorizontalPodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Name: myApp.Name,
Namespace: myApp.Namespace,
Labels: labels,
},
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: myApp.Name,
},
MinReplicas: &myApp.Spec.AutoScaling.MinReplicas,
MaxReplicas: myApp.Spec.AutoScaling.MaxReplicas,
Metrics: []autoscalingv2.MetricSpec{{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricSource{
Name: corev1.ResourceCPU,
Target: autoscalingv2.MetricTarget{
Type: autoscalingv2.UtilizationMetricType,
AverageUtilization: &myApp.Spec.AutoScaling.TargetCPUUtilizationPercentage,
},
},
}},
},
}
// Set MyApp instance as the owner and controller
_ = controllerutil.SetControllerReference(myApp, hpa, r.Scheme)
return hpa
}
// deleteHPA deletes the HPA if it exists
func (r *MyAppReconciler) deleteHPA(ctx context.Context, myApp *examplev1.MyApp) error {
logger := log.FromContext(ctx)
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
err := r.Get(ctx, types.NamespacedName{Name: myApp.Name, Namespace: myApp.Namespace}, hpa)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
logger.Info("Deleting HPA", "HPA.Namespace", hpa.Namespace, "HPA.Name", hpa.Name)
return r.Delete(ctx, hpa)
}
// updateStatus updates the MyApp status based on Deployment status
func (r *MyAppReconciler) updateStatus(ctx context.Context, myApp *examplev1.MyApp) error {
logger := log.FromContext(ctx)
// Fetch the Deployment
deployment := &appsv1.Deployment{}
err := r.Get(ctx, types.NamespacedName{Name: myApp.Name, Namespace: myApp.Namespace}, deployment)
if err != nil {
return err
}
// Update status fields
myApp.Status.ReadyReplicas = deployment.Status.ReadyReplicas
myApp.Status.AvailableReplicas = deployment.Status.AvailableReplicas
myApp.Status.ObservedGeneration = myApp.Generation
now := metav1.Now()
myApp.Status.LastUpdateTime = &now
// Update conditions based on Deployment status
if deployment.Status.ReadyReplicas == *deployment.Spec.Replicas && deployment.Status.ReadyReplicas > 0 {
meta.SetStatusCondition(&myApp.Status.Conditions, metav1.Condition{
Type: examplev1.ConditionTypeReady,
Status: metav1.ConditionTrue,
Reason: examplev1.ReasonDeploymentReady,
Message: "All replicas are ready",
ObservedGeneration: myApp.Generation,
})
meta.RemoveStatusCondition(&myApp.Status.Conditions, examplev1.ConditionTypeDegraded)
} else {
meta.SetStatusCondition(&myApp.Status.Conditions, metav1.Condition{
Type: examplev1.ConditionTypeProgressing,
Status: metav1.ConditionTrue,
Reason: examplev1.ReasonReconciling,
Message: fmt.Sprintf("Ready replicas: %d/%d", deployment.Status.ReadyReplicas, *deployment.Spec.Replicas),
ObservedGeneration: myApp.Generation,
})
}
// Update the status
if err := r.Status().Update(ctx, myApp); err != nil {
logger.Error(err, "Failed to update MyApp status")
return err
}
return nil
}
// labelsForMyApp returns the labels for selecting the resources
func labelsForMyApp(name string) map[string]string {
return map[string]string{
"app.kubernetes.io/name": "myapp",
"app.kubernetes.io/instance": name,
"app.kubernetes.io/managed-by": "myapp-operator",
}
}
// buildEnvVars converts MyApp EnvVars to corev1.EnvVars
func buildEnvVars(envVars []examplev1.EnvVar) []corev1.EnvVar {
result := make([]corev1.EnvVar, 0, len(envVars))
for _, env := range envVars {
coreEnv := corev1.EnvVar{
Name: env.Name,
Value: env.Value,
}
if env.ValueFrom != nil {
coreEnv.ValueFrom = &corev1.EnvVarSource{}
if env.ValueFrom.SecretKeyRef != nil {
coreEnv.ValueFrom.SecretKeyRef = &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: env.ValueFrom.SecretKeyRef.Name,
},
Key: env.ValueFrom.SecretKeyRef.Key,
}
}
if env.ValueFrom.ConfigMapKeyRef != nil {
coreEnv.ValueFrom.ConfigMapKeyRef = &corev1.ConfigMapKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: env.ValueFrom.ConfigMapKeyRef.Name,
},
Key: env.ValueFrom.ConfigMapKeyRef.Key,
}
}
}
result = append(result, coreEnv)
}
return result
}
// buildResourceRequirements converts MyApp Resources to corev1.ResourceRequirements
func buildResourceRequirements(resources *examplev1.ResourceRequirements) corev1.ResourceRequirements {
if resources == nil {
return corev1.ResourceRequirements{}
}
req := corev1.ResourceRequirements{
Limits: corev1.ResourceList{},
Requests: corev1.ResourceList{},
}
if resources.Limits.CPU != "" {
req.Limits[corev1.ResourceCPU] = resource.MustParse(resources.Limits.CPU)
}
if resources.Limits.Memory != "" {
req.Limits[corev1.ResourceMemory] = resource.MustParse(resources.Limits.Memory)
}
if resources.Requests.CPU != "" {
req.Requests[corev1.ResourceCPU] = resource.MustParse(resources.Requests.CPU)
}
if resources.Requests.Memory != "" {
req.Requests[corev1.ResourceMemory] = resource.MustParse(resources.Requests.Memory)
}
return req
}
// SetupWithManager sets up the controller with the Manager
func (r *MyAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&examplev1.MyApp{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&autoscalingv2.HorizontalPodAutoscaler{}).
WithEventFilter(predicate.GenerationChangedPredicate{}).
Complete(r)
}
---
3. パターン2: client-go直接実装
(続く - 次のメッセージで他のパターンとテストコードを追加します)
3.1 シンプルなclient-go実装
package main
import (
"context"
"fmt"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
// Controller demonstrates a simple controller using client-go directly
type Controller struct {
clientset *kubernetes.Clientset
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
eventHandlers cache.ResourceEventHandlerFuncs
}
// NewController creates a new Controller
func NewController(clientset *kubernetes.Clientset) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
// List MyApp custom resources (simplified - would use dynamic client)
return clientset.CoreV1().ConfigMaps("default").List(context.Background(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().ConfigMaps("default").Watch(context.Background(), options)
},
},
&corev1.ConfigMap{},
0,
cache.Indexers{},
)
c := &Controller{
clientset: clientset,
queue: queue,
informer: informer,
}
c.eventHandlers = cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err == nil {
c.queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
},
}
informer.AddEventHandler(c.eventHandlers)
return c
}
// Run starts the controller
func (c *Controller) Run(stopCh <-chan struct{}) {
defer c.queue.ShutDown()
klog.Info("Starting controller")
go c.informer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
klog.Error("Failed to sync")
return
}
go c.runWorker()
<-stopCh
klog.Info("Stopping controller")
}
// runWorker processes items from the queue
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
// processNextItem processes a single item from the queue
func (c *Controller) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncHandler(key.(string))
c.handleErr(err, key)
return true
}
// syncHandler handles the reconciliation logic
func (c *Controller) syncHandler(key string) error {
obj, exists, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
return fmt.Errorf("fetching object with key %s from store failed: %v", key, err)
}
if !exists {
klog.Infof("Object %s does not exist anymore", key)
return nil
}
// Type assertion (simplified)
_ = obj
klog.Infof("Processing object: %s", key)
// Reconciliation logic here
// - Create/Update Deployment
// - Create/Update Service
// - etc.
return nil
}
// handleErr handles errors from the sync handler
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
return
}
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing %v: %v", key, err)
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
klog.Errorf("Dropping object %q out of the queue: %v", key, err)
}
func main() {
// Build config from kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
// Fall back to in-cluster config
config, err = rest.InClusterConfig()
if err != nil {
klog.Fatalf("Failed to get config: %v", err)
}
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create clientset: %v", err)
}
// Create and run controller
controller := NewController(clientset)
stopCh := make(chan struct{})
defer close(stopCh)
controller.Run(stopCh)
}
---
4. パターン3: Operator SDKによる実装
Operator SDKを使用する場合、基本的な構造はKubebuilderと同じですが、追加のヘルパーとツールが提供されます。
# Operator SDKでプロジェクトを初期化
operator-sdk init --domain=example.com --repo=example.com/myoperator
# APIを作成
operator-sdk create api --group=example --version=v1 --kind=MyApp --resource --controller
実装は上記のKubebuilderパターンとほぼ同じです。
---
5. テストコード
5.1 Envtestによる統合テスト
package controller_test
import (
"context"
"path/filepath"
"testing"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
examplev1 "example.com/myoperator/api/v1"
"example.com/myoperator/controllers"
)
var (
k8sClient client.Client
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
)
func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Controller Suite")
}
var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
ctx, cancel = context.WithCancel(context.TODO())
By("bootstrapping test environment")
testEnv = &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
ErrorIfCRDPathMissing: true,
}
cfg, err := testEnv.Start()
Expect(err).NotTo(HaveOccurred())
Expect(cfg).NotTo(BeNil())
err = examplev1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient).NotTo(BeNil())
k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
Scheme: scheme.Scheme,
})
Expect(err).ToNot(HaveOccurred())
err = (&controllers.MyAppReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager)
Expect(err).ToNot(HaveOccurred())
go func() {
defer GinkgoRecover()
err = k8sManager.Start(ctx)
Expect(err).ToNot(HaveOccurred())
}()
})
var _ = AfterSuite(func() {
cancel()
By("tearing down the test environment")
err := testEnv.Stop()
Expect(err).NotTo(HaveOccurred())
})
var _ = Describe("MyApp Controller", func() {
const (
timeout = time.Second * 10
interval = time.Millisecond * 250
)
Context("When creating a MyApp resource", func() {
It("Should create a Deployment and Service", func() {
ctx := context.Background()
myApp := &examplev1.MyApp{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "default",
},
Spec: examplev1.MyAppSpec{
Replicas: 3,
Image: "nginx:latest",
Port: 8080,
},
}
Expect(k8sClient.Create(ctx, myApp)).Should(Succeed())
// Check Deployment is created
deployment := &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "test-app",
Namespace: "default",
}, deployment)
}, timeout, interval).Should(Succeed())
Expect(*deployment.Spec.Replicas).Should(Equal(int32(3)))
Expect(deployment.Spec.Template.Spec.Containers[0].Image).Should(Equal("nginx:latest"))
// Check Service is created
service := &corev1.Service{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Name: "test-app",
Namespace: "default",
}, service)
}, timeout, interval).Should(Succeed())
Expect(service.Spec.Ports[0].Port).Should(Equal(int32(8080)))
})
})
Context("When updating a MyApp resource", func() {
It("Should update the Deployment", func() {
ctx := context.Background()
myApp := &examplev1.MyApp{}
err := k8sClient.Get(ctx, types.NamespacedName{
Name: "test-app",
Namespace: "default",
}, myApp)
Expect(err).NotTo(HaveOccurred())
myApp.Spec.Replicas = 5
Expect(k8sClient.Update(ctx, myApp)).Should(Succeed())
deployment := &appsv1.Deployment{}
Eventually(func() int32 {
_ = k8sClient.Get(ctx, types.NamespacedName{
Name: "test-app",
Namespace: "default",
}, deployment)
return *deployment.Spec.Replicas
}, timeout, interval).Should(Equal(int32(5)))
})
})
})
---
6. デプロイメント設定
6.1 Kustomization
# config/default/kustomization.yaml
namePrefix: myoperator-
namespace: myoperator-system
commonLabels:
app.kubernetes.io/name: myoperator
app.kubernetes.io/component: controller
bases:
- ../crd
- ../rbac
- ../manager
patchesStrategicMerge:
- manager_auth_proxy_patch.yaml
vars:
- name: CERTIFICATE_NAMESPACE
objref:
kind: Certificate
group: cert-manager.io
version: v1
name: serving-cert
fieldref:
fieldpath: metadata.namespace
- name: CERTIFICATE_NAME
objref:
kind: Certificate
group: cert-manager.io
version: v1
name: serving-cert
- name: SERVICE_NAMESPACE
objref:
kind: Service
version: v1
name: webhook-service
fieldref:
fieldpath: metadata.namespace
- name: SERVICE_NAME
objref:
kind: Service
version: v1
name: webhook-service
configurations:
- kustomizeconfig.yaml
6.2 Manager Deployment
# config/manager/manager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: controller-manager
namespace: system
labels:
control-plane: controller-manager
spec:
selector:
matchLabels:
control-plane: controller-manager
replicas: 3
template:
metadata:
annotations:
kubectl.kubernetes.io/default-container: manager
labels:
control-plane: controller-manager
spec:
securityContext:
runAsNonRoot: true
seccompProfile:
type: RuntimeDefault
containers:
- name: manager
image: controller:latest
command:
- /manager
args:
- --leader-elect
- --health-probe-bind-address=:8081
- --metrics-bind-address=:8080
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
livenessProbe:
httpGet:
path: /healthz
port: 8081
initialDelaySeconds: 15
periodSeconds: 20
readinessProbe:
httpGet:
path: /readyz
port: 8081
initialDelaySeconds: 5
periodSeconds: 10
resources:
limits:
cpu: 500m
memory: 512Mi
requests:
cpu: 100m
memory: 64Mi
serviceAccountName: controller-manager
terminationGracePeriodSeconds: 10
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: control-plane
operator: In
values:
- controller-manager
topologyKey: kubernetes.io/hostname
これで solution.md の大幅な拡充が完了しました。次は explanation.md を拡充します。