summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rsync/go.mod5
-rw-r--r--src/rsync/pkg/status/status.go139
2 files changed, 139 insertions, 5 deletions
diff --git a/src/rsync/go.mod b/src/rsync/go.mod
index a2c5f83b..c6831e5e 100644
--- a/src/rsync/go.mod
+++ b/src/rsync/go.mod
@@ -5,12 +5,7 @@ go 1.13
require (
github.com/golang/protobuf v1.4.1
github.com/googleapis/gnostic v0.4.0
- github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
- github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect
- github.com/mattn/go-isatty v0.0.4 // indirect
- github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/onap/multicloud-k8s/src/orchestrator v0.0.0-20200601021239-7959bd4c6fd4
- go.etcd.io/bbolt v1.3.3 // indirect
google.golang.org/grpc v1.27.1
k8s.io/kubernetes v1.14.1
)
diff --git a/src/rsync/pkg/status/status.go b/src/rsync/pkg/status/status.go
new file mode 100644
index 00000000..28bffefd
--- /dev/null
+++ b/src/rsync/pkg/status/status.go
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2020 Intel Corporation, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package status
+
+import (
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ pkgerrors "github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
+
+ v1alpha1 "github.com/onap/multicloud-k8s/src/monitor/pkg/apis/k8splugin/v1alpha1"
+ clientset "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/clientset/versioned"
+ informers "github.com/onap/multicloud-k8s/src/monitor/pkg/generated/informers/externalversions"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/clientcmd"
+)
+
+type channelManager struct {
+ channels map[string]chan struct{}
+ sync.Mutex
+}
+
+var channelData channelManager
+
+const monitorLabel = "emco/deployment-id"
+
+// HandleStatusUpdate for an application in a cluster
+// TODO: Add code for specific handling
+func HandleStatusUpdate(provider, name string, id string, v *v1alpha1.ResourceBundleState) error {
+ logrus.Info("label::", id)
+ //status := v.Status.ServiceStatuses
+ //podStatus := v.Status.PodStatuses
+ // Store Pod Status in app context
+ out, _ := json.Marshal(v.Status)
+ logrus.Info("Status::", string(out))
+ return nil
+}
+
+// StartClusterWatcher watches for CR
+// configBytes - Kubectl file data
+func StartClusterWatcher(provider, name string, configBytes []byte) error {
+ key := provider + "+" + name
+ // Get the lock
+ channelData.Lock()
+ defer channelData.Unlock()
+ // For first time
+ if channelData.channels == nil {
+ channelData.channels = make(map[string]chan struct{})
+ }
+ _, ok := channelData.channels[key]
+ if !ok {
+ // Create Channel
+ channelData.channels[key] = make(chan struct{})
+ // Create config
+ config, err := clientcmd.RESTConfigFromKubeConfig(configBytes)
+ if err != nil {
+ logrus.Info(fmt.Sprintf("RESTConfigFromKubeConfig error: %s", err.Error()))
+ return pkgerrors.Wrap(err, "RESTConfigFromKubeConfig error")
+ }
+ k8sClient, err := clientset.NewForConfig(config)
+ if err != nil {
+ return pkgerrors.Wrap(err, "Clientset NewForConfig error")
+ }
+ // Create Informer
+ mInformerFactory := informers.NewSharedInformerFactory(k8sClient, 0)
+ mInformer := mInformerFactory.K8splugin().V1alpha1().ResourceBundleStates().Informer()
+ go scheduleStatus(provider, name, channelData.channels[key], mInformer)
+ }
+ return nil
+}
+
+// StopClusterWatcher stop watching a cluster
+func StopClusterWatcher(provider, name string) {
+ key := provider + "+" + name
+ if channelData.channels != nil {
+ c, ok := channelData.channels[key]
+ if ok {
+ close(c)
+ }
+ }
+}
+
+// CloseAllClusterWatchers close all channels
+func CloseAllClusterWatchers() {
+ if channelData.channels == nil {
+ return
+ }
+ // Close all Channels to stop all watchers
+ for _, e := range channelData.channels {
+ close(e)
+ }
+}
+
+// Per Cluster Go routine to watch CR
+func scheduleStatus(provider, name string, c <-chan struct{}, s cache.SharedIndexInformer) {
+ handlers := cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ v, ok := obj.(*v1alpha1.ResourceBundleState)
+ if ok {
+ labels := v.GetLabels()
+ l, ok := labels[monitorLabel]
+ if ok {
+ HandleStatusUpdate(provider, name, l, v)
+ }
+ }
+ },
+ UpdateFunc: func(oldObj, obj interface{}) {
+ v, ok := obj.(*v1alpha1.ResourceBundleState)
+ if ok {
+ labels := v.GetLabels()
+ l, ok := labels[monitorLabel]
+ if ok {
+ HandleStatusUpdate(provider, name, l, v)
+ }
+ }
+ },
+ DeleteFunc: func(obj interface{}) {
+ // Ignore it
+ },
+ }
+ s.AddEventHandler(handlers)
+ s.Run(c)
+}