From a47c4b53211a4ad6a448b3f6b28b683189f88c80 Mon Sep 17 00:00:00 2001 From: ramverma Date: Tue, 2 Apr 2019 17:50:15 +0000 Subject: Adding PdpStatus listener & handler to policy/pap 1) Adding PdpHeartbeatListener to keep listening for PdpStatus messages coming from Pdp's over DMaaP either for registration or just heartbeat. 2) Adding PdpStatusMessageHandler to handle new Pdp registration and also heartbeat coming from Pdp's. Registration Flow - 1. Find the PdpSubgroup based on PdpType & SupportedPolicyTypes coming in PdpStatus message. 2. If not found, don't register pdp and log the error message. 3. If found, check if PdpInstance is already added. 4. If not added, add PdpInstance to the subgroup and increment the currentInstanceCount. 5. Create and send PdpUpdate & PdpStateChange message to the Pdp. 6. Update the changes in DB. Heatbeat Flow - 1. Find the PdpInstance based on details in PdpStatus message. 2. Validate the details from message to what in DB. 3. If correct, persist the health & statistics information in DB. 4. If not correct, send a PdpUpdate & PdpStateChange message to the Pdp. 3) Unit tests will come as seperate review. Change-Id: If705193259999e2ab077b78961282c998b949f57 Issue-ID: POLICY-1443 Signed-off-by: ramverma --- .../policy/pap/main/comm/PdpHeartbeatListener.java | 40 ++++ .../pap/main/comm/PdpStatusMessageHandler.java | 239 +++++++++++++++++++++ .../policy/pap/main/startstop/PapActivator.java | 11 + 3 files changed, 290 insertions(+) create mode 100644 main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java create mode 100644 main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java (limited to 'main') diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java new file mode 100644 index 00000000..fc61c1ab --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.TypedMessageListener; +import org.onap.policy.models.pdp.concepts.PdpStatus; + +/** + * Listener for PDP Status messages which either represent registration or heart beat. + * + * @author Ram Krishna Verma (ram.krishna.verma@est.tech) + */ +public class PdpHeartbeatListener implements TypedMessageListener { + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final PdpStatus message) { + + final PdpStatusMessageHandler handler = new PdpStatusMessageHandler(); + handler.handlePdpStatus(message); + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java new file mode 100644 index 00000000..5a44ff91 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java @@ -0,0 +1,239 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.pdp.concepts.Pdp; +import org.onap.policy.models.pdp.concepts.PdpGroup; +import org.onap.policy.models.pdp.concepts.PdpGroups; +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpSubGroup; +import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.models.pdp.concepts.PolicyTypeIdent; +import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.models.provider.PolicyModelsProvider; +import org.onap.policy.pap.main.PapConstants; +import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; +import org.onap.policy.pap.main.PolicyPapException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Handler for PDP Status messages which either represent registration or heart beat. + * + * @author Ram Krishna Verma (ram.krishna.verma@est.tech) + */ +public class PdpStatusMessageHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class); + + /** + * Handles the PdpStatus message coming from various PDP's. + * + * @param message the PdpStatus message + */ + public void handlePdpStatus(final PdpStatus message) { + final PolicyModelsProviderFactoryWrapper modelProviderWrapper = + Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class); + try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) { + // TODO: remove the init() call when PolicyModelProviderFactory changes to call init() before returning the + // object + databaseProvider.init(); + if (message.getPdpGroup().isEmpty() && message.getPdpSubgroup().isEmpty()) { + handlePdpRegistration(message, databaseProvider); + } else { + handlePdpHeartbeat(message, databaseProvider); + } + } catch (final PolicyPapException exp) { + LOGGER.error("Operation Failed", exp); + } catch (final Exception exp) { + LOGGER.error("Failed connecting to database provider", exp); + } + } + + private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider) + throws PfModelException, PolicyPapException { + if (!findAndUpdatePdpGroup(message, databaseProvider)) { + final String errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - "; + LOGGER.debug("{}{}", errorMessage, message); + throw new PolicyPapException(errorMessage + message); + } + } + + private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider) + throws PfModelException { + boolean pdpGroupFound = false; + Optional subGroup = null; + final List> supportedPolicyTypesPair = createSupportedPolictTypesPair(message); + final PdpGroups pdpGroups = + databaseProvider.getFilteredPdpGroups(message.getPdpType(), supportedPolicyTypesPair); + for (final PdpGroup pdpGroup : pdpGroups.getGroups()) { + if (pdpGroup.getPdpGroupState().equals(PdpState.ACTIVE)) { + subGroup = findPdpSubGroup(message, pdpGroup); + if (subGroup.isPresent()) { + LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", pdpGroup, message); + if (!findPdpInstance(message, subGroup.get()).isPresent()) { + updatePdpSubGroup(pdpGroup, subGroup.get(), message, databaseProvider); + } + sendPdpMessage(pdpGroup.getName(), subGroup.get(), message.getInstance(), null); + pdpGroupFound = true; + break; + } + } + } + return pdpGroupFound; + } + + private List> createSupportedPolictTypesPair(final PdpStatus message) { + final List> supportedPolicyTypesPair = new ArrayList<>(); + for (final PolicyTypeIdent policyTypeIdent : message.getSupportedPolicyTypes()) { + supportedPolicyTypesPair.add(Pair.of(policyTypeIdent.getName(), policyTypeIdent.getVersion())); + } + return supportedPolicyTypesPair; + } + + private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message, + final PolicyModelsProvider databaseProvider) throws PfModelException { + + final Pdp pdpInstance = new Pdp(); + pdpInstance.setInstanceId(message.getInstance()); + pdpInstance.setPdpState(message.getState()); + pdpInstance.setHealthy(message.getHealthy()); + pdpInstance.setMessage(message.getDescription()); + pdpSubGroup.getPdpInstances().add(pdpInstance); + + pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1); + + databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpGroup.getVersion(), pdpSubGroup); + + LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup); + } + + private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider) + throws PfModelException, PolicyPapException { + boolean pdpInstanceFound = false; + Optional pdpSubgroup = null; + Optional pdpInstance = null; + + final PdpGroups pdpGroups = databaseProvider.getLatestPdpGroups(message.getPdpGroup()); + final PdpGroup pdpGroup = pdpGroups.getGroups().get(0); + pdpSubgroup = findPdpSubGroup(message, pdpGroup); + if (pdpSubgroup.isPresent()) { + pdpInstance = findPdpInstance(message, pdpSubgroup.get()); + if (pdpInstance.isPresent()) { + processPdpDetails(message, pdpSubgroup, pdpInstance, pdpGroup); + pdpInstanceFound = true; + } + } + if (!pdpInstanceFound) { + final String errorMessage = "Failed to process heartbeat. No matching PdpGroup/SubGroup Found - "; + LOGGER.debug("{}{}", errorMessage, message); + throw new PolicyPapException(errorMessage + message); + } + } + + private Optional findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) { + PdpSubGroup pdpSubgroup = null; + for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) { + if (message.getPdpSubgroup().equals(subGroup.getPdpType())) { + pdpSubgroup = subGroup; + break; + } + } + return Optional.ofNullable(pdpSubgroup); + } + + private Optional findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) { + Pdp pdpInstance = null; + for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) { + if (message.getInstance().equals(pdpInstanceDetails.getInstanceId())) { + pdpInstance = pdpInstanceDetails; + break; + } + } + return Optional.ofNullable(pdpInstance); + } + + private void processPdpDetails(final PdpStatus message, final Optional pdpSubgroup, + final Optional pdpInstance, final PdpGroup pdpGroup) { + if (validatePdpDetails(message, pdpGroup, pdpSubgroup.get(), pdpInstance.get())) { + LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance.get()); + // TODO: details are correct save health & statistics details in DB + } else { + LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance.get()); + sendPdpMessage(pdpGroup.getName(), pdpSubgroup.get(), pdpInstance.get().getInstanceId(), + pdpInstance.get().getPdpState()); + } + } + + private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup, + final Pdp pdpInstanceDetails) { + + return message.getPdpGroup().equals(pdpGroup.getName()) + && message.getPdpSubgroup().equals(subGroup.getPdpType()) + && message.getState().equals(pdpInstanceDetails.getPdpState()) + && message.getSupportedPolicyTypes().containsAll(subGroup.getSupportedPolicyTypes()) + && message.getPdpType().equals(subGroup.getPdpType()); + } + + private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId, + final PdpState pdpState) { + final PdpUpdate pdpUpdatemessage = createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId); + final PdpStateChange pdpStateChangeMessage = + createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState); + final PdpModifyRequestMap requestMap = Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class); + requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage); + LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage); + LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage); + } + + private PdpUpdate createPdpUpdateMessage(final String pdpGroupName, final PdpSubGroup subGroup, + final String pdpInstanceId) { + + final PdpUpdate update = new PdpUpdate(); + update.setName(pdpInstanceId); + update.setPdpGroup(pdpGroupName); + update.setPdpSubgroup(subGroup.getPdpType()); + update.setPolicies(subGroup.getPolicies()); + LOGGER.debug("Created PdpUpdate message - {}", update); + return update; + } + + private PdpStateChange createPdpStateChangeMessage(final String pdpGroupName, final PdpSubGroup subGroup, + final String pdpInstanceId, final PdpState pdpState) { + + final PdpStateChange stateChange = new PdpStateChange(); + stateChange.setName(pdpInstanceId); + stateChange.setPdpGroup(pdpGroupName); + stateChange.setPdpSubgroup(subGroup.getPdpType()); + stateChange.setState(pdpState == null ? PdpState.ACTIVE : pdpState); + LOGGER.debug("Created PdpStateChange message - {}", stateChange); + return stateChange; + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index 42634ee7..a60232fd 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -37,6 +37,7 @@ import org.onap.policy.models.pdp.enums.PdpMessageType; import org.onap.policy.pap.main.PapConstants; import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; import org.onap.policy.pap.main.PolicyPapRuntimeException; +import org.onap.policy.pap.main.comm.PdpHeartbeatListener; import org.onap.policy.pap.main.comm.PdpModifyRequestMap; import org.onap.policy.pap.main.comm.Publisher; import org.onap.policy.pap.main.comm.TimerManager; @@ -75,6 +76,11 @@ public class PapActivator extends ServiceManagerContainer { */ private final RequestIdDispatcher reqIdDispatcher; + /** + * Listener for anonymous {@link PdpStatus} messages either for registration or heartbeat. + */ + private final PdpHeartbeatListener pdpHeartbeatListener; + /** * Instantiate the activator for policy pap as a complete service. * @@ -91,6 +97,7 @@ public class PapActivator extends ServiceManagerContainer { this.papParameterGroup = papParameterGroup; this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.pdpHeartbeatListener = new PdpHeartbeatListener(); } catch (final RuntimeException e) { throw new PolicyPapRuntimeException(e); @@ -119,6 +126,10 @@ public class PapActivator extends ServiceManagerContainer { () -> Registry.register(PapConstants.REG_PAP_DAO_FACTORY, daoFactory.get()), () -> Registry.unregister(PapConstants.REG_PAP_DAO_FACTORY)); + addAction("Pdp Heartbeat Listener", + () -> reqIdDispatcher.register(pdpHeartbeatListener), + () -> reqIdDispatcher.unregister(pdpHeartbeatListener)); + addAction("Request ID Dispatcher", () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher), () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); -- cgit 1.2.3-korg