aboutsummaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg/status
diff options
context:
space:
mode:
authorEric Multanen <eric.w.multanen@intel.com>2020-07-01 23:30:49 -0700
committerEric Multanen <eric.w.multanen@intel.com>2020-07-08 13:36:34 -0700
commite06b947b03c3fcce2c954feb68890a519c7740c3 (patch)
tree5617b570ea85bf07dd76c6410975059acc23cc70 /src/rsync/pkg/status
parenta43096cbdca3fdabeda3d404bedadd7a7272a3c2 (diff)
Adds composite app status update and query
This patch provides the basic framework for supporting monitoring of composite application resources in clusters. 1. Updates to the monitor files for use with v2. 2. Invokes the Watcher process per cluster/app when the app is instantiated. 3. Adds a ResourceBundleState CR resource to the cluster/app so that monitor will be able to update status to it. 4. Watcher updates appropriate appcontext status object when updates are made in clusters by monitor 5. Update appcontext library to define a status handle and object at the app/cluster level 6. Labels resources with an appropriate tracking label to coordinate with the ResourceBundleState CR Issue-ID: MULTICLOUD-1042 Signed-off-by: Eric Multanen <eric.w.multanen@intel.com> Change-Id: If007c1fd86ca7a65bb941d1776cfd2d3afed766b
Diffstat (limited to 'src/rsync/pkg/status')
-rw-r--r--src/rsync/pkg/status/status.go117
1 files changed, 100 insertions, 17 deletions
diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go
index 28bffefd..351da027 100644
--- a/src/rsync/pkg/status/status.go
+++ b/src/rsync/pkg/status/status.go
@@ -17,16 +17,20 @@
package status
import (
+ "encoding/base64"
"encoding/json"
"fmt"
+ "strings"
"sync"
pkgerrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
+ "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned"
informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions"
+ appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
@@ -42,20 +46,75 @@ const monitorLabel = "emco/deployment-id"
// HandleStatusUpdate for an application in a cluster
// TODO: Add code for specific handling
-func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error {
- logrus.Info("label::", id)
+func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) {
//status := v.Status.ServiceStatuses
//podStatus := v.Status.PodStatuses
- // Store Pod Status in app context
- out, _ := json.Marshal(v.Status)
- logrus.Info("Status::", string(out))
- return nil
+
+ // Get the contextId from the label (id)
+ result := strings.SplitN(id, "-", 2)
+ if result[0] == "" {
+ logrus.Info(clusterId, "::label is missing an appcontext identifier::", id)
+ return
+ }
+
+ if len(result) != 2 {
+ logrus.Info(clusterId, "::invalid label format::", id)
+ return
+ }
+
+ // Get the app from the label (id)
+ if result[1] == "" {
+ logrus.Info(clusterId, "::label is missing an app identifier::", id)
+ return
+ }
+
+ // Look up the contextId
+ var ac appcontext.AppContext
+ _, err := ac.LoadAppContext(result[0])
+ if err != nil {
+ logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err)
+ return
+ }
+
+ // produce yaml representation of the status
+ vjson, err := json.Marshal(v.Status)
+ if err != nil {
+ logrus.Info(clusterId, "::Error marshalling status information::", err)
+ return
+ }
+
+ // Get the handle for the context/app/cluster status object
+ handle, err := ac.GetStatusHandle(result[1], clusterId)
+ if err != nil {
+ // Expected first time
+ logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err)
+ }
+
+ // If status handle was not found, then create the status object in the appcontext
+ if handle == nil {
+ chandle, err := ac.GetClusterHandle(result[1], clusterId)
+ if err != nil {
+ logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err)
+ } else {
+ ac.AddStatus(chandle, string(vjson))
+ }
+ } else {
+ ac.UpdateStatusValue(handle, string(vjson))
+ }
+
+ return
}
// StartClusterWatcher watches for CR
// configBytes - Kubectl file data
-func StartClusterWatcher(provider, name string, configBytes []byte) error {
- key := provider + "+" + name
+func StartClusterWatcher(clusterId string) error {
+
+ configBytes, err := getKubeConfig(clusterId)
+ if err != nil {
+ return err
+ }
+
+ //key := provider + "+" + name
// Get the lock
channelData.Lock()
defer channelData.Unlock()
@@ -63,10 +122,10 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
if channelData.channels == nil {
channelData.channels = make(map[string]chan struct{})
}
- _, ok := channelData.channels[key]
+ _, ok := channelData.channels[clusterId]
if !ok {
// Create Channel
- channelData.channels[key] = make(chan struct{})
+ channelData.channels[clusterId] = make(chan struct{})
// Create config
config, err := clientcmd.RESTConfigFromKubeConfig(configBytes)
if err != nil {
@@ -80,16 +139,16 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
// Create Informer
mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer()
- go scheduleStatus(provider, name, channelData.channels[key], mInformer)
+ go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer)
}
return nil
}
// StopClusterWatcher stop watching a cluster
-func StopClusterWatcher(provider, name string) {
- key := provider + "+" + name
+func StopClusterWatcher(clusterId string) {
+ //key := provider + "+" + name
if channelData.channels != nil {
- c, ok := channelData.channels[key]
+ c, ok := channelData.channels[clusterId]
if ok {
close(c)
}
@@ -108,7 +167,7 @@ func CloseAllClusterWatchers() {
}
// Per Cluster Go routine to watch CR
-func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) {
+func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
v, ok := obj.(*v1alpha1.ResourceBundleState)
@@ -116,7 +175,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
labels := v.GetLabels()
l, ok := labels[monitorLabel]
if ok {
- HandleStatusUpdate(provider, name, l, v)
+ HandleStatusUpdate(clusterId, l, v)
}
}
},
@@ -126,7 +185,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
labels := v.GetLabels()
l, ok := labels[monitorLabel]
if ok {
- HandleStatusUpdate(provider, name, l, v)
+ HandleStatusUpdate(clusterId, l, v)
}
}
},
@@ -137,3 +196,27 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
s.AddEventHandler(handlers)
s.Run(c)
}
+
+// getKubeConfig uses the connectivity client to get the kubeconfig based on the name
+// of the clustername. This is written out to a file.
+// TODO - consolidate with other rsync methods to get kubeconfig files
+func getKubeConfig(clustername string) ([]byte, error) {
+
+ if !strings.Contains(clustername, "+") {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ strs := strings.Split(clustername, "+")
+ if len(strs) != 2 {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+ if err != nil {
+ return nil, pkgerrors.New("Get kubeconfig failed")
+ }
+
+ dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+ if err != nil {
+ return nil, err
+ }
+ return dec, nil
+}