summaryrefslogtreecommitdiffstats
path: root/src/rsync/pkg/status/status.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/rsync/pkg/status/status.go')
-rw-r--r--src/rsync/pkg/status/status.go117
1 files changed, 100 insertions, 17 deletions
diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go
index 28bffefd..351da027 100644
--- a/src/rsync/pkg/status/status.go
+++ b/src/rsync/pkg/status/status.go
@@ -17,16 +17,20 @@
package status
import (
+ "encoding/base64"
"encoding/json"
"fmt"
+ "strings"
"sync"
pkgerrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
+ "github.com/onap/multicloud-k8s/src/clm/pkg/cluster"
v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned"
informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions"
+ appcontext "github.com/onap/multicloud-k8s/src/orchestrator/pkg/appcontext"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
@@ -42,20 +46,75 @@ const monitorLabel = "emco/deployment-id"
// HandleStatusUpdate for an application in a cluster
// TODO: Add code for specific handling
-func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error {
- logrus.Info("label::", id)
+func HandleStatusUpdate(clusterId string, id string, v *v1alpha1.ResourceBundleState) {
//status := v.Status.ServiceStatuses
//podStatus := v.Status.PodStatuses
- // Store Pod Status in app context
- out, _ := json.Marshal(v.Status)
- logrus.Info("Status::", string(out))
- return nil
+
+ // Get the contextId from the label (id)
+ result := strings.SplitN(id, "-", 2)
+ if result[0] == "" {
+ logrus.Info(clusterId, "::label is missing an appcontext identifier::", id)
+ return
+ }
+
+ if len(result) != 2 {
+ logrus.Info(clusterId, "::invalid label format::", id)
+ return
+ }
+
+ // Get the app from the label (id)
+ if result[1] == "" {
+ logrus.Info(clusterId, "::label is missing an app identifier::", id)
+ return
+ }
+
+ // Look up the contextId
+ var ac appcontext.AppContext
+ _, err := ac.LoadAppContext(result[0])
+ if err != nil {
+ logrus.Info(clusterId, "::App context not found::", result[0], "::Error::", err)
+ return
+ }
+
+ // produce yaml representation of the status
+ vjson, err := json.Marshal(v.Status)
+ if err != nil {
+ logrus.Info(clusterId, "::Error marshalling status information::", err)
+ return
+ }
+
+ // Get the handle for the context/app/cluster status object
+ handle, err := ac.GetStatusHandle(result[1], clusterId)
+ if err != nil {
+ // Expected first time
+ logrus.Info(clusterId, "::Status context handle not found::", id, "::Error::", err)
+ }
+
+ // If status handle was not found, then create the status object in the appcontext
+ if handle == nil {
+ chandle, err := ac.GetClusterHandle(result[1], clusterId)
+ if err != nil {
+ logrus.Info(clusterId, "::Cluster context handle not found::", id, "::Error::", err)
+ } else {
+ ac.AddStatus(chandle, string(vjson))
+ }
+ } else {
+ ac.UpdateStatusValue(handle, string(vjson))
+ }
+
+ return
}
// StartClusterWatcher watches for CR
// configBytes - Kubectl file data
-func StartClusterWatcher(provider, name string, configBytes []byte) error {
- key := provider + "+" + name
+func StartClusterWatcher(clusterId string) error {
+
+ configBytes, err := getKubeConfig(clusterId)
+ if err != nil {
+ return err
+ }
+
+ //key := provider + "+" + name
// Get the lock
channelData.Lock()
defer channelData.Unlock()
@@ -63,10 +122,10 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
if channelData.channels == nil {
channelData.channels = make(map[string]chan struct{})
}
- _, ok := channelData.channels[key]
+ _, ok := channelData.channels[clusterId]
if !ok {
// Create Channel
- channelData.channels[key] = make(chan struct{})
+ channelData.channels[clusterId] = make(chan struct{})
// Create config
config, err := clientcmd.RESTConfigFromKubeConfig(configBytes)
if err != nil {
@@ -80,16 +139,16 @@ func StartClusterWatcher(provider, name string, configBytes []byte) error {
// Create Informer
mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer()
- go scheduleStatus(provider, name, channelData.channels[key], mInformer)
+ go scheduleStatus(clusterId, channelData.channels[clusterId], mInformer)
}
return nil
}
// StopClusterWatcher stop watching a cluster
-func StopClusterWatcher(provider, name string) {
- key := provider + "+" + name
+func StopClusterWatcher(clusterId string) {
+ //key := provider + "+" + name
if channelData.channels != nil {
- c, ok := channelData.channels[key]
+ c, ok := channelData.channels[clusterId]
if ok {
close(c)
}
@@ -108,7 +167,7 @@ func CloseAllClusterWatchers() {
}
// Per Cluster Go routine to watch CR
-func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) {
+func scheduleStatus(clusterId string, c <-chan struct{}, s cache.SharedIndexInformer) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
v, ok := obj.(*v1alpha1.ResourceBundleState)
@@ -116,7 +175,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
labels := v.GetLabels()
l, ok := labels[monitorLabel]
if ok {
- HandleStatusUpdate(provider, name, l, v)
+ HandleStatusUpdate(clusterId, l, v)
}
}
},
@@ -126,7 +185,7 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
labels := v.GetLabels()
l, ok := labels[monitorLabel]
if ok {
- HandleStatusUpdate(provider, name, l, v)
+ HandleStatusUpdate(clusterId, l, v)
}
}
},
@@ -137,3 +196,27 @@ func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedInde
s.AddEventHandler(handlers)
s.Run(c)
}
+
+// getKubeConfig uses the connectivity client to get the kubeconfig based on the name
+// of the clustername. This is written out to a file.
+// TODO - consolidate with other rsync methods to get kubeconfig files
+func getKubeConfig(clustername string) ([]byte, error) {
+
+ if !strings.Contains(clustername, "+") {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ strs := strings.Split(clustername, "+")
+ if len(strs) != 2 {
+ return nil, pkgerrors.New("Not a valid cluster name")
+ }
+ kubeConfig, err := cluster.NewClusterClient().GetClusterContent(strs[0], strs[1])
+ if err != nil {
+ return nil, pkgerrors.New("Get kubeconfig failed")
+ }
+
+ dec, err := base64.StdEncoding.DecodeString(kubeConfig.Kubeconfig)
+ if err != nil {
+ return nil, err
+ }
+ return dec, nil
+}