aboutsummaryrefslogtreecommitdiffstats
path: root/main/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'main/src/main/java')
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/PapConstants.java1
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java25
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/comm/PdpTracker.java153
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java5
-rw-r--r--main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java46
5 files changed, 216 insertions, 14 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java
index 64401b81..3fc36f35 100644
--- a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java
+++ b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java
@@ -30,6 +30,7 @@ public class PapConstants {
public static final String REG_STATISTICS_MANAGER = "object:manager/statistics";
public static final String REG_PDP_MODIFY_LOCK = "lock:pdp";
public static final String REG_PDP_MODIFY_MAP = "object:pdp/modify/map";
+ public static final String REG_PDP_TRACKER = "object:pdp/tracker";
public static final String REG_PAP_DAO_FACTORY = "object:pap/dao/factory";
// topic names
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java
index 7ef9c594..f5184c93 100644
--- a/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java
@@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.TreeMap;
import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.onap.policy.common.parameters.ParameterService;
import org.onap.policy.common.utils.services.Registry;
import org.onap.policy.models.base.PfModelException;
import org.onap.policy.models.pdp.concepts.Pdp;
@@ -44,6 +45,7 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier;
import org.onap.policy.pap.main.PapConstants;
import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
import org.onap.policy.pap.main.PolicyPapException;
+import org.onap.policy.pap.main.parameters.PapParameterGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,9 +56,10 @@ import org.slf4j.LoggerFactory;
* @author Ram Krishna Verma (ram.krishna.verma@est.tech)
*/
public class PdpStatusMessageHandler {
-
private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class);
+ private static final String PAP_GROUP_PARAMS_NAME = "PapGroup";
+
/**
* Lock used when updating PDPs.
*/
@@ -70,7 +73,12 @@ public class PdpStatusMessageHandler {
/**
* Factory for PAP DAO.
*/
- PolicyModelsProviderFactoryWrapper modelProviderWrapper;
+ private final PolicyModelsProviderFactoryWrapper modelProviderWrapper;
+
+ /**
+ * Heart beat interval, in milliseconds, to pass to PDPs.
+ */
+ private final long heartBeatMs;
/**
* Constructs the object.
@@ -79,6 +87,9 @@ public class PdpStatusMessageHandler {
modelProviderWrapper = Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class);
updateLock = Registry.get(PapConstants.REG_PDP_MODIFY_LOCK, Object.class);
requestMap = Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class);
+
+ PapParameterGroup params = ParameterService.get(PAP_GROUP_PARAMS_NAME);
+ heartBeatMs = params.getPdpParameters().getHeartBeatMs();
}
/**
@@ -94,6 +105,15 @@ public class PdpStatusMessageHandler {
} else {
handlePdpHeartbeat(message, databaseProvider);
}
+
+ /*
+ * Indicate that a heart beat was received from the PDP. This is invoked
+ * only if handleXxx() does not throw an exception.
+ */
+ if (message.getName() != null) {
+ PdpTracker pdpTracker = Registry.get(PapConstants.REG_PDP_TRACKER);
+ pdpTracker.add(message.getName());
+ }
} catch (final PolicyPapException exp) {
LOGGER.error("Operation Failed", exp);
} catch (final Exception exp) {
@@ -297,6 +317,7 @@ public class PdpStatusMessageHandler {
update.setPdpGroup(pdpGroupName);
update.setPdpSubgroup(subGroup.getPdpType());
update.setPolicies(getToscaPolicies(subGroup, databaseProvider));
+ update.setPdpHeartbeatIntervalMs(heartBeatMs);
LOGGER.debug("Created PdpUpdate message - {}", update);
return update;
diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpTracker.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpTracker.java
new file mode 100644
index 00000000..04dfe813
--- /dev/null
+++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpTracker.java
@@ -0,0 +1,153 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP PAP
+ * ================================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.pap.main.comm;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Builder;
+import lombok.NonNull;
+import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.pdp.concepts.Pdp;
+import org.onap.policy.models.pdp.concepts.PdpGroup;
+import org.onap.policy.models.pdp.concepts.PdpSubGroup;
+import org.onap.policy.models.provider.PolicyModelsProvider;
+import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
+import org.onap.policy.pap.main.PolicyPapRuntimeException;
+import org.onap.policy.pap.main.comm.TimerManager.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks PDPs. When a PDP is added to the tracker, a timer is started. If the PDP is not
+ * re-added to the tracker before the timer expires, then
+ * {@link PdpModifyRequestMap#removeFromGroups(String)} is called.
+ */
+public class PdpTracker {
+ private static final Logger logger = LoggerFactory.getLogger(PdpTracker.class);
+
+ /**
+ * PDP expiration timers.
+ */
+ private final TimerManager timers;
+
+ /**
+ * Maps a PDP name to its expiration timer.
+ */
+ private final Map<String, TimerManager.Timer> pdp2timer = new HashMap<>();
+
+ /**
+ * PDP modification lock.
+ */
+ private final Object modifyLock;
+
+ /**
+ * Used to remove a PDP from its group/subgroup.
+ */
+ private final PdpModifyRequestMap requestMap;
+
+
+ /**
+ * Constructs the object. Loads the list of PDPs to be tracked, from the DB.
+ *
+ * @param requestMap map used to remove a PDP from its group/subgroup
+ * @param modifyLock object to be locked while data structures are updated
+ * @param timers timers used to detect missed heart beats
+ * @param daoFactory DAO factory
+ */
+ @Builder
+ public PdpTracker(@NonNull PdpModifyRequestMap requestMap, @NonNull Object modifyLock, @NonNull TimerManager timers,
+ @NonNull PolicyModelsProviderFactoryWrapper daoFactory) {
+
+ this.requestMap = requestMap;
+ this.modifyLock = modifyLock;
+ this.timers = timers;
+
+ loadPdps(daoFactory);
+ }
+
+ /**
+ * Loads the PDPs from the DB.
+ *
+ * @param daoFactory DAO factory
+ */
+ private void loadPdps(PolicyModelsProviderFactoryWrapper daoFactory) {
+ synchronized (modifyLock) {
+ try (PolicyModelsProvider dao = daoFactory.create()) {
+ for (PdpGroup group : dao.getPdpGroups(null)) {
+ loadPdpsFromGroup(group);
+ }
+
+ } catch (PfModelException e) {
+ throw new PolicyPapRuntimeException("cannot load PDPs from the DB", e);
+ }
+ }
+ }
+
+ /**
+ * Loads the PDPs appearing within a group.
+ *
+ * @param group group whose PDPs are to be loaded
+ */
+ private void loadPdpsFromGroup(PdpGroup group) {
+ for (PdpSubGroup subgrp : group.getPdpSubgroups()) {
+ for (Pdp pdp : subgrp.getPdpInstances()) {
+ add(pdp.getInstanceId());
+ }
+ }
+ }
+
+ /**
+ * Adds a PDP to the tracker and starts its timer. If a timer is already running, the
+ * old timer is cancelled.
+ *
+ * @param pdpName name of the PDP
+ */
+ public void add(String pdpName) {
+ synchronized (modifyLock) {
+ Timer timer = pdp2timer.remove(pdpName);
+ if (timer != null) {
+ timer.cancel();
+ }
+
+ timer = timers.register(pdpName, this::handleTimeout);
+ pdp2timer.put(pdpName, timer);
+ }
+ }
+
+ /**
+ * Handles a timeout. Removes the PDP from {@link #pdp2timer}.
+ *
+ * @param pdpName name of the PDP whose timer has expired
+ */
+ private void handleTimeout(String pdpName) {
+ synchronized (modifyLock) {
+ // remove timer - no need to cancel it, as TimerManager does that
+ pdp2timer.remove(pdpName);
+
+ try {
+ requestMap.removeFromGroups(pdpName);
+
+ } catch (PfModelException e) {
+ logger.warn("unable to remove PDP {} from its group/subgroup", pdpName, e);
+ }
+ }
+ }
+}
diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java
index 84fe353b..1776772a 100644
--- a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java
+++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java
@@ -22,6 +22,7 @@ package org.onap.policy.pap.main.parameters;
import lombok.Getter;
import org.onap.policy.common.parameters.ParameterGroupImpl;
+import org.onap.policy.common.parameters.annotations.Min;
import org.onap.policy.common.parameters.annotations.NotBlank;
import org.onap.policy.common.parameters.annotations.NotNull;
@@ -32,6 +33,10 @@ import org.onap.policy.common.parameters.annotations.NotNull;
@NotBlank
@Getter
public class PdpParameters extends ParameterGroupImpl {
+
+ @Min(1)
+ private long heartBeatMs;
+
private PdpUpdateParameters updateParameters;
private PdpStateChangeParameters stateChangeParameters;
diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
index 1b7281ca..e1ad80e2 100644
--- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
+++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
@@ -24,7 +24,6 @@ package org.onap.policy.pap.main.startstop;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
-
import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
@@ -39,6 +38,7 @@ import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper;
import org.onap.policy.pap.main.PolicyPapRuntimeException;
import org.onap.policy.pap.main.comm.PdpHeartbeatListener;
import org.onap.policy.pap.main.comm.PdpModifyRequestMap;
+import org.onap.policy.pap.main.comm.PdpTracker;
import org.onap.policy.pap.main.comm.Publisher;
import org.onap.policy.pap.main.comm.TimerManager;
import org.onap.policy.pap.main.parameters.PapParameterGroup;
@@ -57,12 +57,12 @@ public class PapActivator extends ServiceManagerContainer {
private static final String[] MSG_TYPE_NAMES = { "messageName" };
private static final String[] REQ_ID_NAMES = { "response", "responseTo" };
- private final PapParameterGroup papParameterGroup;
-
/**
- * The PAP REST API server.
+ * Max number of heat beats that can be missed before PAP removes a PDP.
*/
- private PapRestServer restServer;
+ private static final int MAX_MISSED_HEARTBEATS = 3;
+
+ private final PapParameterGroup papParameterGroup;
/**
* Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then dispatches them to
@@ -110,7 +110,10 @@ public class PapActivator extends ServiceManagerContainer {
final AtomicReference<Publisher> pdpPub = new AtomicReference<>();
final AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>();
final AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>();
+ final AtomicReference<TimerManager> heartBeatTimers = new AtomicReference<>();
final AtomicReference<PolicyModelsProviderFactoryWrapper> daoFactory = new AtomicReference<>();
+ final AtomicReference<PdpModifyRequestMap> requestMap = new AtomicReference<>();
+ final AtomicReference<PapRestServer> restServer = new AtomicReference<>();
// @formatter:off
addAction("PAP parameters",
@@ -153,6 +156,14 @@ public class PapActivator extends ServiceManagerContainer {
},
() -> pdpPub.get().stop());
+ addAction("PDP heart beat timers",
+ () -> {
+ long maxWaitHeartBeatMs = MAX_MISSED_HEARTBEATS * pdpParams.getHeartBeatMs();
+ heartBeatTimers.set(new TimerManager("heart beat", maxWaitHeartBeatMs));
+ startThread(heartBeatTimers.get());
+ },
+ () -> heartBeatTimers.get().stop());
+
addAction("PDP update timers",
() -> {
pdpUpdTimers.set(new TimerManager("update", pdpParams.getUpdateParameters().getMaxWaitMs()));
@@ -172,7 +183,8 @@ public class PapActivator extends ServiceManagerContainer {
() -> Registry.unregister(PapConstants.REG_PDP_MODIFY_LOCK));
addAction("PDP modification requests",
- () -> Registry.register(PapConstants.REG_PDP_MODIFY_MAP, new PdpModifyRequestMap(
+ () -> {
+ requestMap.set(new PdpModifyRequestMap(
new PdpModifyRequestMapParams()
.setDaoFactory(daoFactory.get())
.setModifyLock(pdpUpdateLock)
@@ -180,16 +192,26 @@ public class PapActivator extends ServiceManagerContainer {
.setPublisher(pdpPub.get())
.setResponseDispatcher(reqIdDispatcher)
.setStateChangeTimers(pdpStChgTimers.get())
- .setUpdateTimers(pdpUpdTimers.get()))),
+ .setUpdateTimers(pdpUpdTimers.get())));
+ Registry.register(PapConstants.REG_PDP_MODIFY_MAP, requestMap.get());
+ },
() -> Registry.unregister(PapConstants.REG_PDP_MODIFY_MAP));
- addAction("Create REST server",
- () -> restServer = new PapRestServer(papParameterGroup.getRestServerParameters()),
- () -> restServer = null);
+ addAction("PDP heart beat tracker",
+ () -> Registry.register(PapConstants.REG_PDP_TRACKER, PdpTracker.builder()
+ .daoFactory(daoFactory.get())
+ .timers(heartBeatTimers.get())
+ .modifyLock(pdpUpdateLock)
+ .requestMap(requestMap.get())
+ .build()),
+ () -> Registry.unregister(PapConstants.REG_PDP_TRACKER));
addAction("REST server",
- () -> restServer.start(),
- () -> restServer.stop());
+ () -> {
+ restServer.set(new PapRestServer(papParameterGroup.getRestServerParameters()));
+ restServer.get().start();
+ },
+ () -> restServer.get().stop());
// @formatter:on
}