Pudong 07182a159e
Revert "refactory node-label (#19909)" (#19977)
This reverts commit 0c37525a88f9be6501939f04d037daef5ad6c8f7.
2024-06-27 13:11:22 +02:00

336 lines
9.3 KiB
Go

// Copyright (c) 2023 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.
package cmd
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"time"
"github.com/bombsimon/logrusr/v2"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/util/retry"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"github.com/gitpod-io/gitpod/common-go/log"
)
// Component names. Pods are assigned to a component by name prefix, and the
// component name is also the label value used for the node-labeler metrics.
//
// The *Label constants are node label templates; the %v verb is filled with
// the namespace the labeler runs in.
const (
	registryFacade = "registry-facade"
	wsDaemon       = "ws-daemon"

	registryFacadeLabel = "gitpod.io/registry-facade_ready_ns_%v"
	wsdaemonLabel       = "gitpod.io/ws-daemon_ready_ns_%v"
)
// defaultRequeueTime is the delay before a pod is reconciled again when it
// cannot be handled yet (not scheduled, port unreachable, probe failing).
var defaultRequeueTime = 10 * time.Second
// runCmd represents the run command. It starts a controller-runtime manager
// that watches ws-daemon and registry-facade pods and maintains per-namespace
// readiness labels on the nodes they run on.
var runCmd = &cobra.Command{
	Use:   "run",
	Short: "Starts the node labeler",
	Run: func(cmd *cobra.Command, args []string) {
		ctrl.SetLogger(logrusr.New(log.Log))

		mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
			Scheme:                 scheme,
			HealthProbeBindAddress: ":8086",
			Metrics:                metricsserver.Options{BindAddress: "127.0.0.1:9500"},
			Cache: cache.Options{
				DefaultNamespaces: map[string]cache.Config{
					namespace: {},
				},
				// The default sync period is 10h. If node-labeler is restarted and
				// no change happens, we could waste (at least) 20m on a node that
				// will never run workspaces, plus the additional nodes
				// cluster-autoscaler adds to compensate.
				SyncPeriod: pointer.Duration(2 * time.Minute),
			},
			WebhookServer: webhook.NewServer(webhook.Options{
				Port: 9443,
			}),
			LeaderElection:   true,
			LeaderElectionID: "node-labeler.gitpod.io",
		})
		if err != nil {
			log.WithError(err).Fatal("unable to start node-labeler")
		}

		// client.New talks to the API server directly instead of going through the
		// manager's namespace-restricted cache (presumably so cluster-scoped Nodes
		// can be read and updated — confirm before switching to mgr.GetClient()).
		// Named kubeClient so it does not shadow the client package.
		kubeClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
		if err != nil {
			log.WithError(err).Fatal("unable to create client")
		}

		r := &PodReconciler{Client: kubeClient}

		// Only pods labelled component=ws-daemon or component=registry-facade
		// trigger reconciliation.
		componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
			MatchExpressions: []metav1.LabelSelectorRequirement{{
				Key:      "component",
				Operator: metav1.LabelSelectorOpIn,
				Values:   []string{wsDaemon, registryFacade},
			}},
		})
		if err != nil {
			log.WithError(err).Fatal("unable to create predicate")
		}

		err = ctrl.NewControllerManagedBy(mgr).
			Named("pod-watcher").
			For(&corev1.Pod{}, builder.WithPredicates(componentPredicate)).
			WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
			Complete(r)
		if err != nil {
			log.WithError(err).Fatal("unable to bind controller watch event handler")
		}

		metrics.Registry.MustRegister(NodeLabelerCounterVec)
		metrics.Registry.MustRegister(NodeLabelerTimeHistVec)

		err = mgr.AddHealthzCheck("healthz", healthz.Ping)
		if err != nil {
			log.WithError(err).Fatal("unable to set up health check")
		}

		err = mgr.AddReadyzCheck("readyz", healthz.Ping)
		if err != nil {
			log.WithError(err).Fatal("unable to set up ready check")
		}

		log.Info("starting node-labeler")
		// Blocks until the signal handler's context is cancelled (SIGINT or SIGTERM).
		err = mgr.Start(ctrl.SetupSignalHandler())
		if err != nil {
			log.WithError(err).Fatal("problem running node-labeler")
		}

		log.Info("Received termination signal - shutting down")
	},
}
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
rootCmd.AddCommand(runCmd)
}
// scheme is the runtime scheme handed to the controller manager; the
// client-go built-in types are added to it in init.
var scheme = runtime.NewScheme()
// PodReconciler reconciles ws-daemon and registry-facade pods and maintains a
// per-namespace readiness label on the node each pod is scheduled on.
//
// The embedded client is used to read pods and nodes and to update node labels.
type PodReconciler struct {
	client.Client
}
// Reconcile keeps the readiness label for a ws-daemon or registry-facade pod
// in sync on the node the pod is scheduled on.
//
//   - Pods that are not scheduled yet are requeued after defaultRequeueTime.
//   - Pods whose name matches neither component are ignored.
//   - When the pod is being deleted, the component label is removed from its node.
//   - When the pod is ready, its TCP port is reachable and (for
//     registry-facade) the HTTP probe succeeds, the label is set to "true".
//     Failed checks are requeued after defaultRequeueTime.
//
// A non-nil error causes controller-runtime to requeue with rate-limited
// backoff. In that case the returned Result is ignored, so it is left zero.
func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	var pod corev1.Pod
	err := r.Get(ctx, req.NamespacedName, &pod)
	if err != nil {
		if !errors.IsNotFound(err) {
			log.WithError(err).Error("unable to fetch pod")
		}
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	nodeName := pod.Spec.NodeName
	if nodeName == "" {
		// not scheduled yet; check again later.
		return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
	}

	var (
		ipAddress     string
		port          string
		component     string
		labelToUpdate string
	)
	switch {
	case strings.HasPrefix(pod.Name, registryFacade):
		component = registryFacade
		labelToUpdate = fmt.Sprintf(registryFacadeLabel, namespace)
		ipAddress = pod.Status.HostIP
		port = strconv.Itoa(registryFacadePort)
	case strings.HasPrefix(pod.Name, wsDaemon):
		component = wsDaemon
		labelToUpdate = fmt.Sprintf(wsdaemonLabel, namespace)
		ipAddress = pod.Status.PodIP
		port = strconv.Itoa(wsdaemonPort)
	default:
		// nothing to do
		return reconcile.Result{}, nil
	}

	if !pod.ObjectMeta.DeletionTimestamp.IsZero() {
		// the pod is being removed: remove the component label from the node.
		time.Sleep(1 * time.Second)
		if err := updateLabel(labelToUpdate, false, nodeName, r); err != nil {
			// this is an edge case when cluster-autoscaler removes a node
			// (all the running pods will be removed after that)
			if errors.IsNotFound(err) {
				return reconcile.Result{}, nil
			}
			log.WithError(err).Error("removing node label")
			// RequeueAfter would be ignored alongside a non-nil error; the error
			// alone triggers a rate-limited requeue.
			return reconcile.Result{}, err
		}
		return reconcile.Result{}, nil
	}

	if !IsPodReady(pod) {
		// not ready. Wait until the next update.
		return reconcile.Result{}, nil
	}

	var node corev1.Node
	err = r.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
	if err != nil {
		return reconcile.Result{}, fmt.Errorf("obtaining node %s: %w", nodeName, err)
	}

	if node.Labels[labelToUpdate] == "true" {
		// nothing to do, the label already exists.
		return reconcile.Result{}, nil
	}

	err = checkTCPPortIsReachable(ipAddress, port)
	if err != nil {
		log.WithField("host", ipAddress).WithField("port", port).WithField("pod", pod.Name).WithError(err).Error("checking if TCP port is open")
		return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
	}

	if component == registryFacade {
		err = checkRegistryFacade(ipAddress, port)
		if err != nil {
			log.WithError(err).Error("checking registry-facade")
			return reconcile.Result{RequeueAfter: defaultRequeueTime}, nil
		}
		time.Sleep(1 * time.Second)
	}

	err = updateLabel(labelToUpdate, true, nodeName, r)
	if err != nil {
		log.WithError(err).Error("updating node label")
		return reconcile.Result{}, fmt.Errorf("trying to add the label: %w", err)
	}

	NodeLabelerCounterVec.WithLabelValues(component).Inc()
	// StartTime is a pointer and may be unset; guard against a nil dereference.
	if pod.Status.StartTime != nil {
		readyIn := time.Since(pod.Status.StartTime.Time)
		NodeLabelerTimeHistVec.WithLabelValues(component).Observe(readyIn.Seconds())
	}

	return reconcile.Result{}, nil
}
// updateLabel sets label to "true" on the named node (add == true) or removes
// it (add == false).
//
// The read-modify-write is retried with retry.DefaultBackoff when the update
// hits a conflict. Each attempt uses its own 5-second timeout. No update is
// sent when the node already has the desired state. Errors from Get or Update
// are returned unchanged, so callers can test them with errors.IsNotFound.
func updateLabel(label string, add bool, nodeName string, client client.Client) error {
	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		var node corev1.Node
		err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
		if err != nil {
			return err
		}

		value, exists := node.Labels[label]
		if add {
			if exists && value == "true" {
				// already labelled; avoid a needless write.
				return nil
			}
			if node.Labels == nil {
				// assigning into a nil map panics.
				node.Labels = make(map[string]string)
			}
			node.Labels[label] = "true"
			log.WithField("label", label).WithField("node", nodeName).Info("adding label to node")
		} else {
			if !exists {
				// nothing to remove; avoid a needless write.
				return nil
			}
			delete(node.Labels, label)
			log.WithField("label", label).WithField("node", nodeName).Info("removing label from node")
		}

		return client.Update(ctx, &node)
	})
}
// checkTCPPortIsReachable reports whether a TCP connection to host:port can be
// opened within one second. The connection is closed immediately on success;
// the dial error is returned otherwise.
func checkTCPPortIsReachable(host string, port string) error {
	address := net.JoinHostPort(host, port)
	conn, dialErr := net.DialTimeout("tcp", address, time.Second)
	if dialErr != nil {
		return dialErr
	}
	conn.Close()
	return nil
}
// checkRegistryFacade probes the registry-facade at host:port over HTTPS by
// requesting the manifest of an image that does not exist.
//
// A 404 Not Found answer means the registry is serving requests and nil is
// returned. Any other status code, or a transport/timeout error, is returned
// as an error. The whole probe is bounded to one second.
func checkRegistryFacade(host, port string) error {
	transport := newDefaultTransport()
	// Certificate verification is skipped for this readiness probe, which dials
	// an IP address directly (presumably the serving certificate does not cover
	// that IP — confirm before enabling verification).
	transport.TLSClientConfig = &tls.Config{
		InsecureSkipVerify: true,
	}
	httpClient := &http.Client{
		Transport: transport,
	}

	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// net.JoinHostPort brackets IPv6 literals; a plain "%v:%v" would produce an
	// invalid URL for an IPv6 host IP.
	dummyURL := fmt.Sprintf("https://%s/v2/remote/not-a-valid-image/manifests/latest", net.JoinHostPort(host, port))
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, dummyURL, nil)
	if err != nil {
		return fmt.Errorf("building HTTP request: %w", err)
	}
	req.Header.Set("Accept", "application/vnd.oci.image.manifest.v1+json, application/vnd.oci.image.index.v1+json")

	resp, err := httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("unexpected error during HTTP request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound {
		return nil
	}
	return fmt.Errorf("registry-facade is not ready yet (HTTP status %d)", resp.StatusCode)
}
// newDefaultTransport returns an HTTP transport for one-shot probes. Dials
// time out after one second, and keep-alives are disabled so every request
// uses a fresh connection.
//
// The deprecated net.Dialer.DualStack field is no longer set. Setting it to
// false was a no-op: fast fallback is controlled by FallbackDelay.
func newDefaultTransport() *http.Transport {
	dialer := &net.Dialer{
		Timeout: 1 * time.Second,
	}
	return &http.Transport{
		DialContext:           dialer.DialContext,
		MaxIdleConns:          0, // 0 means no limit
		MaxIdleConnsPerHost:   1,
		IdleConnTimeout:       5 * time.Second,
		ExpectContinueTimeout: 5 * time.Second,
		DisableKeepAlives:     true,
	}
}