aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java40
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java239
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java11
3 files changed, 290 insertions, 0 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java
new file mode 100644
index 00000000..fc61c1ab
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpHeartbeatListener.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.TypedMessageListener;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+
+/**
+ * Listener for PDP Status messages which either represent registration or heart beat.
+ *
+ * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
+ */
+public class PdpHeartbeatListener implements TypedMessageListener<PdpStatus> {
+
+ @Override
+ public void onTopicEvent(final CommInfrastructure infra, final String topic, final PdpStatus message) {
+
+ final PdpStatusMessageHandler handler = new PdpStatusMessageHandler();
+ handler.handlePdpStatus(message);
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java
new file mode 100644
index 00000000..5a44ff91
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java
@@ -0,0 +1,239 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.onap.policy.common.utils.services.Registry;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.pdp.concepts.Pdp;
+import org.onap.policy.models.pdp.concepts.PdpGroup;
+import org.onap.policy.models.pdp.concepts.PdpGroups;
+import org.onap.policy.models.pdp.concepts.PdpStateChange;
+import org.onap.policy.models.pdp.concepts.PdpStatus;
+import org.onap.policy.models.pdp.concepts.PdpSubGroup;
+import org.onap.policy.models.pdp.concepts.PdpUpdate;
+import org.onap.policy.models.pdp.concepts.PolicyTypeIdent;
+import org.onap.policy.models.pdp.enums.PdpState;
+import org.onap.policy.models.provider.PolicyModelsProvider;
+import org.onap.policy.pap.main.PapConstants;
+import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
+import org.onap.policy.pap.main.PolicyPapException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handler for PDP Status messages which either represent registration or heart beat.
+ *
+ * @author Ram Krishna Verma (ram.krishna.verma@est.tech)
+ */
+public class PdpStatusMessageHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
+
+ /**
+ * Handles the PdpStatus message coming from various PDP's.
+ *
+ * @param message the PdpStatus message
+ */
+ public void handlePdpStatus(final PdpStatus message) {
+ final PolicyModelsProviderFactoryWrapper modelProviderWrapper =
+ Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class);
+ try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
+ // TODO: remove the init() call when PolicyModelProviderFactory changes to call init() before returning the
+ // object
+ databaseProvider.init();
+ if (message.getPdpGroup().isEmpty() && message.getPdpSubgroup().isEmpty()) {
+ handlePdpRegistration(message, databaseProvider);
+ } else {
+ handlePdpHeartbeat(message, databaseProvider);
+ }
+ } catch (final PolicyPapException exp) {
+ LOGGER.error("Operation Failed", exp);
+ } catch (final Exception exp) {
+ LOGGER.error("Failed connecting to database provider", exp);
+ }
+ }
+
+ private void handlePdpRegistration(final PdpStatus message, final PolicyModelsProvider databaseProvider)
+ throws PfModelException, PolicyPapException {
+ if (!findAndUpdatePdpGroup(message, databaseProvider)) {
+ final String errorMessage = "Failed to register PDP. No matching PdpGroup/SubGroup Found - ";
+ LOGGER.debug("{}{}", errorMessage, message);
+ throw new PolicyPapException(errorMessage + message);
+ }
+ }
+
+ private boolean findAndUpdatePdpGroup(final PdpStatus message, final PolicyModelsProvider databaseProvider)
+ throws PfModelException {
+ boolean pdpGroupFound = false;
+ Optional<PdpSubGroup> subGroup = null;
+ final List<Pair<String, String>> supportedPolicyTypesPair = createSupportedPolictTypesPair(message);
+ final PdpGroups pdpGroups =
+ databaseProvider.getFilteredPdpGroups(message.getPdpType(), supportedPolicyTypesPair);
+ for (final PdpGroup pdpGroup : pdpGroups.getGroups()) {
+ if (pdpGroup.getPdpGroupState().equals(PdpState.ACTIVE)) {
+ subGroup = findPdpSubGroup(message, pdpGroup);
+ if (subGroup.isPresent()) {
+ LOGGER.debug("Found pdpGroup - {}, going for registration of PDP - {}", pdpGroup, message);
+ if (!findPdpInstance(message, subGroup.get()).isPresent()) {
+ updatePdpSubGroup(pdpGroup, subGroup.get(), message, databaseProvider);
+ }
+ sendPdpMessage(pdpGroup.getName(), subGroup.get(), message.getInstance(), null);
+ pdpGroupFound = true;
+ break;
+ }
+ }
+ }
+ return pdpGroupFound;
+ }
+
+ private List<Pair<String, String>> createSupportedPolictTypesPair(final PdpStatus message) {
+ final List<Pair<String, String>> supportedPolicyTypesPair = new ArrayList<>();
+ for (final PolicyTypeIdent policyTypeIdent : message.getSupportedPolicyTypes()) {
+ supportedPolicyTypesPair.add(Pair.of(policyTypeIdent.getName(), policyTypeIdent.getVersion()));
+ }
+ return supportedPolicyTypesPair;
+ }
+
+ private void updatePdpSubGroup(final PdpGroup pdpGroup, final PdpSubGroup pdpSubGroup, final PdpStatus message,
+ final PolicyModelsProvider databaseProvider) throws PfModelException {
+
+ final Pdp pdpInstance = new Pdp();
+ pdpInstance.setInstanceId(message.getInstance());
+ pdpInstance.setPdpState(message.getState());
+ pdpInstance.setHealthy(message.getHealthy());
+ pdpInstance.setMessage(message.getDescription());
+ pdpSubGroup.getPdpInstances().add(pdpInstance);
+
+ pdpSubGroup.setCurrentInstanceCount(pdpSubGroup.getCurrentInstanceCount() + 1);
+
+ databaseProvider.updatePdpSubGroup(pdpGroup.getName(), pdpGroup.getVersion(), pdpSubGroup);
+
+ LOGGER.debug("Updated PdpSubGroup in DB - {} belonging to PdpGroup - {}", pdpSubGroup, pdpGroup);
+ }
+
+ private void handlePdpHeartbeat(final PdpStatus message, final PolicyModelsProvider databaseProvider)
+ throws PfModelException, PolicyPapException {
+ boolean pdpInstanceFound = false;
+ Optional<PdpSubGroup> pdpSubgroup = null;
+ Optional<Pdp> pdpInstance = null;
+
+ final PdpGroups pdpGroups = databaseProvider.getLatestPdpGroups(message.getPdpGroup());
+ final PdpGroup pdpGroup = pdpGroups.getGroups().get(0);
+ pdpSubgroup = findPdpSubGroup(message, pdpGroup);
+ if (pdpSubgroup.isPresent()) {
+ pdpInstance = findPdpInstance(message, pdpSubgroup.get());
+ if (pdpInstance.isPresent()) {
+ processPdpDetails(message, pdpSubgroup, pdpInstance, pdpGroup);
+ pdpInstanceFound = true;
+ }
+ }
+ if (!pdpInstanceFound) {
+ final String errorMessage = "Failed to process heartbeat. No matching PdpGroup/SubGroup Found - ";
+ LOGGER.debug("{}{}", errorMessage, message);
+ throw new PolicyPapException(errorMessage + message);
+ }
+ }
+
+ private Optional<PdpSubGroup> findPdpSubGroup(final PdpStatus message, final PdpGroup pdpGroup) {
+ PdpSubGroup pdpSubgroup = null;
+ for (final PdpSubGroup subGroup : pdpGroup.getPdpSubgroups()) {
+ if (message.getPdpSubgroup().equals(subGroup.getPdpType())) {
+ pdpSubgroup = subGroup;
+ break;
+ }
+ }
+ return Optional.ofNullable(pdpSubgroup);
+ }
+
+ private Optional<Pdp> findPdpInstance(final PdpStatus message, final PdpSubGroup subGroup) {
+ Pdp pdpInstance = null;
+ for (final Pdp pdpInstanceDetails : subGroup.getPdpInstances()) {
+ if (message.getInstance().equals(pdpInstanceDetails.getInstanceId())) {
+ pdpInstance = pdpInstanceDetails;
+ break;
+ }
+ }
+ return Optional.ofNullable(pdpInstance);
+ }
+
+ private void processPdpDetails(final PdpStatus message, final Optional<PdpSubGroup> pdpSubgroup,
+ final Optional<Pdp> pdpInstance, final PdpGroup pdpGroup) {
+ if (validatePdpDetails(message, pdpGroup, pdpSubgroup.get(), pdpInstance.get())) {
+ LOGGER.debug("PdpInstance details are correct. Saving current state in DB - {}", pdpInstance.get());
+ // TODO: details are correct save health & statistics details in DB
+ } else {
+ LOGGER.debug("PdpInstance details are not correct. Sending PdpUpdate message - {}", pdpInstance.get());
+ sendPdpMessage(pdpGroup.getName(), pdpSubgroup.get(), pdpInstance.get().getInstanceId(),
+ pdpInstance.get().getPdpState());
+ }
+ }
+
+ private boolean validatePdpDetails(final PdpStatus message, final PdpGroup pdpGroup, final PdpSubGroup subGroup,
+ final Pdp pdpInstanceDetails) {
+
+ return message.getPdpGroup().equals(pdpGroup.getName())
+ && message.getPdpSubgroup().equals(subGroup.getPdpType())
+ && message.getState().equals(pdpInstanceDetails.getPdpState())
+ && message.getSupportedPolicyTypes().containsAll(subGroup.getSupportedPolicyTypes())
+ && message.getPdpType().equals(subGroup.getPdpType());
+ }
+
+ private void sendPdpMessage(final String pdpGroupName, final PdpSubGroup subGroup, final String pdpInstanceId,
+ final PdpState pdpState) {
+ final PdpUpdate pdpUpdatemessage = createPdpUpdateMessage(pdpGroupName, subGroup, pdpInstanceId);
+ final PdpStateChange pdpStateChangeMessage =
+ createPdpStateChangeMessage(pdpGroupName, subGroup, pdpInstanceId, pdpState);
+ final PdpModifyRequestMap requestMap = Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class);
+ requestMap.addRequest(pdpUpdatemessage, pdpStateChangeMessage);
+ LOGGER.debug("Sent PdpUpdate message - {}", pdpUpdatemessage);
+ LOGGER.debug("Sent PdpStateChange message - {}", pdpStateChangeMessage);
+ }
+
+ private PdpUpdate createPdpUpdateMessage(final String pdpGroupName, final PdpSubGroup subGroup,
+ final String pdpInstanceId) {
+
+ final PdpUpdate update = new PdpUpdate();
+ update.setName(pdpInstanceId);
+ update.setPdpGroup(pdpGroupName);
+ update.setPdpSubgroup(subGroup.getPdpType());
+ update.setPolicies(subGroup.getPolicies());
+ LOGGER.debug("Created PdpUpdate message - {}", update);
+ return update;
+ }
+
+ private PdpStateChange createPdpStateChangeMessage(final String pdpGroupName, final PdpSubGroup subGroup,
+ final String pdpInstanceId, final PdpState pdpState) {
+
+ final PdpStateChange stateChange = new PdpStateChange();
+ stateChange.setName(pdpInstanceId);
+ stateChange.setPdpGroup(pdpGroupName);
+ stateChange.setPdpSubgroup(subGroup.getPdpType());
+ stateChange.setState(pdpState == null ? PdpState.ACTIVE : pdpState);
+ LOGGER.debug("Created PdpStateChange message - {}", stateChange);
+ return stateChange;
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
index 42634ee7..a60232fd 100644
--- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
+++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
@@ -37,6 +37,7 @@ import org.onap.policy.models.pdp.enums.PdpMessageType;
import org.onap.policy.pap.main.PapConstants;
import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
import org.onap.policy.pap.main.PolicyPapRuntimeException;
+import org.onap.policy.pap.main.comm.PdpHeartbeatListener;
import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
import org.onap.policy.pap.main.comm.Publisher;
import org.onap.policy.pap.main.comm.TimerManager;
@@ -76,6 +77,11 @@ public class PapActivator extends ServiceManagerContainer {
private final RequestIdDispatcher<PdpStatus> reqIdDispatcher;
/**
+ * Listener for anonymous {@link PdpStatus} messages either for registration or heartbeat.
+ */
+ private final PdpHeartbeatListener pdpHeartbeatListener;
+
+ /**
* Instantiate the activator for policy pap as a complete service.
*
* @param papParameterGroup the parameters for the pap service
@@ -91,6 +97,7 @@ public class PapActivator extends ServiceManagerContainer {
this.papParameterGroup = papParameterGroup;
this.msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
this.reqIdDispatcher = new RequestIdDispatcher<>(PdpStatus.class, REQ_ID_NAMES);
+ this.pdpHeartbeatListener = new PdpHeartbeatListener();
} catch (final RuntimeException e) {
throw new PolicyPapRuntimeException(e);
@@ -119,6 +126,10 @@ public class PapActivator extends ServiceManagerContainer {
() -> Registry.register(PapConstants.REG_PAP_DAO_FACTORY, daoFactory.get()),
() -> Registry.unregister(PapConstants.REG_PAP_DAO_FACTORY));
+ addAction("Pdp Heartbeat Listener",
+ () -> reqIdDispatcher.register(pdpHeartbeatListener),
+ () -> reqIdDispatcher.unregister(pdpHeartbeatListener));
+
addAction("Request ID Dispatcher",
() -> msgDispatcher.register(PdpMessageType.PDP_STATUS.name(), this.reqIdDispatcher),
() -> msgDispatcher.unregister(PdpMessageType.PDP_STATUS.name()));