diff options
Diffstat (limited to 'pkg/kafkacomm/handler/pdp_message_handler.go')
-rw-r--r-- | pkg/kafkacomm/handler/pdp_message_handler.go | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/pkg/kafkacomm/handler/pdp_message_handler.go b/pkg/kafkacomm/handler/pdp_message_handler.go new file mode 100644 index 0000000..8d7da92 --- /dev/null +++ b/pkg/kafkacomm/handler/pdp_message_handler.go @@ -0,0 +1,132 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2024: Deutsche Telecom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== + +// The handler package is responsible for processing messages from Kafka, specifically targeting the OPA +// (Open Policy Agent) PDP (Policy Decision Point). It validates the message type, +// +// ensures it is relevant to the current PDP, and dispatches the message for appropriate processing. +package handler + +import ( + "encoding/json" + "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/kafkacomm/publisher" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/pdpattributes" +) + +type OpaPdpMessage struct { + Name string `json:"name"` // Name of the PDP (optional for broadcast messages). + MessageType string `json:"MessageName"` // Type of the message (e.g., PDP_UPDATE, PDP_STATE_CHANGE, etc.) + PdpGroup string `json:"pdpGroup"` // Group to which the PDP belongs. + PdpSubgroup string `json:"pdpSubgroup"` // Subgroup within the PDP group. +} + +// Checks if the incoming Kafka message belongs to the current PDP instance. +func checkIfMessageIsForOpaPdp(message OpaPdpMessage) bool { + + if message.Name != "" { + // message included a PDP name, check if matches + //log.Infof(" Message Name is not empty") + return message.Name == pdpattributes.PdpName + } + + // message does not provide a PDP name - must be a broadcast + if message.PdpGroup == "" { + //log.Infof(" Message PDP Group is empty") + return false + } + + if pdpattributes.PdpSubgroup == "" { + // this PDP has no assignment yet, thus should ignore broadcast messages + //log.Infof(" pdpstate PDP subgroup is empty") + return false + } + + if message.PdpGroup != consts.PdpGroup { + //log.Infof(" message pdp group is not equal to cons pdp group") + return false + } + + if message.PdpSubgroup == "" { + //message was broadcast to entire group + //log.Infof(" message pdp subgroup is empty") + return true + } + + return message.PdpSubgroup == pdpattributes.PdpSubgroup +} + +// Handles incoming Kafka messages, validates their relevance to the current PDP, +// and dispatches them for further processing based on their type. +func PdpMessageHandler(kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error { + + log.Debug("Starting PDP Message Listener.....") + var stopConsuming bool + for !stopConsuming { + message, err := kafkacomm.ReadKafkaMessages(kc) + if err != nil { + log.Warnf("Failed to Read Kafka Messages: %v\n", err) + continue + } + log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message)) + + if message != nil { + + var opaPdpMessage OpaPdpMessage + + err = json.Unmarshal(message, &opaPdpMessage) + if err != nil { + log.Warnf("Failed to UnMarshal Messages: %v\n", err) + continue + } + + if !checkIfMessageIsForOpaPdp(opaPdpMessage) { + + log.Warnf("Not a valid Opa Pdp Message") + continue + } + + switch opaPdpMessage.MessageType { + + case "PDP_UPDATE": + err = PdpUpdateMessageHandler(message, p) + if err != nil { + log.Warnf("Error processing Update Message: %v", err) + } + + case "PDP_STATE_CHANGE": + err = PdpStateChangeMessageHandler(message, p) + if err != nil { + log.Warnf("Error processing Update Message: %v", err) + } + + case "PDP_STATUS": + log.Debugf("discarding event of type PDP_STATUS") + continue + default: + log.Errorf("This is not a valid Message Type: %s", opaPdpMessage.MessageType) + continue + + } + + } + } + return nil + +} |