author | Eric Multanen <eric.w.multanen@intel.com> | 2020-07-01 23:30:49 -0700
---|---|---
committer | Eric Multanen <eric.w.multanen@intel.com> | 2020-07-08 13:36:34 -0700
commit | e06b947b03c3fcce2c954feb68890a519c7740c3 (patch) |
tree | 5617b570ea85bf07dd76c6410975059acc23cc70 /src/rsync/pkg/status |
parent | a43096cbdca3fdabeda3d404bedadd7a7272a3c2 (diff) |
Adds composite app status update and query
This patch provides the basic framework for monitoring
composite application resources in clusters.
1. Updates the monitor files for use with v2.
2. Invokes the Watcher process per cluster/app when the
app is instantiated.
3. Adds a ResourceBundleState CR to the cluster/app so
that monitor can update status in it.
4. Has the Watcher update the appropriate appcontext status
object when monitor reports changes in a cluster.
5. Updates the appcontext library to define a status handle
and object at the app/cluster level.
6. Labels resources with a tracking label that coordinates
with the ResourceBundleState CR (see the parsing sketch
below).
Issue-ID: MULTICLOUD-1042
Signed-off-by: Eric Multanen <eric.w.multanen@intel.com>
Change-Id: If007c1fd86ca7a65bb941d1776cfd2d3afed766b
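
The tracking label (item 6) is what ties a monitor update in a cluster back to the right appcontext and app: its value packs the appcontext identifier and the app name into a single string, which `HandleStatusUpdate` splits apart. Below is a minimal, self-contained sketch of that parsing; the label value `1234567890-sink` is hypothetical, used for illustration only.

```go
package main

import (
	"fmt"
	"strings"
)

// monitorLabel is the tracking label key used by the patch.
const monitorLabel = "emco/deployment-id"

// parseMonitorLabel mirrors the SplitN logic in HandleStatusUpdate: the value
// is "<contextId>-<appName>", split on the first "-" only, so app names that
// themselves contain dashes survive intact.
func parseMonitorLabel(id string) (contextId, app string, err error) {
	result := strings.SplitN(id, "-", 2)
	if len(result) != 2 || result[0] == "" || result[1] == "" {
		return "", "", fmt.Errorf("invalid %s label: %q", monitorLabel, id)
	}
	return result[0], result[1], nil
}

func main() {
	// Hypothetical label value: appcontext id "1234567890", app "sink".
	ctx, app, err := parseMonitorLabel("1234567890-sink")
	if err != nil {
		panic(err)
	}
	fmt.Printf("contextId=%s app=%s\n", ctx, app)
}
```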
Diffstat (limited to 'src/rsync/pkg/status')
-rw-r--r-- | src/rsync/pkg/status/status.go | 117
1 file changed, 100 insertions, 17 deletions
```diff
diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go
index 28bffefd..351da027 100644
--- a/src/rsync/pkg/status/status.go
+++ b/src/rsync/pkg/status/status.go
@@ -17,16 +17,20 @@
 package status
 
 import (
+	"encoding/base64"
 	"encoding/json"
 	"fmt"
+	"strings"
 	"sync"
 
 	pkgerrors "github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 
+	"github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
 	v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
 	clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned"
 	informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions"
+	appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
 )
@@ -42,20 +46,75 @@ const monitorLabel = "emco/deployment-id"
 
 // HandleStatusUpdate for an application in a cluster
 // TODO: Add code for specific handling
-func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error {
-	logrus.Info("label::", id)
+func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) {
 	//status := v.Status.ServiceStatuses
 	//podStatus := v.Status.PodStatuses
-	// Store Pod Status in app context
-	out, _ := json.Marshal(v.Status)
-	logrus.Info("Status::", string(out))
-	return nil
+
+	// Get the contextId from the label (id)
+	result := strings.SplitN(id, "-", 2)
+	if result[0] == "" {
+		logrus.Info(clusterId, "::label is missing an appcontext identifier::", id)
+		return
+	}
+
+	if len(result) != 2 {
+		logrus.Info(clusterId, "::invalid label format::", id)
+		return
+	}
+
+	// Get the app from the label (id)
+	if result[1] == "" {
+		logrus.Info(clusterId, "::label is missing an app identifier::", id)
+		return
+	}
+
+	// Look up the contextId
+	var ac appcontext.AppContext
+	_, err := ac.LoadAppContext(result[0])
+	if err != nil {
+		logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err)
+		return
+	}
+
+	// produce a json representation of the status
+	vjson, err := json.Marshal(v.Status)
+	if err != nil {
+		logrus.Info(clusterId, "::Error marshalling status information::", err)
+		return
+	}
+
+	// Get the handle for the context/app/cluster status object
+	handle, err := ac.GetStatusHandle(result[1], clusterId)
+	if err != nil {
+		// Expected first time
+		logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err)
+	}
+
+	// If the status handle was not found, create the status object in the appcontext
+	if handle == nil {
+		chandle, err := ac.GetClusterHandle(result[1], clusterId)
+		if err != nil {
+			logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err)
+		} else {
+			ac.AddStatus(chandle, string(vjson))
+		}
+	} else {
+		ac.UpdateStatusValue(handle, string(vjson))
+	}
+
+	return
 }
 
 // StartClusterWatcher watches for CR
 // configBytes - Kubectl file data
-func StartClusterWatcher(provider, name string, configBytes []byte) error {
-	key := provider + "+" + name
+func StartClusterWatcher(clusterId string) error {
+
+	configBytes, err := getKubeConfig(clusterId)
+	if err != nil {
+		return err
+	}
+
+	//key := provider + "+" + name
 	// Get the lock
 	channelData.Lock()
 	defer channelData.Unlock()
@@ -63,10 +122,10 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
 	if channelData.channels == nil {
 		channelData.channels = make(map[string]chan struct{})
 	}
-	_, ok := channelData.channels[key]
+	_, ok := channelData.channels[clusterId]
 	if !ok {
 		// Create Channel
-		channelData.channels[key] = make(chan struct{})
+		channelData.channels[clusterId] = make(chan struct{})
 		// Create config
 		config, err := clientcmd.RESTConfigFromKubeConfig(configBytes)
 		if err != nil {
@@ -80,16 +139,16 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
 		// Create Informer
 		mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
 		mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer()
-		go scheduleStatus(provider, name, channelData.channels[key], mInformer)
+		go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer)
 	}
 	return nil
 }
 
 // StopClusterWatcher stop watching a cluster
-func StopClusterWatcher(provider, name string) {
-	key := provider + "+" + name
+func StopClusterWatcher(clusterId string) {
+	//key := provider + "+" + name
 	if channelData.channels != nil {
-		c, ok := channelData.channels[key]
+		c, ok := channelData.channels[clusterId]
 		if ok {
 			close(c)
 		}
@@ -108,7 +167,7 @@ func CloseAllClusterWatchers() {
 }
 
 // Per Cluster Go routine to watch CR
-func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) {
+func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
 	handlers := cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			v, ok := obj.(*v1alpha1.ResourceBundleState)
@@ -116,7 +175,7 @@ func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
 				labels := v.GetLabels()
 				l, ok := labels[monitorLabel]
 				if ok {
-					HandleStatusUpdate(provider, name, l, v)
+					HandleStatusUpdate(clusterId, l, v)
 				}
 			}
 		},
@@ -126,7 +185,7 @@ func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
 				labels := v.GetLabels()
 				l, ok := labels[monitorLabel]
 				if ok {
-					HandleStatusUpdate(provider, name, l, v)
+					HandleStatusUpdate(clusterId, l, v)
 				}
 			}
 		},
@@ -137,3 +196,27 @@ func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
 	s.AddEventHandler(handlers)
 	s.Run(c)
 }
+
+// getKubeConfig uses the connectivity client to get the kubeconfig based on
+// the name of the cluster. The decoded kubeconfig content is returned.
+// TODO - consolidate with other rsync methods to get kubeconfig files
+func getKubeConfig(clustername string) ([]byte, error) {
+
+	if !strings.Contains(clustername, "+") {
+		return nil, pkgerrors.New("Not a valid cluster name")
+	}
+	strs := strings.Split(clustername, "+")
+	if len(strs) != 2 {
+		return nil, pkgerrors.New("Not a valid cluster name")
+	}
+	kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+	if err != nil {
+		return nil, pkgerrors.New("Get kubeconfig failed")
+	}
+
+	dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+	if err != nil {
+		return nil, err
+	}
+	return dec, nil
+}
```
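
A note on the `clusterId` strings threaded through these functions: they are the provider and cluster names joined with `+`, and `getKubeConfig` splits them back apart before asking CLM for the base64-encoded kubeconfig. Below is a self-contained sketch of that convention, with hypothetical names and a fabricated kubeconfig standing in for the `cluster.NewClusterClient().GetClusterContent()` lookup.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// splitClusterId mirrors getKubeConfig's validation: a valid cluster id is
// "<provider>+<clustername>" with exactly one "+" (the len check alone also
// covers the strings.Contains check, since no "+" yields a single element).
func splitClusterId(clusterId string) (provider, name string, err error) {
	strs := strings.Split(clusterId, "+")
	if len(strs) != 2 {
		return "", "", fmt.Errorf("not a valid cluster name: %q", clusterId)
	}
	return strs[0], strs[1], nil
}

func main() {
	// Hypothetical provider/cluster pair.
	provider, name, err := splitClusterId("edge-provider+edge01")
	if err != nil {
		panic(err)
	}

	// Stand-in for the CLM lookup: CLM stores the kubeconfig base64-encoded,
	// so rsync decodes it before handing it to clientcmd.RESTConfigFromKubeConfig.
	encoded := base64.StdEncoding.EncodeToString([]byte("apiVersion: v1\nkind: Config\n"))
	dec, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		panic(err)
	}
	fmt.Printf("provider=%s cluster=%s kubeconfig=%q\n", provider, name, dec)
}
```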
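
The start/stop plumbing follows a common client-go pattern: keep one stop channel per cluster in a mutex-guarded map, hand the channel to the informer's `Run`, and close it to shut the watcher down. Here is a stripped-down sketch of that pattern with a stand-in run function in place of `SharedIndexInformer.Run`; the `delete` on stop is a small addition over the patch so a stopped cluster can be watched again.

```go
package main

import (
	"fmt"
	"sync"
)

// watchers mirrors the channelData structure in status.go: one stop channel
// per clusterId, guarded by a mutex.
type watchers struct {
	sync.Mutex
	channels map[string]chan struct{}
}

// start launches one watcher goroutine per clusterId; a second call for the
// same id is a no-op, just as in StartClusterWatcher.
func (w *watchers) start(clusterId string, run func(stop <-chan struct{})) {
	w.Lock()
	defer w.Unlock()
	if w.channels == nil {
		w.channels = make(map[string]chan struct{})
	}
	if _, ok := w.channels[clusterId]; !ok {
		w.channels[clusterId] = make(chan struct{})
		go run(w.channels[clusterId])
	}
}

// stop closes the cluster's channel, which makes the watcher's Run return.
func (w *watchers) stop(clusterId string) {
	w.Lock()
	defer w.Unlock()
	if c, ok := w.channels[clusterId]; ok {
		close(c)
		delete(w.channels, clusterId) // allows the cluster to be watched again
	}
}

func main() {
	var w watchers
	done := make(chan struct{})
	w.start("edge-provider+edge01", func(stop <-chan struct{}) {
		<-stop // a real watcher would block inside SharedIndexInformer.Run here
		close(done)
	})
	w.stop("edge-provider+edge01")
	<-done
	fmt.Println("watcher stopped")
}
```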
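
Finally, the create-or-update flow in `HandleStatusUpdate` is worth calling out: the first update from monitor finds no status handle (the log notes this is expected), so the status object is created under the cluster handle with `AddStatus`; every later update goes through `UpdateStatusValue`. Below is a toy sketch of that flow against an in-memory map standing in for the appcontext store; the key layout is illustrative, not the appcontext's actual scheme.

```go
package main

import "fmt"

// store is a toy stand-in for the appcontext backend; keys play the role of
// handles and values hold the marshalled status JSON.
type store map[string]string

// upsertStatus follows HandleStatusUpdate: create the status object on the
// first update for an app/cluster pair, update it in place afterwards.
func upsertStatus(s store, app, clusterId, statusJSON string) {
	key := app + "/" + clusterId + "/status" // illustrative key layout
	if _, ok := s[key]; !ok {
		// No status handle yet: the ac.AddStatus(chandle, ...) branch.
		s[key] = statusJSON
		fmt.Println("created status at", key)
		return
	}
	// Handle exists: the ac.UpdateStatusValue(handle, ...) branch.
	s[key] = statusJSON
	fmt.Println("updated status at", key)
}

func main() {
	s := store{}
	upsertStatus(s, "sink", "edge-provider+edge01", `{"podStatuses":[]}`)
	upsertStatus(s, "sink", "edge-provider+edge01", `{"podStatuses":[{"name":"sink-0"}]}`)
}
```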