diff options
Diffstat (limited to 'src/rsync/pkg/status/status.go')
-rw-r--r-- | src/rsync/pkg/status/status.go | 117 |
1 files changed, 100 insertions, 17 deletions
diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go index 28bffefd..351da027 100644 --- a/src/rsync/pkg/status/status.go +++ b/src/rsync/pkg/status/status.go @@ -17,16 +17,20 @@ package status import ( + "encoding/base64" "encoding/json" "fmt" + "strings" "sync" pkgerrors "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/onap/multicloud-k8s/src/clm/pkg/cluster" v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1" clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned" informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions" + appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" ) @@ -42,20 +46,75 @@ const monitorLabel = "emco/deployment-id" // HandleStatusUpdate for an application in a cluster // TODO: Add code for specific handling -func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error { - logrus.Info("label::", id) +func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) { //status := v.Status.ServiceStatuses //podStatus := v.Status.PodStatuses - // Store Pod Status in app context - out, _ := json.Marshal(v.Status) - logrus.Info("Status::", string(out)) - return nil + + // Get the contextId from the label (id) + result := strings.SplitN(id, "-", 2) + if result[0] == "" { + logrus.Info(clusterId, "::label is missing an appcontext identifier::", id) + return + } + + if len(result) != 2 { + logrus.Info(clusterId, "::invalid label format::", id) + return + } + + // Get the app from the label (id) + if result[1] == "" { + logrus.Info(clusterId, "::label is missing an app identifier::", id) + return + } + + // Look up the contextId + var ac appcontext.AppContext + _, err := ac.LoadAppContext(result[0]) + if err != nil { + logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err) + return + } + + // produce yaml representation of the status + vjson, err := json.Marshal(v.Status) + if err != nil { + logrus.Info(clusterId, "::Error marshalling status information::", err) + return + } + + // Get the handle for the context/app/cluster status object + handle, err := ac.GetStatusHandle(result[1], clusterId) + if err != nil { + // Expected first time + logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err) + } + + // If status handle was not found, then create the status object in the appcontext + if handle == nil { + chandle, err := ac.GetClusterHandle(result[1], clusterId) + if err != nil { + logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err) + } else { + ac.AddStatus(chandle, string(vjson)) + } + } else { + ac.UpdateStatusValue(handle, string(vjson)) + } + + return } // StartClusterWatcher watches for CR // configBytes - Kubectl file data -func StartClusterWatcher(provider, name string, configBytes []byte) error { - key := provider + "+" + name +func StartClusterWatcher(clusterId string) error { + + configBytes, err := getKubeConfig(clusterId) + if err != nil { + return err + } + + //key := provider + "+" + name // Get the lock channelData.Lock() defer channelData.Unlock() @@ -63,10 +122,10 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error { if channelData.channels == nil { channelData.channels = make(map[string]chan struct{}) } - _, ok := channelData.channels[key] + _, ok := channelData.channels[clusterId] if !ok { // Create Channel - channelData.channels[key] = make(chan struct{}) + channelData.channels[clusterId] = make(chan struct{}) // Create config config, err := clientcmd.RESTConfigFromKubeConfig(configBytes) if err != nil { @@ -80,16 +139,16 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error { // Create Informer mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0) mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer() - go scheduleStatus(provider, name, channelData.channels[key], mInformer) + go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer) } return nil } // StopClusterWatcher stop watching a cluster -func StopClusterWatcher(provider, name string) { - key := provider + "+" + name +func StopClusterWatcher(clusterId string) { + //key := provider + "+" + name if channelData.channels != nil { - c, ok := channelData.channels[key] + c, ok := channelData.channels[clusterId] if ok { close(c) } @@ -108,7 +167,7 @@ func CloseAllClusterWatchers() { } // Per Cluster Go routine to watch CR -func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) { +func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) { handlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { v, ok := obj.(*v1alpha1.ResourceBundleState) @@ -116,7 +175,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde labels := v.GetLabels() l, ok := labels[monitorLabel] if ok { - HandleStatusUpdate(provider, name, l, v) + HandleStatusUpdate(clusterId, l, v) } } }, @@ -126,7 +185,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde labels := v.GetLabels() l, ok := labels[monitorLabel] if ok { - HandleStatusUpdate(provider, name, l, v) + HandleStatusUpdate(clusterId, l, v) } } }, @@ -137,3 +196,27 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde s.AddEventHandler(handlers) s.Run(c) } + +// getKubeConfig uses the connectivity client to get the kubeconfig based on the name +// of the clustername. This is written out to a file. +// TODO - consolidate with other rsync methods to get kubeconfig files +func getKubeConfig(clustername string) ([]byte, error) { + + if !strings.Contains(clustername, "+") { + return nil, pkgerrors.New("Not a valid cluster name") + } + strs := strings.Split(clustername, "+") + if len(strs) != 2 { + return nil, pkgerrors.New("Not a valid cluster name") + } + kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1]) + if err != nil { + return nil, pkgerrors.New("Get kubeconfig failed") + } + + dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig) + if err != nil { + return nil, err + } + return dec, nil +} |