Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions crossview-go-server/services/kubernetes_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,36 @@ func (k *KubernetesService) getKubeConfigPath() string {
return filepath.Join(homeDir, ".kube", "config")
}

func (k *KubernetesService) normalizeKubeConfigPaths() error {
kubeConfigPath := k.getKubeConfigPath()
kubeConfigDir := filepath.Dir(kubeConfigPath)

for clusterName, cluster := range k.kubeConfig.Clusters {
if cluster != nil && cluster.CertificateAuthority != "" {
certPath := cluster.CertificateAuthority
if !filepath.IsAbs(certPath) {
absPath := filepath.Join(kubeConfigDir, certPath)
cluster.CertificateAuthority = absPath
k.kubeConfig.Clusters[clusterName] = cluster
}
}
}

for authName, authInfo := range k.kubeConfig.AuthInfos {
if authInfo != nil {
if authInfo.ClientCertificate != "" && !filepath.IsAbs(authInfo.ClientCertificate) {
authInfo.ClientCertificate = filepath.Join(kubeConfigDir, authInfo.ClientCertificate)
}
if authInfo.ClientKey != "" && !filepath.IsAbs(authInfo.ClientKey) {
authInfo.ClientKey = filepath.Join(kubeConfigDir, authInfo.ClientKey)
}
k.kubeConfig.AuthInfos[authName] = authInfo
}
}

return nil
}

func (k *KubernetesService) loadKubeConfig() error {
kubeConfigPath := k.getKubeConfigPath()
if kubeConfigPath == "" {
Expand All @@ -46,12 +76,15 @@ func (k *KubernetesService) loadKubeConfig() error {
}

k.kubeConfig = config
if err := k.normalizeKubeConfigPaths(); err != nil {
return err
}
return nil
}

func (k *KubernetesService) isInCluster() bool {
serviceAccountPath := "/var/run/secrets/kubernetes.io/serviceaccount"
return fileExists(serviceAccountPath) &&
return fileExists(serviceAccountPath) &&
fileExists(filepath.Join(serviceAccountPath, "token")) &&
fileExists(filepath.Join(serviceAccountPath, "ca.crt"))
}
Expand Down Expand Up @@ -120,7 +153,7 @@ func (k *KubernetesService) SetContext(ctxName string) error {
k.clientset = clientset
k.dynamicClient = nil
delete(k.failedContexts, targetContext)

// Clear managed resources cache when context changes
k.managedResourcesCache = make(map[string]map[string]interface{})
k.managedResourcesCacheTime = make(map[string]time.Time)
Expand All @@ -135,7 +168,7 @@ func (k *KubernetesService) SetContext(ctxName string) error {

func (k *KubernetesService) IsConnected(ctxName string) (bool, error) {
originalContext := k.GetCurrentContext()

if err := k.SetContext(ctxName); err != nil {
return false, err
}
Expand All @@ -155,7 +188,7 @@ func (k *KubernetesService) IsConnected(ctxName string) (bool, error) {
if originalContext != "" && originalContext != ctxName {
k.SetContext(originalContext)
}

if err != nil {
return false, err
}
Expand Down Expand Up @@ -281,4 +314,3 @@ func (k *KubernetesService) RemoveContext(ctxName string) error {

return nil
}

166 changes: 89 additions & 77 deletions crossview-go-server/services/kubernetes_managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"crossview-go-server/lib"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -67,17 +69,13 @@ func buildManagedResourceTargetsFromMRDs(mrdList []map[string]interface{}) []man
func appendOptionalManagedResourceTargets(resourceTargets []managedResourceTarget) []managedResourceTarget {
return append(resourceTargets,
managedResourceTarget{apiVersion: "pkg.crossplane.io/v1", kind: "ManagedResourceDefinition", plural: "managedresourcedefinitions"},
managedResourceTarget{apiVersion: "pkg.crossplane.io/v1beta1", kind: "ManagedResourceDefinition", plural: "managedresourcedefinitions"},
managedResourceTarget{apiVersion: "pkg.crossplane.io/v1alpha1", kind: "ManagedResourceDefinition", plural: "managedresourcedefinitions"},
managedResourceTarget{apiVersion: "pkg.crossplane.io/v1", kind: "ManagedResourceActivationPolicy", plural: "managedresourceactivationpolicies"},
managedResourceTarget{apiVersion: "pkg.crossplane.io/v1beta1", kind: "ManagedResourceActivationPolicy", plural: "managedresourceactivationpolicies"},
managedResourceTarget{apiVersion: "pkg.crossplane.io/v1alpha1", kind: "ManagedResourceActivationPolicy", plural: "managedresourceactivationpolicies"},
)
}

func dedupeManagedResources(items []interface{}) []interface{} {
allResources := make([]interface{}, 0, len(items))
seenResourceKeys := make(map[string]struct{})
seenUIDs := make(map[string]struct{}, len(items))

for _, item := range items {
itemMap, ok := item.(map[string]interface{})
Expand All @@ -86,34 +84,37 @@ func dedupeManagedResources(items []interface{}) []interface{} {
}

metadata, _ := itemMap["metadata"].(map[string]interface{})
uid, _ := metadata["uid"].(string)
name, _ := metadata["name"].(string)
namespace, _ := metadata["namespace"].(string)
apiVersion, _ := itemMap["apiVersion"].(string)
kind, _ := itemMap["kind"].(string)

resourceKey := uid
if resourceKey == "" {
resourceKey = fmt.Sprintf("%s|%s|%s|%s", apiVersion, kind, namespace, name)
if metadata == nil {
continue
}

if _, exists := seenResourceKeys[resourceKey]; exists {
continue
uid, _ := metadata["uid"].(string)
if uid != "" {
if _, seen := seenUIDs[uid]; seen {
continue
}
seenUIDs[uid] = struct{}{}
}

seenResourceKeys[resourceKey] = struct{}{}
allResources = append(allResources, itemMap)
}

return allResources
}

func (k *KubernetesService) fetchManagedResourceTarget(contextName string, target managedResourceTarget) ([]interface{}, error) {
continueToken := ""
allItems := make([]interface{}, 0)
continueToken := ""
pageSize := int64(1000)
maxItems := int64(5000)
itemCount := int64(0)

for {
result, err := k.GetResources(target.apiVersion, target.kind, "", contextName, target.plural, nil, continueToken)
if itemCount >= maxItems {
break
}

result, err := k.GetResources(target.apiVersion, target.kind, "", contextName, target.plural, &pageSize, continueToken)
if err != nil {
return nil, err
}
Expand All @@ -122,13 +123,13 @@ func (k *KubernetesService) fetchManagedResourceTarget(contextName string, targe
if items != nil {
for _, item := range items {
if itemMap, ok := item.(map[string]interface{}); ok {
itemMapCopy := make(map[string]interface{})
for key, val := range itemMap {
itemMapCopy[key] = val
itemMap["apiVersion"] = target.apiVersion
itemMap["kind"] = target.kind
allItems = append(allItems, itemMap)
itemCount++
if itemCount >= maxItems {
break
}
itemMapCopy["apiVersion"] = target.apiVersion
itemMapCopy["kind"] = target.kind
allItems = append(allItems, itemMapCopy)
}
}
}
Expand Down Expand Up @@ -165,14 +166,9 @@ func (k *KubernetesService) GetManagedResources(contextName string, forceRefresh
if cacheTime, timeExists := k.managedResourcesCacheTime[contextName]; timeExists {
if time.Since(cacheTime) < k.managedResourcesCacheTTL {
k.logger.Infof("Returning cached managed resources for context: %s", contextName)
// Create a copy with fromCache: true
result := make(map[string]interface{})
for key, value := range cachedResult {
result[key] = value
}
result["fromCache"] = true
cachedResult["fromCache"] = true
k.mu.RUnlock()
return result, nil
return cachedResult, nil
}
}
}
Expand All @@ -191,24 +187,61 @@ func (k *KubernetesService) GetManagedResources(contextName string, forceRefresh
return nil, fmt.Errorf("failed to create dynamic client: %w", err)
}

providersResult, err := k.GetResources("pkg.crossplane.io/v1", "Provider", "", contextName, "", nil, "")
if err != nil {
return nil, fmt.Errorf("failed to get providers: %w", err)
var providers, revisions []interface{}
var provErr, revErr error
var provWg sync.WaitGroup
provWg.Add(2)

go func() {
defer provWg.Done()
providersResult, err := k.GetResources("pkg.crossplane.io/v1", "Provider", "", contextName, "", nil, "")
if err != nil {
provErr = err
return
}
providers, _ = providersResult["items"].([]interface{})
}()

go func() {
defer provWg.Done()
revisionsResult, err := k.GetResources("pkg.crossplane.io/v1", "ProviderRevision", "", contextName, "", nil, "")
if err != nil {
revErr = err
return
}
revisions, _ = revisionsResult["items"].([]interface{})
}()

provWg.Wait()
if provErr != nil {
return nil, fmt.Errorf("failed to get providers: %w", provErr)
}
if revErr != nil {
return nil, fmt.Errorf("failed to get provider revisions: %w", revErr)
}
providers, _ := providersResult["items"].([]interface{})
if providers == nil {
providers = []interface{}{}
}

revisionsResult, err := k.GetResources("pkg.crossplane.io/v1", "ProviderRevision", "", contextName, "", nil, "")
if err != nil {
return nil, fmt.Errorf("failed to get provider revisions: %w", err)
}
revisions, _ := revisionsResult["items"].([]interface{})
if revisions == nil {
revisions = []interface{}{}
}

providerNameMap := make(map[string]bool)
for _, prov := range providers {
provMap, _ := prov.(map[string]interface{})
if provMap == nil {
continue
}
provMetadata, _ := provMap["metadata"].(map[string]interface{})
if provMetadata == nil {
continue
}
provName, _ := provMetadata["name"].(string)
if provName != "" {
providerNameMap[provName] = true
}
}

revisionToProvider := make(map[string]string)
for _, rev := range revisions {
revMap, _ := rev.(map[string]interface{})
Expand All @@ -229,22 +262,9 @@ func (k *KubernetesService) GetManagedResources(contextName string, forceRefresh
ownerKind, _ := owner["kind"].(string)
ownerAPIVersion, _ := owner["apiVersion"].(string)
ownerName, _ := owner["name"].(string)
if ownerKind == "Provider" && ownerAPIVersion == "pkg.crossplane.io/v1" {
for _, prov := range providers {
provMap, _ := prov.(map[string]interface{})
if provMap == nil {
continue
}
provMetadata, _ := provMap["metadata"].(map[string]interface{})
if provMetadata == nil {
continue
}
provName, _ := provMetadata["name"].(string)
if provName == ownerName {
revisionToProvider[revName] = ownerName
break
}
}
if ownerKind == "Provider" && ownerAPIVersion == "pkg.crossplane.io/v1" && providerNameMap[ownerName] {
revisionToProvider[revName] = ownerName
break
}
}
}
Expand Down Expand Up @@ -277,22 +297,8 @@ func (k *KubernetesService) GetManagedResources(contextName string, forceRefresh
ownerName, _ := owner["name"].(string)

var providerName string
if ownerKind == "Provider" && ownerAPIVersion == "pkg.crossplane.io/v1" {
for _, prov := range providers {
provMap, _ := prov.(map[string]interface{})
if provMap == nil {
continue
}
provMetadata, _ := provMap["metadata"].(map[string]interface{})
if provMetadata == nil {
continue
}
provName, _ := provMetadata["name"].(string)
if provName == ownerName {
providerName = ownerName
break
}
}
if ownerKind == "Provider" && ownerAPIVersion == "pkg.crossplane.io/v1" && providerNameMap[ownerName] {
providerName = ownerName
} else if ownerKind == "ProviderRevision" && ownerAPIVersion == "pkg.crossplane.io/v1" {
providerName = revisionToProvider[ownerName]
}
Expand Down Expand Up @@ -333,19 +339,26 @@ func (k *KubernetesService) GetManagedResources(contextName string, forceRefresh

resourceTargets := appendOptionalManagedResourceTargets(buildManagedResourceTargetsFromMRDs(mrdList))

semaphore := make(chan struct{}, 10)
resourceChan := make(chan resourceResult, len(resourceTargets))
var wg sync.WaitGroup

for _, target := range resourceTargets {
wg.Add(1)
go func(target managedResourceTarget) {
defer wg.Done()
semaphore <- struct{}{}
defer func() { <-semaphore }()
items, err := k.fetchManagedResourceTarget(contextName, target)
if err != nil {
resourceChan <- resourceResult{items: nil, err: err}
if !lib.IsMissingKubernetesResourceError(err) {
resourceChan <- resourceResult{items: nil, err: err}
}
return
}
resourceChan <- resourceResult{items: items, err: nil}
if items != nil && len(items) > 0 {
resourceChan <- resourceResult{items: items, err: nil}
}
}(target)
}

Expand All @@ -362,7 +375,6 @@ func (k *KubernetesService) GetManagedResources(contextName string, forceRefresh
}
allResources = dedupeManagedResources(allResources)

// Cache the results
result := map[string]interface{}{
"items": allResources,
"fromCache": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ export class GetManagedResourceActivationPoliciesUseCase {

async execute(context = null) {
try {
const apiVersion = 'apiextensions.crossplane.io/v1alpha1';
const kind = 'ManagedResourceActivationPolicy';
const mrapsResult = await this.kubernetesRepository.getResources(apiVersion, kind, null, context);
const mraps = mrapsResult.items || mrapsResult; // Support both new format and legacy array format
const managedResourcesResult = await this.kubernetesRepository.getManagedResources(context);
const allResources = managedResourcesResult.items || [];

const mraps = allResources.filter(item => item.kind === 'ManagedResourceActivationPolicy');
const mrapsArray = Array.isArray(mraps) ? mraps : [];

return mrapsArray.map(mrap => ({
Expand Down
8 changes: 4 additions & 4 deletions src/domain/usecases/GetManagedResourceDefinitionsUseCase.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ export class GetManagedResourceDefinitionsUseCase {

async execute(context = null) {
try {
const apiVersion = 'apiextensions.crossplane.io/v1alpha1';
const kind = 'ManagedResourceDefinition';
const mrdsResult = await this.kubernetesRepository.getResources(apiVersion, kind, null, context);
const mrds = mrdsResult.items || mrdsResult; // Support both new format and legacy array format
const managedResourcesResult = await this.kubernetesRepository.getManagedResources(context);
const allResources = managedResourcesResult.items || [];

const mrds = allResources.filter(item => item.kind === 'ManagedResourceDefinition');
const mrdsArray = Array.isArray(mrds) ? mrds : [];

return mrdsArray.map(mrd => {
Expand Down
Loading
Loading