diff options
author | Lukasz Rajewski <lukasz.rajewski@orange.com> | 2022-02-15 22:39:37 +0100 |
---|---|---|
committer | Lukasz Rajewski <lukasz.rajewski@orange.com> | 2022-03-02 22:46:03 +0100 |
commit | 5b18db4fc784763402e0898bf5e996886279347e (patch) | |
tree | 984a315638e1ef87841144fbb6a7e56484ffd12c | |
parent | a73b42b9c3877f1a34939d85941482f7f5c44db9 (diff) |
Implementation of status notification mechanism0.10.0
- Subscription CRUD endpoints
- Subscription notifu executor
- Cleanup of subscriptions on instance delete
- Sending notification to the specified callback
Issue-ID: MULTICLOUD-1445
Signed-off-by: Lukasz Rajewski <lukasz.rajewski@orange.com>
Change-Id: I5b867a348e916f6c2c471bcc5326c831d832f45e
-rw-r--r-- | src/k8splugin/api/api.go | 17 | ||||
-rw-r--r-- | src/k8splugin/api/brokerhandler_test.go | 8 | ||||
-rw-r--r-- | src/k8splugin/api/defhandler_test.go | 12 | ||||
-rw-r--r-- | src/k8splugin/api/healthcheckhandler_test.go | 4 | ||||
-rw-r--r-- | src/k8splugin/api/instancehandler.go | 2 | ||||
-rw-r--r-- | src/k8splugin/api/instancehandler_test.go | 12 | ||||
-rw-r--r-- | src/k8splugin/api/profilehandler_test.go | 10 | ||||
-rw-r--r-- | src/k8splugin/api/statussubhandler.go | 229 | ||||
-rw-r--r-- | src/k8splugin/cmd/main.go | 4 | ||||
-rw-r--r-- | src/k8splugin/internal/app/client.go | 17 | ||||
-rw-r--r-- | src/k8splugin/internal/app/instance.go | 31 | ||||
-rw-r--r-- | src/k8splugin/internal/app/subscription.go | 752 |
12 files changed, 1060 insertions, 38 deletions
diff --git a/src/k8splugin/api/api.go b/src/k8splugin/api/api.go index 64c83e03..64959f5e 100644 --- a/src/k8splugin/api/api.go +++ b/src/k8splugin/api/api.go @@ -1,7 +1,7 @@ /* Copyright 2018 Intel Corporation. Copyright © 2021 Samsung Electronics -Copyright © 2021 Orange +Copyright © 2022 Orange Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ func NewRouter(defClient rb.DefinitionManager, configClient app.ConfigManager, connectionClient connection.ConnectionManager, templateClient rb.ConfigTemplateManager, + subscriptionClient app.InstanceStatusSubManager, healthcheckClient healthcheck.InstanceHCManager) *mux.Router { router := mux.NewRouter() @@ -52,7 +53,6 @@ func NewRouter(defClient rb.DefinitionManager, "profile-name", "{profile-name}").Methods("GET") //Want to get full Data -> add query param: /install/{instID}?full=true instRouter.HandleFunc("/instance/{instID}", instHandler.getHandler).Methods("GET") - instRouter.HandleFunc("/instance/{instID}/status", instHandler.statusHandler).Methods("GET") instRouter.HandleFunc("/instance/{instID}/upgrade", instHandler.upgradeHandler).Methods("POST") instRouter.HandleFunc("/instance/{instID}/query", instHandler.queryHandler).Methods("GET") instRouter.HandleFunc("/instance/{instID}/query", instHandler.queryHandler). @@ -62,6 +62,19 @@ func NewRouter(defClient rb.DefinitionManager, "Labels", "{Labels}").Methods("GET") instRouter.HandleFunc("/instance/{instID}", instHandler.deleteHandler).Methods("DELETE") + // Status handler routes + if subscriptionClient == nil { + subscriptionClient = app.NewInstanceStatusSubClient() + subscriptionClient.RestoreWatchers() + } + instanceStatusSubHandler := instanceStatusSubHandler{client: subscriptionClient} + instRouter.HandleFunc("/instance/{instID}/status", instHandler.statusHandler).Methods("GET") + instRouter.HandleFunc("/instance/{instID}/status/subscription", instanceStatusSubHandler.listHandler).Methods("GET") + instRouter.HandleFunc("/instance/{instID}/status/subscription", instanceStatusSubHandler.createHandler).Methods("POST") + instRouter.HandleFunc("/instance/{instID}/status/subscription/{subID}", instanceStatusSubHandler.getHandler).Methods("GET") + instRouter.HandleFunc("/instance/{instID}/status/subscription/{subID}", instanceStatusSubHandler.updateHandler).Methods("PUT") + instRouter.HandleFunc("/instance/{instID}/status/subscription/{subID}", instanceStatusSubHandler.deleteHandler).Methods("DELETE") + // Query handler routes if queryClient == nil { queryClient = app.NewQueryClient() diff --git a/src/k8splugin/api/brokerhandler_test.go b/src/k8splugin/api/brokerhandler_test.go index 767cae1e..a08b8a9b 100644 --- a/src/k8splugin/api/brokerhandler_test.go +++ b/src/k8splugin/api/brokerhandler_test.go @@ -313,7 +313,7 @@ func TestBrokerCreateHandler(t *testing.T) { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("POST", "/cloudowner/cloudregion/infra_workload", testCase.input) - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) defer resp.Body.Close() if testCase.expectedCode != resp.StatusCode { @@ -409,7 +409,7 @@ func TestBrokerGetHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("GET", "/cloudowner/cloudregion/infra_workload/"+testCase.input, nil) - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) if testCase.expectedCode != resp.StatusCode { t.Fatalf("Request method returned: %v and it was expected: %v", @@ -489,7 +489,7 @@ func TestBrokerFindHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("GET", "/cloudowner/cloudregion/infra_workload?name="+testCase.input, nil) - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) if testCase.expectedCode != resp.StatusCode { t.Fatalf("Request method returned: %v and it was expected: %v", @@ -551,7 +551,7 @@ func TestBrokerDeleteHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("DELETE", "/cloudowner/cloudregion/infra_workload/"+testCase.input, nil) - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) if testCase.expectedCode != resp.StatusCode { t.Fatalf("Request method returned: %v and it was expected: %v", resp.StatusCode, testCase.expectedCode) diff --git a/src/k8splugin/api/defhandler_test.go b/src/k8splugin/api/defhandler_test.go index b626b6f3..22d45487 100644 --- a/src/k8splugin/api/defhandler_test.go +++ b/src/k8splugin/api/defhandler_test.go @@ -139,7 +139,7 @@ func TestRBDefCreateHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("POST", "/v1/rb/definition", testCase.reader) - resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -208,7 +208,7 @@ func TestRBDefListVersionsHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("GET", "/v1/rb/definition/testresourcebundle", nil) - resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -288,7 +288,7 @@ func TestRBDefListAllHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("GET", "/v1/rb/definition", nil) - resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -368,7 +368,7 @@ func TestRBDefGetHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("GET", "/v1/rb/definition/"+testCase.name+"/"+testCase.version, nil) - resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -419,7 +419,7 @@ func TestRBDefDeleteHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("DELETE", "/v1/rb/definition/"+testCase.name+"/"+testCase.version, nil) - resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -476,7 +476,7 @@ func TestRBDefUploadHandler(t *testing.T) { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("POST", "/v1/rb/definition/"+testCase.name+"/"+testCase.version+"/content", testCase.body) - resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(testCase.rbDefClient, nil, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { diff --git a/src/k8splugin/api/healthcheckhandler_test.go b/src/k8splugin/api/healthcheckhandler_test.go index 3a03d902..c6c07c16 100644 --- a/src/k8splugin/api/healthcheckhandler_test.go +++ b/src/k8splugin/api/healthcheckhandler_test.go @@ -35,7 +35,7 @@ func TestHealthCheckHandler(t *testing.T) { Err: nil, } request := httptest.NewRequest("GET", "/v1/healthcheck", nil) - resp := executeRequest(request, NewRouter(nil, nil, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != http.StatusOK { @@ -48,7 +48,7 @@ func TestHealthCheckHandler(t *testing.T) { Err: pkgerrors.New("Runtime Error in DB"), } request := httptest.NewRequest("GET", "/v1/healthcheck", nil) - resp := executeRequest(request, NewRouter(nil, nil, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != http.StatusInternalServerError { diff --git a/src/k8splugin/api/instancehandler.go b/src/k8splugin/api/instancehandler.go index 6d1fd7b3..dd5fd0dd 100644 --- a/src/k8splugin/api/instancehandler.go +++ b/src/k8splugin/api/instancehandler.go @@ -211,7 +211,7 @@ func (i instanceHandler) statusHandler(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) id := vars["instID"] - resp, err := i.client.Status(id) + resp, err := i.client.Status(id, true) if err != nil { log.Error("Error getting Status", log.Fields{ "error": err, diff --git a/src/k8splugin/api/instancehandler_test.go b/src/k8splugin/api/instancehandler_test.go index 444b6695..f06af446 100644 --- a/src/k8splugin/api/instancehandler_test.go +++ b/src/k8splugin/api/instancehandler_test.go @@ -72,7 +72,7 @@ func (m *mockInstanceClient) Query(id, apiVersion, kind, name, labels string) (a return m.statusItem, nil } -func (m *mockInstanceClient) Status(id string) (app.InstanceStatus, error) { +func (m *mockInstanceClient) Status(id string, checkReady bool) (app.InstanceStatus, error) { if m.err != nil { return app.InstanceStatus{}, m.err } @@ -205,7 +205,7 @@ func TestInstanceCreateHandler(t *testing.T) { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("POST", "/v1/instance", testCase.input) - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) if testCase.expectedCode != resp.StatusCode { body, _ := ioutil.ReadAll(resp.Body) @@ -306,7 +306,7 @@ func TestInstanceGetHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("GET", "/v1/instance/"+testCase.input, nil) - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) if testCase.expectedCode != resp.StatusCode { t.Fatalf("Request method returned: %v and it was expected: %v", @@ -441,7 +441,7 @@ func TestInstanceListHandler(t *testing.T) { } request.URL.RawQuery = q.Encode() } - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) if testCase.expectedCode != resp.StatusCode { t.Fatalf("Request method returned: %v and it was expected: %v", @@ -500,7 +500,7 @@ func TestDeleteHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("DELETE", "/v1/instance/"+testCase.input, nil) - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) if testCase.expectedCode != resp.StatusCode { t.Fatalf("Request method returned: %v and it was expected: %v", resp.StatusCode, testCase.expectedCode) @@ -734,7 +734,7 @@ func TestInstanceQueryHandler(t *testing.T) { } url := "/v1/instance/" + testCase.id + "/query?" + params.Encode() request := httptest.NewRequest("GET", url, nil) - resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, nil, testCase.instClient, nil, nil, nil, nil, nil, nil)) if testCase.expectedCode != resp.StatusCode { body, _ := ioutil.ReadAll(resp.Body) diff --git a/src/k8splugin/api/profilehandler_test.go b/src/k8splugin/api/profilehandler_test.go index 181b775b..57040668 100644 --- a/src/k8splugin/api/profilehandler_test.go +++ b/src/k8splugin/api/profilehandler_test.go @@ -127,7 +127,7 @@ func TestRBProfileCreateHandler(t *testing.T) { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("POST", "/v1/rb/definition/test-rbdef/v1/profile", testCase.reader) - resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -207,7 +207,7 @@ func TestRBProfileGetHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("GET", "/v1/rb/definition/test-rbdef/v1/profile/"+testCase.prname, nil) - resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -288,7 +288,7 @@ func TestRBProfileListHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("GET", "/v1/rb/definition/"+testCase.def+"/"+testCase.version+"/profile", nil) - resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -347,7 +347,7 @@ func TestRBProfileDeleteHandler(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("DELETE", "/v1/rb/definition/test-rbdef/v1/profile/"+testCase.prname, nil) - resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { @@ -400,7 +400,7 @@ func TestRBProfileUploadHandler(t *testing.T) { t.Run(testCase.label, func(t *testing.T) { request := httptest.NewRequest("POST", "/v1/rb/definition/test-rbdef/v1/profile/"+testCase.prname+"/content", testCase.body) - resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil)) + resp := executeRequest(request, NewRouter(nil, testCase.rbProClient, nil, nil, nil, nil, nil, nil, nil)) //Check returned code if resp.StatusCode != testCase.expectedCode { diff --git a/src/k8splugin/api/statussubhandler.go b/src/k8splugin/api/statussubhandler.go new file mode 100644 index 00000000..c5c8de23 --- /dev/null +++ b/src/k8splugin/api/statussubhandler.go @@ -0,0 +1,229 @@ +/* +Copyright © 2022 Orange +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "encoding/json" + "io" + "net/http" + + "github.com/onap/multicloud-k8s/src/k8splugin/internal/app" + log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" + + "github.com/gorilla/mux" +) + +// Used to store the backend implementation objects +// Also simplifies the mocking needed for unit testing +type instanceStatusSubHandler struct { + // Interface that implements Status Subscription operations + client app.InstanceStatusSubManager +} + +func (iss instanceStatusSubHandler) createHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + id := vars["instID"] + + var subRequest app.SubscriptionRequest + + err := json.NewDecoder(r.Body).Decode(&subRequest) + switch { + case err == io.EOF: + log.Error("Body Empty", log.Fields{ + "error": io.EOF, + }) + http.Error(w, "Body empty", http.StatusBadRequest) + return + case err != nil: + log.Error("Error unmarshaling Body", log.Fields{ + "error": err, + }) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + // Name is required. + if subRequest.Name == "" { + http.Error(w, "Missing name in POST request", http.StatusBadRequest) + return + } + + // MinNotifyInterval cannot be less than 0 + if subRequest.MinNotifyInterval < 0 { + http.Error(w, "Min Notify Interval has invalid value", http.StatusBadRequest) + return + } + + // CallbackUrl is required + if subRequest.CallbackUrl == "" { + http.Error(w, "CallbackUrl has invalid value", http.StatusBadRequest) + return + } + + resp, err := iss.client.Create(id, subRequest) + if err != nil { + log.Error("Error creating subscription", log.Fields{ + "error": err, + "instance": id, + "response": resp, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + err = json.NewEncoder(w).Encode(resp) + if err != nil { + log.Error("Error Marshaling Response", log.Fields{ + "error": err, + "response": resp, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (iss instanceStatusSubHandler) getHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + instanceId := vars["instID"] + subId := vars["subID"] + + resp, err := iss.client.Get(instanceId, subId) + if err != nil { + log.Error("Error getting instance's Status Subscription", log.Fields{ + "error": err, + "instanceID": instanceId, + "subscriptionID": subId, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err = json.NewEncoder(w).Encode(resp) + if err != nil { + log.Error("Error Marshaling Response", log.Fields{ + "error": err, + "response": resp, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (iss instanceStatusSubHandler) updateHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + instanceId := vars["instID"] + subId := vars["subID"] + + var subRequest app.SubscriptionRequest + + err := json.NewDecoder(r.Body).Decode(&subRequest) + switch { + case err == io.EOF: + log.Error("Body Empty", log.Fields{ + "error": io.EOF, + }) + http.Error(w, "Body empty", http.StatusBadRequest) + return + case err != nil: + log.Error("Error unmarshaling Body", log.Fields{ + "error": err, + }) + http.Error(w, err.Error(), http.StatusUnprocessableEntity) + return + } + + // MinNotifyInterval cannot be less than 0 + if subRequest.MinNotifyInterval < 0 { + http.Error(w, "Min Notify Interval has invalid value", http.StatusBadRequest) + return + } + + // CallbackUrl is required + if subRequest.CallbackUrl == "" { + http.Error(w, "CallbackUrl has invalid value", http.StatusBadRequest) + return + } + + resp, err := iss.client.Update(instanceId, subId, subRequest) + if err != nil { + log.Error("Error updating instance's Status Subscription", log.Fields{ + "error": err, + "instanceID": instanceId, + "subscriptionID": subId, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err = json.NewEncoder(w).Encode(resp) + if err != nil { + log.Error("Error Marshaling Response", log.Fields{ + "error": err, + "response": resp, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (iss instanceStatusSubHandler) deleteHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + instanceId := vars["instID"] + subId := vars["subID"] + + err := iss.client.Delete(instanceId, subId) + if err != nil { + log.Error("Error deleting instance's Status Subscription", log.Fields{ + "error": err, + "instanceID": instanceId, + "subscriptionID": subId, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusAccepted) +} + +func (iss instanceStatusSubHandler) listHandler(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + id := vars["instID"] + + resp, err := iss.client.List(id) + if err != nil { + log.Error("Error listing instance Status Subscriptions", log.Fields{ + "error": err, + "instance-id": id, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + err = json.NewEncoder(w).Encode(resp) + if err != nil { + log.Error("Error Marshaling Response", log.Fields{ + "error": err, + "response": resp, + }) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} diff --git a/src/k8splugin/cmd/main.go b/src/k8splugin/cmd/main.go index ff00613e..23147b5a 100644 --- a/src/k8splugin/cmd/main.go +++ b/src/k8splugin/cmd/main.go @@ -16,7 +16,6 @@ package main import ( "context" - "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "log" "math/rand" "net/http" @@ -27,6 +26,7 @@ import ( "github.com/onap/multicloud-k8s/src/k8splugin/api" "github.com/onap/multicloud-k8s/src/k8splugin/internal/auth" "github.com/onap/multicloud-k8s/src/k8splugin/internal/config" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/utils" "github.com/gorilla/handlers" ) @@ -40,7 +40,7 @@ func main() { rand.Seed(time.Now().UnixNano()) - httpRouter := api.NewRouter(nil, nil, nil, nil, nil, nil, nil, nil) + httpRouter := api.NewRouter(nil, nil, nil, nil, nil, nil, nil, nil, nil) loggedRouter := handlers.LoggingHandler(os.Stdout, httpRouter) log.Println("Starting Kubernetes Multicloud API") diff --git a/src/k8splugin/internal/app/client.go b/src/k8splugin/internal/app/client.go index 3aabda22..cbd3dd56 100644 --- a/src/k8splugin/internal/app/client.go +++ b/src/k8splugin/internal/app/client.go @@ -28,6 +28,7 @@ import ( //appsv1beta2 "k8s.io/api/apps/v1beta2" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" //extensionsv1beta1 "k8s.io/api/extensions/v1beta1" //apiextv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -53,9 +54,11 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery/cached/disk" "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" ) @@ -640,3 +643,17 @@ func (k *KubernetesClient) GetStandardClient() kubernetes.Interface { func (k *KubernetesClient) GetInstanceID() string { return k.instanceID } + +func (k *KubernetesClient) GetInformer(gvk schema.GroupVersionKind) (cache.SharedInformer, error) { + labelOptions := dynamicinformer.TweakListOptionsFunc(func(opts *metav1.ListOptions) { + opts.LabelSelector = config.GetConfiguration().KubernetesLabelName + "=" + k.instanceID + }) + + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.GetDynamicClient(), 0, v1.NamespaceAll, labelOptions) + mapping, err := k.GetMapper().RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, pkgerrors.Wrap(err, "Preparing mapper based on GVK") + } + informer := factory.ForResource(mapping.Resource).Informer() + return informer, nil +} diff --git a/src/k8splugin/internal/app/instance.go b/src/k8splugin/internal/app/instance.go index e78eea77..91e2150e 100644 --- a/src/k8splugin/internal/app/instance.go +++ b/src/k8splugin/internal/app/instance.go @@ -121,7 +121,7 @@ type InstanceManager interface { Upgrade(id string, u UpgradeRequest) (InstanceResponse, error) Get(id string) (InstanceResponse, error) GetFull(id string) (InstanceDbData, error) - Status(id string) (InstanceStatus, error) + Status(id string, checkReady bool) (InstanceStatus, error) Query(id, apiVersion, kind, name, labels string) (InstanceStatus, error) List(rbname, rbversion, profilename string) ([]InstanceMiniResponse, error) Find(rbName string, ver string, profile string, labelKeys map[string]string) ([]InstanceMiniResponse, error) @@ -813,7 +813,7 @@ func (v *InstanceClient) Query(id, apiVersion, kind, name, labels string) (Insta } // Status returns the status for the instance -func (v *InstanceClient) Status(id string) (InstanceStatus, error) { +func (v *InstanceClient) Status(id string, checkReady bool) (InstanceStatus, error) { //Read the status from the DB key := InstanceKey{ ID: id, @@ -867,12 +867,14 @@ Main: isReady = false } else { generalStatus = append(generalStatus, status) - ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status) + if checkReady { + ready, err := v.checkRssStatus(oneResource, k8sClient, resResp.Namespace, status) - if !ready || err != nil { - isReady = false - if err != nil { - cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) + if !ready || err != nil { + isReady = false + if err != nil { + cumulatedErrorMsg = append(cumulatedErrorMsg, err.Error()) + } } } } @@ -905,7 +907,7 @@ Main: resp := InstanceStatus{ Request: resResp.Request, ResourceCount: int32(len(generalStatus)), - Ready: isReady && resResp.Status == "DONE", + Ready: checkReady && isReady && resResp.Status == "DONE", ResourcesStatus: generalStatus, } @@ -914,7 +916,6 @@ Main: strings.Join(cumulatedErrorMsg, "\n")) return resp, err } - //TODO Filter response content by requested verbosity (brief, ...)? return resp, nil } @@ -925,7 +926,6 @@ func (v *InstanceClient) checkRssStatus(rss helm.KubernetesResource, k8sClient K defer cancel() apiVersion, kind := rss.GVK.ToAPIVersionAndKind() - log.Printf("apiVersion: %s, Kind: %s", apiVersion, kind) var parsedRes runtime.Object //TODO: Should we care about different api version for a same kind? @@ -1143,6 +1143,12 @@ func (v *InstanceClient) Delete(id string) error { } }() } else { + subscriptionClient := NewInstanceStatusSubClient() + err = subscriptionClient.Cleanup(id) + if err != nil { + log.Printf(err.Error()) + } + err = db.DBconn.Delete(v.storeName, key, v.tagInst) if err != nil { return pkgerrors.Wrap(err, "Delete Instance") @@ -1288,6 +1294,11 @@ func (v *InstanceClient) runPostDelete(k8sClient KubernetesClient, hookClient *H return pkgerrors.Wrap(err, "Error running post-delete hooks") } if clearDb { + subscriptionClient := NewInstanceStatusSubClient() + err = subscriptionClient.Cleanup(instance.ID) + if err != nil { + log.Printf(err.Error()) + } err = db.DBconn.Delete(v.storeName, key, v.tagInst) if err != nil { log.Printf("Delete Instance DB Entry for release %s has error.", instance.ReleaseName) diff --git a/src/k8splugin/internal/app/subscription.go b/src/k8splugin/internal/app/subscription.go new file mode 100644 index 00000000..9b4a1aaf --- /dev/null +++ b/src/k8splugin/internal/app/subscription.go @@ -0,0 +1,752 @@ +/* + * Copyright © 2022 Orange + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net" + "net/http" + "strings" + "sync" + "time" + + "github.com/onap/multicloud-k8s/src/k8splugin/internal/db" + log "github.com/onap/multicloud-k8s/src/k8splugin/internal/logutils" + "github.com/onap/multicloud-k8s/src/k8splugin/internal/rb" + pkgerrors "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" +) + +// QueryStatus is what is returned when status is queried for an instance +type StatusSubscription struct { + Name string `json:"name"` + MinNotifyInterval int32 `json:"min-notify-interval"` + LastUpdateTime time.Time `json:"last-update-time"` + CallbackUrl string `json:"callback-url"` + LastNotifyTime time.Time `json:"last-notify-time"` + LastNotifyStatus int32 `json:"last-notify-status"` + NotifyMetadata map[string]interface{} `json:"metadata"` +} + +type SubscriptionRequest struct { + Name string `json:"name"` + MinNotifyInterval int32 `json:"min-notify-interval"` + NotifyMetadata map[string]interface{} `json:"metadata"` + CallbackUrl string `json:"callback-url"` +} + +// StatusSubscriptionKey is used as the primary key in the db +type StatusSubscriptionKey struct { + InstanceId string `json:"instanceid"` + SubscriptionName string `json:"subscriptionname"` +} + +// We will use json marshalling to convert to string to +// preserve the underlying structure. +func (dk StatusSubscriptionKey) String() string { + out, err := json.Marshal(dk) + if err != nil { + return "" + } + + return string(out) +} + +// InstanceStatusSubClient implements InstanceStatusSubManager +type InstanceStatusSubClient struct { + storeName string + tagInst string +} + +func NewInstanceStatusSubClient() *InstanceStatusSubClient { + return &InstanceStatusSubClient{ + storeName: "rbdef", + tagInst: "instanceStatusSub", + } +} + +type notifyResult struct { + result int32 + time time.Time +} + +type resourceStatusDelta struct { + Created []ResourceStatus `json:"created"` + Deleted []ResourceStatus `json:"deleted"` + Modified []ResourceStatus `json:"modified"` +} + +type notifyRequestPayload struct { + InstanceId string `json:"instance-id"` + Subscription string `json:"subscription-name"` + Metadata map[string]interface{} `json:"metadata"` + Delta resourceStatusDelta `json:"resource-changes"` +} + +func (rsd resourceStatusDelta) Delta() bool { + return len(rsd.Created) > 0 || len(rsd.Deleted) > 0 || len(rsd.Modified) > 0 +} + +type notifyChannelData struct { + instanceId string + subscription StatusSubscription + action string + delta resourceStatusDelta + notifyResult chan notifyResult +} + +type subscriptionWatch struct { + watcherStop chan struct{} + lastUpdateTime time.Time +} + +type subscriptionWatchManager struct { + watchersStatus map[string]subscriptionWatch +} + +type subscriptionNotifyManager struct { + notifyLockMap map[string]*sync.Mutex + notifyChannel map[string]chan notifyChannelData + watchersStatus map[string]subscriptionWatchManager + sync.Mutex +} + +var subscriptionNotifyData = subscriptionNotifyManager{ + notifyLockMap: map[string]*sync.Mutex{}, + notifyChannel: map[string]chan notifyChannelData{}, + watchersStatus: map[string]subscriptionWatchManager{}, +} + +// InstanceStatusSubManager is an interface exposes the status subscription functionality +type InstanceStatusSubManager interface { + Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) + Get(instanceId, subId string) (StatusSubscription, error) + Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) + List(instanceId string) ([]StatusSubscription, error) + Delete(instanceId, subId string) error + Cleanup(instanceId string) error + RestoreWatchers() +} + +// Create Status Subscription +func (iss *InstanceStatusSubClient) Create(instanceId string, subDetails SubscriptionRequest) (StatusSubscription, error) { + + _, err := iss.Get(instanceId, subDetails.Name) + if err == nil { + return StatusSubscription{}, pkgerrors.New("Subscription already exists") + } + + lock, _, _ := getSubscriptionData(instanceId) + + key := StatusSubscriptionKey{ + InstanceId: instanceId, + SubscriptionName: subDetails.Name, + } + + sub := StatusSubscription{ + Name: subDetails.Name, + MinNotifyInterval: subDetails.MinNotifyInterval, + LastNotifyStatus: 0, + CallbackUrl: subDetails.CallbackUrl, + LastUpdateTime: time.Now(), + LastNotifyTime: time.Now(), + NotifyMetadata: subDetails.NotifyMetadata, + } + if sub.NotifyMetadata == nil { + sub.NotifyMetadata = make(map[string]interface{}) + } + + err = iss.refreshWatchers(instanceId, subDetails.Name) + if err != nil { + return sub, pkgerrors.Wrap(err, "Creating Status Subscription DB Entry") + } + + lock.Lock() + defer lock.Unlock() + + err = db.DBconn.Create(iss.storeName, key, iss.tagInst, sub) + if err != nil { + return sub, pkgerrors.Wrap(err, "Creating Status Subscription DB Entry") + } + log.Info("Successfully created Status Subscription", log.Fields{ + "InstanceId": instanceId, + "SubscriptionName": subDetails.Name, + }) + + go runNotifyThread(instanceId, sub.Name) + + return sub, nil +} + +// Get Status subscription +func (iss *InstanceStatusSubClient) Get(instanceId, subId string) (StatusSubscription, error) { + lock, _, _ := getSubscriptionData(instanceId) + // Acquire Mutex + lock.Lock() + defer lock.Unlock() + key := StatusSubscriptionKey{ + InstanceId: instanceId, + SubscriptionName: subId, + } + DBResp, err := db.DBconn.Read(iss.storeName, key, iss.tagInst) + if err != nil || DBResp == nil { + return StatusSubscription{}, pkgerrors.Wrap(err, "Error retrieving Subscription data") + } + + sub := StatusSubscription{} + err = db.DBconn.Unmarshal(DBResp, &sub) + if err != nil { + return StatusSubscription{}, pkgerrors.Wrap(err, "Demarshalling Subscription Value") + } + return sub, nil +} + +// Update status subscription +func (iss *InstanceStatusSubClient) Update(instanceId, subId string, subDetails SubscriptionRequest) (StatusSubscription, error) { + sub, err := iss.Get(instanceId, subDetails.Name) + if err != nil { + return StatusSubscription{}, pkgerrors.Wrap(err, "Subscription does not exist") + } + + lock, _, _ := getSubscriptionData(instanceId) + + key := StatusSubscriptionKey{ + InstanceId: instanceId, + SubscriptionName: subDetails.Name, + } + + sub.MinNotifyInterval = subDetails.MinNotifyInterval + sub.CallbackUrl = subDetails.CallbackUrl + sub.NotifyMetadata = subDetails.NotifyMetadata + if sub.NotifyMetadata == nil { + sub.NotifyMetadata = make(map[string]interface{}) + } + + err = iss.refreshWatchers(instanceId, subDetails.Name) + if err != nil { + return sub, pkgerrors.Wrap(err, "Updating Status Subscription DB Entry") + } + + lock.Lock() + defer lock.Unlock() + + err = db.DBconn.Update(iss.storeName, key, iss.tagInst, sub) + if err != nil { + return sub, pkgerrors.Wrap(err, "Updating Status Subscription DB Entry") + } + log.Info("Successfully updated Status Subscription", log.Fields{ + "InstanceId": instanceId, + "SubscriptionName": subDetails.Name, + }) + + return sub, nil +} + +// Get list of status subscriptions +func (iss *InstanceStatusSubClient) List(instanceId string) ([]StatusSubscription, error) { + + lock, _, _ := getSubscriptionData(instanceId) + // Acquire Mutex + lock.Lock() + defer lock.Unlock() + // Retrieve info about created status subscriptions + dbResp, err := db.DBconn.ReadAll(iss.storeName, iss.tagInst) + if err != nil { + if !strings.Contains(err.Error(), "Did not find any objects with tag") { + return []StatusSubscription{}, pkgerrors.Wrap(err, "Getting Status Subscription data") + } + } + subList := make([]StatusSubscription, 0) + for key, value := range dbResp { + if key != "" { + subKey := StatusSubscriptionKey{} + err = json.Unmarshal([]byte(key), &subKey) + if err != nil { + log.Error("Error demarshaling Status Subscription Key DB data", log.Fields{ + "error": err.Error(), + "key": key}) + return []StatusSubscription{}, pkgerrors.Wrap(err, "Demarshalling subscription key") + } + if subKey.InstanceId != instanceId { + continue + } + } + //value is a byte array + if value != nil { + sub := StatusSubscription{} + err = db.DBconn.Unmarshal(value, &sub) + if err != nil { + log.Error("Error demarshaling Status Subscription DB data", log.Fields{ + "error": err.Error(), + "key": key}) + } + subList = append(subList, sub) + } + } + + return subList, nil +} + +// Delete status subscription +func (iss *InstanceStatusSubClient) Delete(instanceId, subId string) error { + _, err := iss.Get(instanceId, subId) + if err != nil { + return pkgerrors.Wrap(err, "Subscription does not exist") + } + lock, _, watchers := getSubscriptionData(instanceId) + // Acquire Mutex + lock.Lock() + defer lock.Unlock() + + close(watchers.watchersStatus[subId].watcherStop) + delete(watchers.watchersStatus, subId) + + key := StatusSubscriptionKey{ + InstanceId: instanceId, + SubscriptionName: subId, + } + err = db.DBconn.Delete(iss.storeName, key, iss.tagInst) + if err != nil { + return pkgerrors.Wrap(err, "Removing Status Subscription in DB") + } + return nil +} + +// Cleanup status subscriptions for instance +func (iss *InstanceStatusSubClient) Cleanup(instanceId string) error { + subList, err := iss.List(instanceId) + if err != nil { + return err + } + + for _, sub := range subList { + err = iss.Delete(instanceId, sub.Name) + if err != nil { + log.Error("Error deleting ", log.Fields{ + "error": err.Error(), + "key": sub.Name}) + } + } + removeSubscriptionData(instanceId) + return err +} + +// Restore status subscriptions notify threads +func (iss *InstanceStatusSubClient) RestoreWatchers() { + go func() { + time.Sleep(time.Second * 10) + log.Info("Restoring status subscription notifications", log.Fields{}) + v := NewInstanceClient() + instances, err := v.List("", "", "") + if err != nil { + log.Error("Error reading instance list", log.Fields{ + "error": err.Error(), + }) + } + for _, instance := range instances { + subList, err := iss.List(instance.ID) + if err != nil { + log.Error("Error reading subscription list for instance", log.Fields{ + "error": err.Error(), + "instance": instance.ID, + }) + continue + } + + for _, sub := range subList { + err = iss.refreshWatchers(instance.ID, sub.Name) + if err != nil { + log.Error("Error on refreshing watchers", log.Fields{ + "error": err.Error(), + "instance": instance.ID, + "subscription": sub.Name, + }) + continue + } + go runNotifyThread(instance.ID, sub.Name) + } + } + }() +} + +func (iss *InstanceStatusSubClient) refreshWatchers(instanceId, subId string) error { + log.Info("REFRESH WATCHERS", log.Fields{ + "instance": instanceId, + "subscription": subId, + }) + v := NewInstanceClient() + k8sClient := KubernetesClient{} + instance, err := v.Get(instanceId) + if err != nil { + return pkgerrors.Wrap(err, "Cannot get instance for notify thread") + } + profile, err := rb.NewProfileClient().Get(instance.Request.RBName, instance.Request.RBVersion, + instance.Request.ProfileName) + if err != nil { + return pkgerrors.Wrap(err, "Unable to find Profile instance status") + } + err = k8sClient.Init(instance.Request.CloudRegion, instanceId) + if err != nil { + return pkgerrors.Wrap(err, "Cannot set k8s client for instance") + } + + lock, _, watchers := getSubscriptionData(instanceId) + // Acquire Mutex + lock.Lock() + defer lock.Unlock() + watcher, ok := watchers.watchersStatus[subId] + if ok { + close(watcher.watcherStop) + } else { + watchers.watchersStatus[subId] = subscriptionWatch{ + lastUpdateTime: time.Now(), + } + } + + watcher.watcherStop = make(chan struct{}) + + for _, gvk := range gvkListForInstance(instance, profile) { + informer, _ := k8sClient.GetInformer(gvk) + handlers := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + lock.Lock() + watcher.lastUpdateTime = time.Now() + watchers.watchersStatus[subId] = watcher + lock.Unlock() + }, + UpdateFunc: func(oldObj, obj interface{}) { + lock.Lock() + watcher.lastUpdateTime = time.Now() + watchers.watchersStatus[subId] = watcher + lock.Unlock() + }, + DeleteFunc: func(obj interface{}) { + lock.Lock() + watcher.lastUpdateTime = time.Now() + watchers.watchersStatus[subId] = watcher + lock.Unlock() + }, + } + informer.AddEventHandler(handlers) + go func(informer cache.SharedInformer, stopper chan struct{}, fields log.Fields) { + log.Info("[START] Watcher", fields) + informer.Run(stopper) + log.Info("[STOP] Watcher", fields) + }(informer, watcher.watcherStop, log.Fields{ + "Kind": gvk.Kind, + "Instance": instanceId, + "Subscription": subId, + }) + } + return nil +} + +// Get the Mutex for the Subscription +func getSubscriptionData(instanceId string) (*sync.Mutex, chan notifyChannelData, subscriptionWatchManager) { + var key string = instanceId + subscriptionNotifyData.Lock() + defer subscriptionNotifyData.Unlock() + _, ok := subscriptionNotifyData.notifyLockMap[key] + if !ok { + subscriptionNotifyData.notifyLockMap[key] = &sync.Mutex{} + } + _, ok = subscriptionNotifyData.notifyChannel[key] + if !ok { + subscriptionNotifyData.notifyChannel[key] = make(chan notifyChannelData) + go scheduleNotifications(instanceId, subscriptionNotifyData.notifyChannel[key]) + time.Sleep(time.Second * 5) + } + _, ok = subscriptionNotifyData.watchersStatus[key] + if !ok { + subscriptionNotifyData.watchersStatus[key] = subscriptionWatchManager{ + watchersStatus: make(map[string]subscriptionWatch), + } + } + return subscriptionNotifyData.notifyLockMap[key], subscriptionNotifyData.notifyChannel[key], subscriptionNotifyData.watchersStatus[key] +} + +func removeSubscriptionData(instanceId string) { + var key string = instanceId + subscriptionNotifyData.Lock() + defer subscriptionNotifyData.Unlock() + _, ok := subscriptionNotifyData.notifyLockMap[key] + if ok { + delete(subscriptionNotifyData.notifyLockMap, key) + } + _, ok = subscriptionNotifyData.notifyChannel[key] + if ok { + crl := notifyChannelData{ + instanceId: instanceId, + action: "STOP", + } + subscriptionNotifyData.notifyChannel[key] <- crl + delete(subscriptionNotifyData.notifyChannel, key) + } + _, ok = subscriptionNotifyData.watchersStatus[key] + if !ok { + delete(subscriptionNotifyData.watchersStatus, key) + } +} + +// notify request timeout +func notifyTimeout(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, time.Duration(time.Second*5)) +} + +// Per Subscription Go routine to send notification about status change +func scheduleNotifications(instanceId string, c chan notifyChannelData) { + // Keep thread running + log.Info("[START] status notify thread for ", log.Fields{ + "instance": instanceId, + }) + for { + data := <-c + breakThread := false + switch { + case data.action == "NOTIFY": + var result = notifyResult{} + var err error = nil + var notifyPayload = notifyRequestPayload{ + Delta: data.delta, + InstanceId: data.instanceId, + Subscription: data.subscription.Name, + Metadata: data.subscription.NotifyMetadata, + } + notifyBody, err := json.Marshal(notifyPayload) + if err == nil { + notifyBodyBuffer := bytes.NewBuffer(notifyBody) + transport := http.Transport{ + Dial: notifyTimeout, + } + client := http.Client{ + Transport: &transport, + } + resp, errReq := client.Post(data.subscription.CallbackUrl, "application/json", notifyBodyBuffer) + if errReq == nil { + result.result = int32(resp.StatusCode) + if resp.StatusCode >= 400 { + respBody, _ := ioutil.ReadAll(resp.Body) + log.Error("Status notification request failed", log.Fields{ + "instance": instanceId, + "name": data.subscription.Name, + "url": data.subscription.CallbackUrl, + "code": resp.StatusCode, + "status": resp.Status, + "body": string(respBody), + }) + resp.Body.Close() + } + } else { + err = errReq + } + } + + if err != nil { + log.Error("Error for status notify thread", log.Fields{ + "instance": instanceId, + "name": data.subscription.Name, + "err": err.Error(), + }) + result.result = 500 + } + result.time = time.Now() + + data.notifyResult <- result + + case data.action == "STOP": + breakThread = true + } + if breakThread { + break + } + } + log.Info("[STOP] status notify thread for ", log.Fields{ + "instance": instanceId, + }) +} + +func gvkListForInstance(instance InstanceResponse, profile rb.Profile) []schema.GroupVersionKind { + list := make([]schema.GroupVersionKind, 0) + gvkMap := make(map[string]schema.GroupVersionKind) + gvk := schema.FromAPIVersionAndKind("v1", "Pod") + gvkMap[gvk.String()] = gvk + for _, res := range instance.Resources { + gvk = res.GVK + _, ok := gvkMap[gvk.String()] + if !ok { + gvkMap[gvk.String()] = gvk + } + } + for _, gvk := range profile.ExtraResourceTypes { + _, ok := gvkMap[gvk.String()] + if !ok { + gvkMap[gvk.String()] = gvk + } + } + for _, gvk := range gvkMap { + list = append(list, gvk) + } + return list +} + +func runNotifyThread(instanceId, subName string) { + v := NewInstanceClient() + iss := NewInstanceStatusSubClient() + var status = InstanceStatus{ + ResourceCount: -1, + } + key := StatusSubscriptionKey{ + InstanceId: instanceId, + SubscriptionName: subName, + } + time.Sleep(time.Second * 5) + log.Info("[START] status verification thread", log.Fields{ + "InstanceId": instanceId, + "SubscriptionName": subName, + }) + + lastChange := time.Now() + var timeInSeconds time.Duration = 5 + for { + time.Sleep(time.Second * timeInSeconds) + + lock, subData, watchers := getSubscriptionData(instanceId) + var changeDetected = false + lock.Lock() + watcherStatus, ok := watchers.watchersStatus[subName] + if ok { + changeDetected = watcherStatus.lastUpdateTime.After(lastChange) + } + lock.Unlock() + if !ok { + break + } + if changeDetected || status.ResourceCount < 0 { + currentSub, err := iss.Get(instanceId, subName) + if err != nil { + log.Error("Error getting current status", log.Fields{ + "error": err.Error(), + "instance": instanceId}) + break + } + if currentSub.MinNotifyInterval > 5 { + timeInSeconds = time.Duration(currentSub.MinNotifyInterval) + } else { + timeInSeconds = 5 + } + newStatus, err := v.Status(instanceId, false) + if err != nil { + log.Error("Error getting current status", log.Fields{ + "error": err.Error(), + "instance": instanceId}) + break + } else { + if status.ResourceCount >= 0 { + var delta = statusDelta(status, newStatus) + if delta.Delta() { + log.Info("CHANGE DETECTED", log.Fields{ + "Instance": instanceId, + "Subscription": subName, + }) + lastChange = watcherStatus.lastUpdateTime + for _, res := range delta.Created { + log.Info("CREATED", log.Fields{ + "Kind": res.GVK.Kind, + "Name": res.Name, + }) + } + for _, res := range delta.Modified { + log.Info("MODIFIED", log.Fields{ + "Kind": res.GVK.Kind, + "Name": res.Name, + }) + } + for _, res := range delta.Deleted { + log.Info("DELETED", log.Fields{ + "Kind": res.GVK.Kind, + "Name": res.Name, + }) + } + // Acquire Mutex + lock.Lock() + currentSub.LastUpdateTime = time.Now() + var notifyResultCh = make(chan notifyResult) + var newData = notifyChannelData{ + instanceId: instanceId, + subscription: currentSub, + action: "NOTIFY", + delta: delta, + notifyResult: notifyResultCh, + } + subData <- newData + var notifyResult notifyResult = <-notifyResultCh + log.Info("Notification sent", log.Fields{ + "InstanceId": instanceId, + "SubscriptionName": subName, + "Result": notifyResult.result, + }) + currentSub.LastNotifyStatus = notifyResult.result + currentSub.LastNotifyTime = notifyResult.time + err = db.DBconn.Update(iss.storeName, key, iss.tagInst, currentSub) + if err != nil { + log.Error("Error updating subscription status", log.Fields{ + "error": err.Error(), + "instance": instanceId}) + } + lock.Unlock() + } + } + + status = newStatus + } + } + } + log.Info("[STOP] status verification thread", log.Fields{ + "InstanceId": instanceId, + "SubscriptionName": subName, + }) +} + +func statusDelta(first, second InstanceStatus) resourceStatusDelta { + var delta resourceStatusDelta = resourceStatusDelta{ + Created: make([]ResourceStatus, 0), + Deleted: make([]ResourceStatus, 0), + Modified: make([]ResourceStatus, 0), + } + var firstResList map[string]ResourceStatus = make(map[string]ResourceStatus) + for _, res := range first.ResourcesStatus { + firstResList[res.Key()] = res + } + for _, res := range second.ResourcesStatus { + var key string = res.Key() + if prevRes, ok := firstResList[key]; ok { + if prevRes.Value() != res.Value() { + delta.Modified = append(delta.Modified, res) + } + delete(firstResList, res.Key()) + } else { + delta.Created = append(delta.Created, res) + } + } + for _, res := range firstResList { + delta.Deleted = append(delta.Deleted, res) + } + return delta +} |