diff options
3 files changed, 290 insertions, 0 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java new file mode 100644 index 00000000..fc61c1ab --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.TypedMessageListener; +import org.onap.policy.models.pdp.concepts.PdpStatus; + +/** + * Listener for PDP Status messages which either represent registration or heart beat. + * + * @author Ram Krishna Verma (ram.krishna.verma@est.tech) + */ +public class PdpHeartbeatListener implements TypedMessageListener<PdpStatus> { + + @Override + public void onTopicEvent(final CommInfrastructure infra, final String topic, final PdpStatus message) { + + final PdpStatusMessageHandler handler = new PdpStatusMessageHandler(); + handler.handlePdpStatus(message); + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java new file mode 100644 index 00000000..5a44ff91 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java @@ -0,0 +1,239 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.pdp.concepts.Pdp; +import org.onap.policy.models.pdp.concepts.PdpGroup; +import org.onap.policy.models.pdp.concepts.PdpGroups; +import org.onap.policy.models.pdp.concepts.PdpStateChange; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpSubGroup; +import org.onap.policy.models.pdp.concepts.PdpUpdate; +import org.onap.policy.models.pdp.concepts.PolicyTypeIdent; +import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.models.provider.PolicyModelsProvider; +import org.onap.policy.pap.main.PapConstants; +import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; +import org.onap.policy.pap.main.PolicyPapException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Handler for PDP Status messages which either represent registration or heart beat. + * + * @author Ram Krishna Verma (ram.krishna.verma@est.tech) + */ +public class PdpStatusMessageHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class); + + /** + * Handles the PdpStatus message coming from various PDP's. + * + * @param message the PdpStatus message + */ + public void handlePdpStatus(final PdpStatus message) { + final PolicyModelsProviderFactoryWrapper modelProviderWrapper = + Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class); + try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) { + // TODO: remove the init() call when PolicyModelProviderFactory changes to call init() before returning the + // object + databaseProvider.init(); + if (message.getPdpGroup().isEmpty() && message.getPdpSubgroup().isEmpty()) { + handlePdpRegistration(message, databaseProvider); + } else { + handlePdpHeartbeat(message, databaseProvider); + } + } catch (final PolicyPapException exp) { + LOGGER.error("Operation Failed", exp); + } catch (final Exception exp) { + LOGGER.error("Failed connecting to database provider", exp); + } + } + + private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider) + throws PfModelException, PolicyPapException { + if (!findAndUpdatePdpGroup(message, databaseProvider)) { + final String errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - "; + LOGGER.debug("{}{}", errorMessage, message); + throw new PolicyPapException(errorMessage + message); + } + } + + private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider) + throws PfModelException { + boolean pdpGroupFound = false; + Optional<PdpSubGroup> subGroup = null; + final List<Pair<String, String>> supportedPolicyTypesPair = createSupportedPolictTypesPair(message); + final PdpGroups pdpGroups = + databaseProvider.getFilteredPdpGroups(message.getPdpType(), supportedPolicyTypesPair); + for (final PdpGroup pdpGroup : pdpGroups.getGroups()) { + if (pdpGroup.getPdpGroupState().equals(PdpState.ACTIVE)) { + subGroup = findPdpSubGroup(message, pdpGroup); + if (subGroup.isPresent()) { + LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", pdpGroup, message); + if (!findPdpInstance(message, subGroup.get()).isPresent()) { + updatePdpSubGroup(pdpGroup, subGroup.get(), message, databaseProvider); + } + sendPdpMessage(pdpGroup.getName(), subGroup.get(), message.getInstance(), null); + pdpGroupFound = true; + break; + } + } + } + return pdpGroupFound; + } + + private List<Pair<String, String>> createSupportedPolictTypesPair(final PdpStatus message) { + final List<Pair<String, String>> supportedPolicyTypesPair = new ArrayList<>(); + for (final PolicyTypeIdent policyTypeIdent : message.getSupportedPolicyTypes()) { + supportedPolicyTypesPair.add(Pair.of(policyTypeIdent.getName(), policyTypeIdent.getVersion())); + } + return supportedPolicyTypesPair; + } + + private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message, + final PolicyModelsProvider databaseProvider) throws PfModelException { + + final Pdp pdpInstance = new Pdp(); + pdpInstance.setInstanceId(message.getInstance()); + pdpInstance.setPdpState(message.getState()); + pdpInstance.setHealthy(message.getHealthy()); + pdpInstance.setMessage(message.getDescription()); + pdpSubGroup.getPdpInstances().add(pdpInstance); + + pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1); + + databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpGroup.getVersion(), pdpSubGroup); + + LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup); + } + + private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider) + throws PfModelException, PolicyPapException { + boolean pdpInstanceFound = false; + Optional<PdpSubGroup> pdpSubgroup = null; + Optional<Pdp> pdpInstance = null; + + final PdpGroups pdpGroups = databaseProvider.getLatestPdpGroups(message.getPdpGroup()); + final PdpGroup pdpGroup = pdpGroups.getGroups().get(0); + pdpSubgroup = findPdpSubGroup(message, pdpGroup); + if (pdpSubgroup.isPresent()) { + pdpInstance = findPdpInstance(message, pdpSubgroup.get()); + if (pdpInstance.isPresent()) { + processPdpDetails(message, pdpSubgroup, pdpInstance, pdpGroup); + pdpInstanceFound = true; + } + } + if (!pdpInstanceFound) { + final String errorMessage = "Failed to process heartbeat. No matching PdpGroup/SubGroup Found - "; + LOGGER.debug("{}{}", errorMessage, message); + throw new PolicyPapException(errorMessage + message); + } + } + + private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) { + PdpSubGroup pdpSubgroup = null; + for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) { + if (message.getPdpSubgroup().equals(subGroup.getPdpType())) { + pdpSubgroup = subGroup; + break; + } + } + return Optional.ofNullable(pdpSubgroup); + } + + private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) { + Pdp pdpInstance = null; + for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) { + if (message.getInstance().equals(pdpInstanceDetails.getInstanceId())) { + pdpInstance = pdpInstanceDetails; + break; + } + } + return Optional.ofNullable(pdpInstance); + } + + private void processPdpDetails(final PdpStatus message, final Optional<PdpSubGroup> pdpSubgroup, + final Optional<Pdp> pdpInstance, final PdpGroup pdpGroup) { + if (validatePdpDetails(message, pdpGroup, pdpSubgroup.get(), pdpInstance.get())) { + LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance.get()); + // TODO: details are correct save health & statistics details in DB + } else { + LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance.get()); + sendPdpMessage(pdpGroup.getName(), pdpSubgroup.get(), pdpInstance.get().getInstanceId(), + pdpInstance.get().getPdpState()); + } + } + + private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup, + final Pdp pdpInstanceDetails) { + + return message.getPdpGroup().equals(pdpGroup.getName()) + && message.getPdpSubgroup().equals(subGroup.getPdpType()) + && message.getState().equals(pdpInstanceDetails.getPdpState()) + && message.getSupportedPolicyTypes().containsAll(subGroup.getSupportedPolicyTypes()) + && message.getPdpType().equals(subGroup.getPdpType()); + } + + private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId, + final PdpState pdpState) { + final PdpUpdate pdpUpdatemessage = createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId); + final PdpStateChange pdpStateChangeMessage = + createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState); + final PdpModifyRequestMap requestMap = Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class); + requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage); + LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage); + LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage); + } + + private PdpUpdate createPdpUpdateMessage(final String pdpGroupName, final PdpSubGroup subGroup, + final String pdpInstanceId) { + + final PdpUpdate update = new PdpUpdate(); + update.setName(pdpInstanceId); + update.setPdpGroup(pdpGroupName); + update.setPdpSubgroup(subGroup.getPdpType()); + update.setPolicies(subGroup.getPolicies()); + LOGGER.debug("Created PdpUpdate message - {}", update); + return update; + } + + private PdpStateChange createPdpStateChangeMessage(final String pdpGroupName, final PdpSubGroup subGroup, + final String pdpInstanceId, final PdpState pdpState) { + + final PdpStateChange stateChange = new PdpStateChange(); + stateChange.setName(pdpInstanceId); + stateChange.setPdpGroup(pdpGroupName); + stateChange.setPdpSubgroup(subGroup.getPdpType()); + stateChange.setState(pdpState == null ? PdpState.ACTIVE : pdpState); + LOGGER.debug("Created PdpStateChange message - {}", stateChange); + return stateChange; + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index 42634ee7..a60232fd 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -37,6 +37,7 @@ import org.onap.policy.models.pdp.enums.PdpMessageType; import org.onap.policy.pap.main.PapConstants; import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; import org.onap.policy.pap.main.PolicyPapRuntimeException; +import org.onap.policy.pap.main.comm.PdpHeartbeatListener; import org.onap.policy.pap.main.comm.PdpModifyRequestMap; import org.onap.policy.pap.main.comm.Publisher; import org.onap.policy.pap.main.comm.TimerManager; @@ -76,6 +77,11 @@ public class PapActivator extends ServiceManagerContainer { private final RequestIdDispatcher<PdpStatus> reqIdDispatcher; /** + * Listener for anonymous {@link PdpStatus} messages either for registration or heartbeat. + */ + private final PdpHeartbeatListener pdpHeartbeatListener; + + /** * Instantiate the activator for policy pap as a complete service. * * @param papParameterGroup the parameters for the pap service @@ -91,6 +97,7 @@ public class PapActivator extends ServiceManagerContainer { this.papParameterGroup = papParameterGroup; this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES); + this.pdpHeartbeatListener = new PdpHeartbeatListener(); } catch (final RuntimeException e) { throw new PolicyPapRuntimeException(e); @@ -119,6 +126,10 @@ public class PapActivator extends ServiceManagerContainer { () -> Registry.register(PapConstants.REG_PAP_DAO_FACTORY, daoFactory.get()), () -> Registry.unregister(PapConstants.REG_PAP_DAO_FACTORY)); + addAction("Pdp Heartbeat Listener", + () -> reqIdDispatcher.register(pdpHeartbeatListener), + () -> reqIdDispatcher.unregister(pdpHeartbeatListener)); + addAction("Request ID Dispatcher", () -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher), () -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name())); |