From 07ba12708d202189fd4d75b20e461267148331cc Mon Sep 17 00:00:00 2001 From: Anton Bykov Date: Wed, 30 Oct 2024 14:31:47 +0200 Subject: [PATCH 1/3] feat: remote traces viewer --- build/taskmon/Dockerfile | 8 + build/taskmon/entrypoint.sh | 6 + build/taskmon/manual_buildx_push.sh | 15 + build/taskmon/readme.md | 1 + .../weka.weka.io_wekamanualoperations.yaml | 21 + examples/incluster-taskmon.yaml | 16 + .../controllers/operations/trace_session.go | 483 ++++++++++++++++++ internal/controllers/resources/cluster.go | 28 + .../wekamanualoperation_controller.go | 13 + internal/pkg/lifecycle/lifecycle.go | 15 +- internal/services/weka.go | 23 + internal/services/weka_cluster.go | 7 - pkg/util/refs.go | 4 + pkg/weka-k8s-api | 2 +- 14 files changed, 631 insertions(+), 11 deletions(-) create mode 100644 build/taskmon/Dockerfile create mode 100755 build/taskmon/entrypoint.sh create mode 100755 build/taskmon/manual_buildx_push.sh create mode 100644 build/taskmon/readme.md create mode 100644 examples/incluster-taskmon.yaml create mode 100644 internal/controllers/operations/trace_session.go diff --git a/build/taskmon/Dockerfile b/build/taskmon/Dockerfile new file mode 100644 index 000000000..b247b0945 --- /dev/null +++ b/build/taskmon/Dockerfile @@ -0,0 +1,8 @@ +ARG BASE_IMAGE +FROM 389791687681.dkr.ecr.eu-west-1.amazonaws.com/taskmon:$BASE_IMAGE AS taskmon + +FROM alpine:3.20.3 +COPY --link --from=taskmon / /taskmon +COPY --link --from=taskmon /configs /configs +COPY --link entrypoint.sh /entrypoint.sh +ENTRYPOINT "/entrypoint.sh" \ No newline at end of file diff --git a/build/taskmon/entrypoint.sh b/build/taskmon/entrypoint.sh new file mode 100755 index 000000000..900d88d88 --- /dev/null +++ b/build/taskmon/entrypoint.sh @@ -0,0 +1,6 @@ +#!/bin/sh + +set -e + +env | grep -i TASKMON +exec /taskmon/taskmon start \ No newline at end of file diff --git a/build/taskmon/manual_buildx_push.sh b/build/taskmon/manual_buildx_push.sh new file mode 100755 index 000000000..600190fe4 --- /dev/null +++ b/build/taskmon/manual_buildx_push.sh @@ -0,0 +1,15 @@ +#!/bin/zsh + + +set -e + +BASE_IMAGE=6d3eb658e900b4d9_x86_64 +VERSION=0.0.9 + +TARGET_IMAGE=quay.io/weka.io/taskmon:$VERSION-$BASE_IMAGE + + +docker buildx build --build-arg BASE_IMAGE=$BASE_IMAGE --push --platform linux/amd64 -t $TARGET_IMAGE . + + +echo $TARGET_IMAGE is built diff --git a/build/taskmon/readme.md b/build/taskmon/readme.md new file mode 100644 index 000000000..84272f3b0 --- /dev/null +++ b/build/taskmon/readme.md @@ -0,0 +1 @@ +# intended for manual bump \ No newline at end of file diff --git a/charts/weka-operator/crds/weka.weka.io_wekamanualoperations.yaml b/charts/weka-operator/crds/weka.weka.io_wekamanualoperations.yaml index b74174624..fcf1437e0 100644 --- a/charts/weka-operator/crds/weka.weka.io_wekamanualoperations.yaml +++ b/charts/weka-operator/crds/weka.weka.io_wekamanualoperations.yaml @@ -83,6 +83,27 @@ spec: type: string type: object type: object + remoteTracesSessionPayload: + properties: + cluster: + properties: + name: + type: string + namespace: + type: string + required: + - name + - namespace + type: object + duration: + type: string + nodeSelector: + additionalProperties: + type: string + type: object + wekahomeEndpointOverride: + type: string + type: object signDrivesPayload: properties: devicePaths: diff --git a/examples/incluster-taskmon.yaml b/examples/incluster-taskmon.yaml new file mode 100644 index 000000000..79800cc6b --- /dev/null +++ b/examples/incluster-taskmon.yaml @@ -0,0 +1,16 @@ +apiVersion: weka.weka.io/v1alpha1 +kind: WekaManualOperation +metadata: + name: remote-traces-session + namespace: weka-operator-system +spec: + action: "remote-traces-session" + image: quay.io/weka.io/taskmon:0.0.9-6d3eb658e900b4d9_x86_64 + imagePullSecret: "quay-io-robot-secret" + payload: + remoteTracesSessionPayload: + cluster: + name: cluster-dev + namespace: weka-operator-system + duration: 5m + wekahomeEndpointOverride: 10.0.30.81:30052 # direct port of nodeport service, TBD to wrap via envoy and then ALB ingress diff --git a/internal/controllers/operations/trace_session.go b/internal/controllers/operations/trace_session.go new file mode 100644 index 000000000..3ceb1a1de --- /dev/null +++ b/internal/controllers/operations/trace_session.go @@ -0,0 +1,483 @@ +package operations + +import ( + "context" + "encoding/json" + "github.com/pkg/errors" + weka "github.com/weka/weka-k8s-api/api/v1alpha1" + "github.com/weka/weka-operator/internal/controllers/resources" + "github.com/weka/weka-operator/internal/pkg/lifecycle" + "github.com/weka/weka-operator/internal/services" + "github.com/weka/weka-operator/internal/services/discovery" + "github.com/weka/weka-operator/internal/services/exec" + "github.com/weka/weka-operator/pkg/util" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "time" +) + +type MaintainTraceSession struct { + cluster *weka.WekaCluster + wekaHomeEndpoint string + payload *weka.RemoteTracesSessionConfig + mgr ctrl.Manager + ownerRef client.Object + containerDetails weka.WekaContainerDetails + deployment *apps.Deployment + containers []*weka.WekaContainer +} + +func NewMaintainTraceSession(mgr ctrl.Manager, payload *weka.RemoteTracesSessionConfig, ownerRef client.Object, containerDetails weka.WekaContainerDetails) *MaintainTraceSession { + return &MaintainTraceSession{ + payload: payload, + mgr: mgr, + ownerRef: ownerRef, + containerDetails: containerDetails, + } +} + +func (o *MaintainTraceSession) AsStep() lifecycle.Step { + return lifecycle.Step{ + Name: "MaintainTraceSession", + Run: AsRunFunc(o), + } +} + +func (o *MaintainTraceSession) GetSteps() []lifecycle.Step { + return []lifecycle.Step{ + {Name: "FetchCluster", Run: o.FetchCluster}, + {Name: "DeduceWekaHomeUrl", Run: o.DeduceWekaHomeUrl}, + {Name: "EnsureSecret", Run: o.EnsureSecret}, + {Name: "EnsureWekaNodeRoutingConfigMap", Run: o.EnsureWekaNodeRoutingConfigMap}, + {Name: "EnsureK8sContainerRoutingConfigMap", Run: o.EnsureK8sContainerRoutingConfigMap}, + {Name: "EnsureDeployment", Run: o.EnsureDeployment}, + {Name: "WaitTillExpiration", Run: o.WaitTillExpiration}, + } +} + +func (o *MaintainTraceSession) GetJsonResult() string { + //TODO implement me + panic("implement me") +} + +func (o *MaintainTraceSession) FetchCluster(ctx context.Context) error { + c := o.mgr.GetClient() + cluster := &weka.WekaCluster{} + err := c.Get(ctx, client.ObjectKey{Name: o.payload.Cluster.Name, Namespace: o.payload.Cluster.Namespace}, cluster) + if err != nil { + return err + } + o.cluster = cluster + return nil +} + +func (o *MaintainTraceSession) DeduceWekaHomeUrl(ctx context.Context) error { + whEndpoint := "https://api.home.weka.io" + defer func() { + o.wekaHomeEndpoint = whEndpoint + }() + if o.payload.WekahomeEndpointOverride != "" { + whEndpoint = o.payload.WekahomeEndpointOverride + return nil + } + // TODO: Attempt to fetch from cluster itself(!) + if o.cluster.Spec.WekaHome != nil { + if o.cluster.Spec.WekaHome.Endpoint != "" { + whEndpoint = o.cluster.Spec.WekaHome.Endpoint + return nil + } + } + return nil +} + +func (o *MaintainTraceSession) EnsureDeployment(ctx context.Context) error { + // create deployment object and apply it + // generate token and store it in environment variable of pod + labels := o.ownerRef.GetLabels() + if len(labels) == 0 { + labels = map[string]string{} + } + labels["app"] = "weka-trace-session" + labels["weka.io/cluster-id"] = string(o.cluster.GetUID()) + + deployment := apps.Deployment{ + ObjectMeta: ctrl.ObjectMeta{ + Name: "trace-session-" + o.ownerRef.GetName(), + Namespace: o.ownerRef.GetNamespace(), + Labels: labels, + }, + Spec: apps.DeploymentSpec{ + Replicas: util.Int32Ref(1), + Selector: &meta.LabelSelector{ + MatchLabels: map[string]string{ + "app": "weka-trace-session", + "weka.io/cluster-id": string(o.cluster.GetUID()), + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: ctrl.ObjectMeta{ + Labels: labels, + }, + Spec: v1.PodSpec{ + ImagePullSecrets: []v1.LocalObjectReference{ + {Name: o.containerDetails.ImagePullSecret}, + }, + Tolerations: o.containerDetails.Tolerations, + NodeSelector: o.payload.NodeSelector, + Containers: []v1.Container{ + { + Name: "weka-trace-session", + Image: o.containerDetails.Image, + Command: []string{"/entrypoint.sh"}, + Env: []v1.EnvVar{ + { + Name: "TASKMON_WEKA_HOME_TRACE_STREAMER_ADDRESS", + Value: o.wekaHomeEndpoint, + }, + { + Name: "TASKMON_TRACE_SERVER_VERIFY_TLS", + Value: "false", + }, + { + Name: "TASKMON_TRACE_SERVER_TLS", + Value: "true", + }, + { + Name: "TASKMON_SESSION_TOKEN_LOADER_TYPE", + Value: "file", + }, + { + Name: "TASKMON_SESSION_TOKEN_LOADER_PATH", + Value: "/var/run/secrets/weka-operator/trace-session/token", + }, + { + Name: "TASKMON_WEKA_HOME_TRACE_STREAMER_TLS", + Value: "false", + }, + { + Name: "TASKMON_NODE_AND_CONTAINER_SERVER_DISCOVERY_TYPE", + Value: "file", + }, + { + Name: "TASKMON_NODE_AND_CONTAINER_SERVER_DISCOVERY_PATH", + Value: "/weka-runtime/weka-trace-routing/trace-routing.json", + }, + { + Name: "TASKMON_K8S_SERVER_DISCOVERY_TYPE", + Value: "file", + }, + { + Name: "TASKMON_K8S_SERVER_DISCOVERY_PATH", + Value: "/weka-runtime/k8s-trace-routing/k8s-routing.json", + }, + }, + VolumeMounts: []v1.VolumeMount{ + { + Name: "token", + MountPath: "/var/run/secrets/weka-operator/trace-session", + }, + { + Name: "weka-trace-routing", + MountPath: "/weka-runtime/weka-trace-routing", + }, + { + Name: "k8s-trace-routing", + MountPath: "/weka-runtime/k8s-trace-routing", + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "token", + VolumeSource: v1.VolumeSource{ + Secret: &v1.SecretVolumeSource{ + SecretName: o.getSecretName(), + }, + }, + }, + { + Name: "weka-trace-routing", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: o.getWekaRoutingKeyName(), + }, + }, + }, + }, + { + Name: "k8s-trace-routing", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: o.getK8sRoutingKeyName(), + }, + }, + }, + }, + }, + }, + }, + }, + } + err := ctrl.SetControllerReference(o.ownerRef, &deployment, o.mgr.GetScheme()) + if err != nil { + return err + } + + err = o.mgr.GetClient().Create(ctx, &deployment) + if err != nil { + if alreadyExists := client.IgnoreAlreadyExists(err); alreadyExists == nil { + //fetch current deployment + err = o.mgr.GetClient().Get(ctx, client.ObjectKey{Name: deployment.Name, Namespace: deployment.Namespace}, &deployment) + if err != nil { + return err + } + o.deployment = &deployment + return nil + } + return err + } + o.deployment = &deployment + return nil +} + +func (o *MaintainTraceSession) EnsureSecret(ctx context.Context) error { + // create secret object and apply it + // generate token and store it in secret + secret := v1.Secret{ + ObjectMeta: ctrl.ObjectMeta{ + Name: o.getSecretName(), + Namespace: o.ownerRef.GetNamespace(), + Labels: o.ownerRef.GetLabels(), + }, + StringData: map[string]string{ + "token": util.GeneratePassword(128), + }, + } + err := ctrl.SetControllerReference(o.ownerRef, &secret, o.mgr.GetScheme()) + if err != nil { + return err + } + + err = o.mgr.GetClient().Create(ctx, &secret) + if err != nil { + if alreadyExists := client.IgnoreAlreadyExists(err); alreadyExists == nil { + //fetch current secret + err = o.mgr.GetClient().Get(ctx, client.ObjectKey{Name: secret.Name, Namespace: secret.Namespace}, &secret) + if err != nil { + return err + } + return nil + } + return err + } + return nil + +} + +func (o *MaintainTraceSession) getSecretName() string { + return "weka-trace-session-" + o.ownerRef.GetName() +} + +type ContainerInfo struct { + Name string `json:"name"` + TraceServerEndpoint string `json:"trace_server_endpoint"` +} + +type TraceServerDiscovery struct { + Nodes map[int]NodeInfo `json:"nodes"` + Containers map[int]ContainerInfo `json:"containers"` +} + +type PodDiscovery struct { + Pods map[string]ContainerInfo `json:"pods"` +} + +type NodeInfo struct { + Slot int `json:"slot"` + ContainerID int `json:"container_id"` +} + +// func GetTraceInfoForContainer(ctx context.Context, container weka.WekaContainer) (*ContainerInfo, error) { +// services.NewWekaService() +// // fetch container info from config map +// } +func (o *MaintainTraceSession) EnsureWekaNodeRoutingConfigMap(ctx context.Context) error { + // this config map serves as simplified discovery mechanism, this one contains map of weka node id to weka container id + // in addition to that it contains map of container id to container name + base port + trace server port + // this way trace viewer proxy can discover trace server for given container + execService := exec.NewExecService(o.mgr.GetConfig()) + // fetch all processes + err := o.ensureClusterContainers(ctx) + if err != nil { + return err + } + container, err := resources.SelectActiveContainerWithRole(ctx, o.containers, "compute") + wekaService := services.NewWekaService(execService, container) + processes, err := wekaService.GetProcesses(ctx) + if err != nil { + return err + } + + data := &TraceServerDiscovery{ + Nodes: make(map[int]NodeInfo), + Containers: make(map[int]ContainerInfo), + } + + for _, process := range processes { + nodeId, err := resources.NodeIdToInt(process.NodeId) + if err != nil { + return err + } + containerId, err := resources.HostIdToContainerId(process.NodeInfo.HostId) + if err != nil { + return err + } + data.Nodes[nodeId] = NodeInfo{ + Slot: process.NodeInfo.Slot, + ContainerID: containerId, + } + if _, ok := data.Containers[containerId]; !ok { + data.Containers[containerId] = ContainerInfo{ + Name: process.NodeInfo.ContainerName, + TraceServerEndpoint: process.NodeInfo.ManagementIps[0] + ":" + strconv.Itoa(process.NodeInfo.ManagementPort+50), + } + } + } + + jsonBytes, err := json.Marshal(data) + if err != nil { + return err + } + + configMap := v1.ConfigMap{ + ObjectMeta: ctrl.ObjectMeta{ + Name: o.getWekaRoutingKeyName(), + Namespace: o.ownerRef.GetNamespace(), + }, + BinaryData: map[string][]byte{ + "trace-routing.json": jsonBytes, + }, + } + + err = ctrl.SetControllerReference(o.ownerRef, &configMap, o.mgr.GetScheme()) + if err != nil { + return err + } + + err = o.mgr.GetClient().Create(ctx, &configMap) + if err != nil { + if alreadyExists := client.IgnoreAlreadyExists(err); alreadyExists == nil { + //fetch current config map + err = o.mgr.GetClient().Get(ctx, client.ObjectKey{Name: configMap.Name, Namespace: configMap.Namespace}, &configMap) + if err != nil { + return err + } + // update current config map + configMap.BinaryData["trace-routing.json"] = jsonBytes + err = o.mgr.GetClient().Update(ctx, &configMap) + // TODO: check if we need to update config map to begin with, while json wont maintain order of elements, original struct should + if err != nil { + return err + } + return nil + } + return err + } + return nil +} + +func (o *MaintainTraceSession) getWekaRoutingKeyName() string { + return "weka-trace-routing-" + o.ownerRef.GetName() +} + +func (o *MaintainTraceSession) getK8sRoutingKeyName() string { + return "weka-k8s-trace-routing-" + o.ownerRef.GetName() +} + +func (o *MaintainTraceSession) EnsureK8sContainerRoutingConfigMap(ctx context.Context) error { + // another discovery mechanism, this one will map k8s pod name to endpoint of trace server that serves that pod + err := o.ensureClusterContainers(ctx) + if err != nil { + return err + } + data := &PodDiscovery{ + Pods: make(map[string]ContainerInfo), + } + for _, container := range o.containers { + data.Pods[container.Name] = ContainerInfo{ + Name: container.Spec.WekaContainerName, + TraceServerEndpoint: container.Status.ManagementIP + ":" + strconv.Itoa(container.GetPort()+50), + } + } + + jsonBytes, err := json.Marshal(data) + if err != nil { + return err + } + + configMap := v1.ConfigMap{ + ObjectMeta: ctrl.ObjectMeta{ + Name: o.getK8sRoutingKeyName(), + Namespace: o.ownerRef.GetNamespace(), + Labels: o.ownerRef.GetLabels(), + }, + BinaryData: map[string][]byte{ + "k8s-routing.json": jsonBytes, + }, + } + + err = ctrl.SetControllerReference(o.ownerRef, &configMap, o.mgr.GetScheme()) + if err != nil { + return err + } + + err = o.mgr.GetClient().Create(ctx, &configMap) + if err != nil { + if alreadyExists := client.IgnoreAlreadyExists(err); alreadyExists == nil { + //fetch current config map + err = o.mgr.GetClient().Get(ctx, client.ObjectKey{Name: configMap.Name, Namespace: configMap.Namespace}, &configMap) + if err != nil { + return err + } + // update current config map + configMap.BinaryData["k8s-routing.json"] = jsonBytes + err = o.mgr.GetClient().Update(ctx, &configMap) + if err != nil { + return err + } + } + } + return nil +} + +func (o *MaintainTraceSession) ensureClusterContainers(ctx context.Context) error { + if len(o.containers) == 0 { + o.containers = discovery.GetClusterContainers(ctx, o.mgr.GetClient(), o.cluster, "") + } + if len(o.containers) == 0 { + return errors.New("no cluster containers found") + } + return nil +} + +func (o *MaintainTraceSession) WaitTillExpiration(ctx context.Context) error { + // wait till expiration of trace session + startTime := o.ownerRef.GetCreationTimestamp() + expirationTime := startTime.Add(o.payload.Duration.Duration) + if meta.Now().After(expirationTime) { + return nil + } + + sleepFor := time.Minute + if expirationTime.Sub(time.Now()) < sleepFor { + sleepFor = expirationTime.Sub(time.Now()) + } + return lifecycle.NewWaitErrorWithDuration(errors.New("waiting for session expiration"), sleepFor) +} diff --git a/internal/controllers/resources/cluster.go b/internal/controllers/resources/cluster.go index 597527180..f90623236 100644 --- a/internal/controllers/resources/cluster.go +++ b/internal/controllers/resources/cluster.go @@ -1,6 +1,9 @@ package resources import ( + "context" + "fmt" + wekav1alpha1 "github.com/weka/weka-k8s-api/api/v1alpha1" "strconv" "strings" ) @@ -32,3 +35,28 @@ func HostIdToContainerId(hostId string) (int, error) { } return id, nil } + +func NodeIdToInt(nodeId string) (int, error) { + nodeId = strings.Replace(nodeId, "NodeId<", "", 1) + nodeId = strings.Replace(nodeId, ">", "", 1) + id, err := strconv.Atoi(nodeId) + if err != nil { + return 0, err + } + return id, nil +} + +func SelectActiveContainerWithRole(ctx context.Context, containers []*wekav1alpha1.WekaContainer, role string) (*wekav1alpha1.WekaContainer, error) { + for _, container := range containers { + if container.Spec.Mode != role { + continue + } + if container.Status.ClusterContainerID == nil { + continue + } + return container, nil + } + + err := fmt.Errorf("no container with role %s found", role) + return nil, err +} diff --git a/internal/controllers/wekamanualoperation_controller.go b/internal/controllers/wekamanualoperation_controller.go index 36d64a6b8..14498058e 100644 --- a/internal/controllers/wekamanualoperation_controller.go +++ b/internal/controllers/wekamanualoperation_controller.go @@ -107,6 +107,19 @@ func (r *WekaManualOperationReconciler) Reconcile(ctx context.Context, req ctrl. true, ) loop.Op = discoverDrivesOp + case "remote-traces-session": + remoteTracesOp := operations.NewMaintainTraceSession( + r.Mgr, + wekaManualOperation.Spec.Payload.RemoteTracesSessionConfig, + wekaManualOperation, + weka.WekaContainerDetails{ + Image: wekaManualOperation.Spec.Image, + ImagePullSecret: wekaManualOperation.Spec.ImagePullSecret, + Tolerations: wekaManualOperation.Spec.Tolerations, + Labels: wekaManualOperation.ObjectMeta.GetLabels(), + }, + ) + loop.Op = remoteTracesOp default: return ctrl.Result{}, fmt.Errorf("unknown operation type: %s", wekaManualOperation.Spec.Action) } diff --git a/internal/pkg/lifecycle/lifecycle.go b/internal/pkg/lifecycle/lifecycle.go index 9a3f67c06..ca11b7d37 100644 --- a/internal/pkg/lifecycle/lifecycle.go +++ b/internal/pkg/lifecycle/lifecycle.go @@ -91,7 +91,8 @@ type AbortedByPredicate struct { } type WaitError struct { - Err error + Duration time.Duration + Err error } func (w WaitError) Error() string { @@ -102,6 +103,10 @@ func NewWaitError(err error) error { return &WaitError{Err: err} } +func NewWaitErrorWithDuration(err error, duration time.Duration) error { + return &WaitError{Err: err, Duration: duration} +} + type ReconciliationError struct { Err error Subject metav1.Object @@ -301,9 +306,13 @@ func (r *ReconciliationSteps) RunAsReconcilerResponse(ctx context.Context) (ctrl if lastUnpacked == nil { break } - if _, ok := lastUnpacked.Err.(*WaitError); ok { + if waitError, ok := lastUnpacked.Err.(*WaitError); ok { logger.Info("waiting for conditions to be met", "error", err) - return ctrl.Result{RequeueAfter: 3 * time.Second}, nil + sleepDuration := 3 * time.Second + if waitError.Duration > 0 { + sleepDuration = waitError.Duration + } + return ctrl.Result{RequeueAfter: sleepDuration}, nil } if _, ok := lastUnpacked.Err.(*AbortedByPredicate); ok { logger.Info("aborted by predicate", "error", err) diff --git a/internal/services/weka.go b/internal/services/weka.go index 99166e025..2b85fe1b5 100644 --- a/internal/services/weka.go +++ b/internal/services/weka.go @@ -60,6 +60,17 @@ type DriveListOptions struct { ContainerId *int `json:"container_id"` } +type Process struct { + NodeId string `json:"node_id"` + NodeInfo struct { + HostId string `json:"host_id"` + ContainerName string `json:"container_name"` + Slot int `json:"slot"` + ManagementIps []string `json:"mgmt_ips"` + ManagementPort int `json:"mgmt_port"` + } +} + type WekaService interface { GetWekaStatus(ctx context.Context) (WekaStatusResponse, error) CreateFilesystem(ctx context.Context, name, group string, params FSParams) error @@ -72,6 +83,7 @@ type WekaService interface { EnsureNoUser(ctx context.Context, username string) error SetWekaHome(ctx context.Context, WekaHomeConfig v1alpha1.WekaHomeConfig) error ListDrives(ctx context.Context, listOptions DriveListOptions) ([]Drive, error) + GetProcesses(ctx context.Context) ([]Process, error) //GetFilesystemByName(ctx context.Context, name string) (WekaFilesystem, error) } @@ -99,6 +111,17 @@ type CliWekaService struct { Container *v1alpha1.WekaContainer } +func (c *CliWekaService) GetProcesses(ctx context.Context) ([]Process, error) { + var processes []Process + err := c.RunJsonCmd(ctx, []string{ + "weka", "cluster", "process", "--json", + }, "GetProcesses", &processes) + if err != nil { + return nil, err + } + return processes, nil +} + func (c *CliWekaService) ListDrives(ctx context.Context, listOptions DriveListOptions) ([]Drive, error) { var drives []Drive filters := []string{} diff --git a/internal/services/weka_cluster.go b/internal/services/weka_cluster.go index 1e5a15343..3ac38d536 100644 --- a/internal/services/weka_cluster.go +++ b/internal/services/weka_cluster.go @@ -165,13 +165,6 @@ func (r *wekaClusterService) FormCluster(ctx context.Context, containers []*weka return errors.Wrapf(err, "Failed to set authenticate client join: %s", stderr.String()) } - //skip validation used to bypass memory system constriants, that we dont want bypassing. but might come handy for various dev flows, so keeping as reference - //cmd = "wekaauthcli debug override add --key constraints.skip_validation" - //_, stderr, err = executor.ExecNamed(ctx, "WekaClusterSetSkipValidation", []string{"bash", "-ce", cmd}) - //if err != nil { - // return errors.Wrapf(err, "Failed to set skip validation: %s", stderr.String()) - //} - if err := r.Client.Status().Update(ctx, r.Cluster); err != nil { return errors.Wrap(err, "Failed to update wekaCluster status") } diff --git a/pkg/util/refs.go b/pkg/util/refs.go index e83da743a..46de70b15 100644 --- a/pkg/util/refs.go +++ b/pkg/util/refs.go @@ -3,3 +3,7 @@ package util func IntRef(i int) *int { return &i } + +func Int32Ref(i int32) *int32 { + return &i +} diff --git a/pkg/weka-k8s-api b/pkg/weka-k8s-api index 046e88395..6e4bd004d 160000 --- a/pkg/weka-k8s-api +++ b/pkg/weka-k8s-api @@ -1 +1 @@ -Subproject commit 046e88395cb36b0e430f76df12b17d94bd0b0337 +Subproject commit 6e4bd004d514c9ce17dc8980942012da8574eb75 From 385ddbec3508d933c7e43045401ba9219afc5ace Mon Sep 17 00:00:00 2001 From: Dan Mordechay Date: Wed, 18 Dec 2024 14:29:24 +0200 Subject: [PATCH 2/3] new taskmon version --- build/taskmon/manual_buildx_push.sh | 7 ++----- examples/incluster-taskmon.yaml | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/build/taskmon/manual_buildx_push.sh b/build/taskmon/manual_buildx_push.sh index 600190fe4..1b28ef49f 100755 --- a/build/taskmon/manual_buildx_push.sh +++ b/build/taskmon/manual_buildx_push.sh @@ -1,15 +1,12 @@ #!/bin/zsh - set -e -BASE_IMAGE=6d3eb658e900b4d9_x86_64 -VERSION=0.0.9 +BASE_IMAGE=cc21256cddb833ab_x86_64 +VERSION=0.0.15 TARGET_IMAGE=quay.io/weka.io/taskmon:$VERSION-$BASE_IMAGE - docker buildx build --build-arg BASE_IMAGE=$BASE_IMAGE --push --platform linux/amd64 -t $TARGET_IMAGE . - echo $TARGET_IMAGE is built diff --git a/examples/incluster-taskmon.yaml b/examples/incluster-taskmon.yaml index 79800cc6b..eaa13bfd1 100644 --- a/examples/incluster-taskmon.yaml +++ b/examples/incluster-taskmon.yaml @@ -5,7 +5,7 @@ metadata: namespace: weka-operator-system spec: action: "remote-traces-session" - image: quay.io/weka.io/taskmon:0.0.9-6d3eb658e900b4d9_x86_64 + image: quay.io/weka.io/taskmon:0.0.15-cc21256cddb833ab_x86_64 imagePullSecret: "quay-io-robot-secret" payload: remoteTracesSessionPayload: From ca5ae2164c4da4b86556ff546d438efa25f52202 Mon Sep 17 00:00:00 2001 From: Dan Mordechay Date: Mon, 17 Mar 2025 15:56:59 +0200 Subject: [PATCH 3/3] taskmon: 0.0.18 package --- build/taskmon/entrypoint.sh | 4 ++-- build/taskmon/manual_buildx_push.sh | 4 ++-- examples/incluster-taskmon.yaml | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/build/taskmon/entrypoint.sh b/build/taskmon/entrypoint.sh index 900d88d88..efb58d0dc 100755 --- a/build/taskmon/entrypoint.sh +++ b/build/taskmon/entrypoint.sh @@ -2,5 +2,5 @@ set -e -env | grep -i TASKMON -exec /taskmon/taskmon start \ No newline at end of file +env | grep -i TASKMON || true +exec /taskmon/taskmon start diff --git a/build/taskmon/manual_buildx_push.sh b/build/taskmon/manual_buildx_push.sh index 1b28ef49f..38657c362 100755 --- a/build/taskmon/manual_buildx_push.sh +++ b/build/taskmon/manual_buildx_push.sh @@ -2,8 +2,8 @@ set -e -BASE_IMAGE=cc21256cddb833ab_x86_64 -VERSION=0.0.15 +BASE_IMAGE=f8a10f04603cfddb_x86_64 +VERSION=0.0.18 TARGET_IMAGE=quay.io/weka.io/taskmon:$VERSION-$BASE_IMAGE diff --git a/examples/incluster-taskmon.yaml b/examples/incluster-taskmon.yaml index eaa13bfd1..6fe5f2a76 100644 --- a/examples/incluster-taskmon.yaml +++ b/examples/incluster-taskmon.yaml @@ -5,12 +5,12 @@ metadata: namespace: weka-operator-system spec: action: "remote-traces-session" - image: quay.io/weka.io/taskmon:0.0.15-cc21256cddb833ab_x86_64 + image: quay.io/weka.io/taskmon:0.0.17-b39e025c393f60f3_x86_64 imagePullSecret: "quay-io-robot-secret" payload: remoteTracesSessionPayload: cluster: name: cluster-dev - namespace: weka-operator-system + namespace: default duration: 5m - wekahomeEndpointOverride: 10.0.30.81:30052 # direct port of nodeport service, TBD to wrap via envoy and then ALB ingress + wekahomeEndpointOverride: weka-home-wekahome-grpc-api.weka-home:50052 # direct port of nodeport service, TBD to wrap via envoy and then ALB ingress