diff options
Diffstat (limited to 'main/src/main/java/org')
5 files changed, 216 insertions, 14 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java index 64401b81..3fc36f35 100644 --- a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java +++ b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java @@ -30,6 +30,7 @@ public class PapConstants { public static final String REG_STATISTICS_MANAGER = "object:manager/statistics"; public static final String REG_PDP_MODIFY_LOCK = "lock:pdp"; public static final String REG_PDP_MODIFY_MAP = "object:pdp/modify/map"; + public static final String REG_PDP_TRACKER = "object:pdp/tracker"; public static final String REG_PAP_DAO_FACTORY = "object:pap/dao/factory"; // topic names diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java index 7ef9c594..f5184c93 100644 --- a/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatusMessageHandler.java @@ -28,6 +28,7 @@ import java.util.Optional; import java.util.TreeMap; import org.apache.commons.lang3.builder.EqualsBuilder; +import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.common.utils.services.Registry; import org.onap.policy.models.base.PfModelException; import org.onap.policy.models.pdp.concepts.Pdp; @@ -44,6 +45,7 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyIdentifier; import org.onap.policy.pap.main.PapConstants; import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; import org.onap.policy.pap.main.PolicyPapException; +import org.onap.policy.pap.main.parameters.PapParameterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,9 +56,10 @@ import org.slf4j.LoggerFactory; * @author Ram Krishna Verma (ram.krishna.verma@est.tech) */ public class PdpStatusMessageHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatusMessageHandler.class); + private static final String PAP_GROUP_PARAMS_NAME = "PapGroup"; + /** * Lock used when updating PDPs. */ @@ -70,7 +73,12 @@ public class PdpStatusMessageHandler { /** * Factory for PAP DAO. */ - PolicyModelsProviderFactoryWrapper modelProviderWrapper; + private final PolicyModelsProviderFactoryWrapper modelProviderWrapper; + + /** + * Heart beat interval, in milliseconds, to pass to PDPs. + */ + private final long heartBeatMs; /** * Constructs the object. @@ -79,6 +87,9 @@ public class PdpStatusMessageHandler { modelProviderWrapper = Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class); updateLock = Registry.get(PapConstants.REG_PDP_MODIFY_LOCK, Object.class); requestMap = Registry.get(PapConstants.REG_PDP_MODIFY_MAP, PdpModifyRequestMap.class); + + PapParameterGroup params = ParameterService.get(PAP_GROUP_PARAMS_NAME); + heartBeatMs = params.getPdpParameters().getHeartBeatMs(); } /** @@ -94,6 +105,15 @@ public class PdpStatusMessageHandler { } else { handlePdpHeartbeat(message, databaseProvider); } + + /* + * Indicate that a heart beat was received from the PDP. This is invoked + * only if handleXxx() does not throw an exception. + */ + if (message.getName() != null) { + PdpTracker pdpTracker = Registry.get(PapConstants.REG_PDP_TRACKER); + pdpTracker.add(message.getName()); + } } catch (final PolicyPapException exp) { LOGGER.error("Operation Failed", exp); } catch (final Exception exp) { @@ -297,6 +317,7 @@ public class PdpStatusMessageHandler { update.setPdpGroup(pdpGroupName); update.setPdpSubgroup(subGroup.getPdpType()); update.setPolicies(getToscaPolicies(subGroup, databaseProvider)); + update.setPdpHeartbeatIntervalMs(heartBeatMs); LOGGER.debug("Created PdpUpdate message - {}", update); return update; diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpTracker.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpTracker.java new file mode 100644 index 00000000..04dfe813 --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpTracker.java @@ -0,0 +1,153 @@ +/* + * ============LICENSE_START======================================================= + * ONAP PAP + * ================================================================================ + * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.HashMap; +import java.util.Map; +import lombok.Builder; +import lombok.NonNull; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.pdp.concepts.Pdp; +import org.onap.policy.models.pdp.concepts.PdpGroup; +import org.onap.policy.models.pdp.concepts.PdpSubGroup; +import org.onap.policy.models.provider.PolicyModelsProvider; +import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; +import org.onap.policy.pap.main.PolicyPapRuntimeException; +import org.onap.policy.pap.main.comm.TimerManager.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tracks PDPs. When a PDP is added to the tracker, a timer is started. If the PDP is not + * re-added to the tracker before the timer expires, then + * {@link PdpModifyRequestMap#removeFromGroups(String)} is called. + */ +public class PdpTracker { + private static final Logger logger = LoggerFactory.getLogger(PdpTracker.class); + + /** + * PDP expiration timers. + */ + private final TimerManager timers; + + /** + * Maps a PDP name to its expiration timer. + */ + private final Map<String, TimerManager.Timer> pdp2timer = new HashMap<>(); + + /** + * PDP modification lock. + */ + private final Object modifyLock; + + /** + * Used to remove a PDP from its group/subgroup. + */ + private final PdpModifyRequestMap requestMap; + + + /** + * Constructs the object. Loads the list of PDPs to be tracked, from the DB. + * + * @param requestMap map used to remove a PDP from its group/subgroup + * @param modifyLock object to be locked while data structures are updated + * @param timers timers used to detect missed heart beats + * @param daoFactory DAO factory + */ + @Builder + public PdpTracker(@NonNull PdpModifyRequestMap requestMap, @NonNull Object modifyLock, @NonNull TimerManager timers, + @NonNull PolicyModelsProviderFactoryWrapper daoFactory) { + + this.requestMap = requestMap; + this.modifyLock = modifyLock; + this.timers = timers; + + loadPdps(daoFactory); + } + + /** + * Loads the PDPs from the DB. + * + * @param daoFactory DAO factory + */ + private void loadPdps(PolicyModelsProviderFactoryWrapper daoFactory) { + synchronized (modifyLock) { + try (PolicyModelsProvider dao = daoFactory.create()) { + for (PdpGroup group : dao.getPdpGroups(null)) { + loadPdpsFromGroup(group); + } + + } catch (PfModelException e) { + throw new PolicyPapRuntimeException("cannot load PDPs from the DB", e); + } + } + } + + /** + * Loads the PDPs appearing within a group. + * + * @param group group whose PDPs are to be loaded + */ + private void loadPdpsFromGroup(PdpGroup group) { + for (PdpSubGroup subgrp : group.getPdpSubgroups()) { + for (Pdp pdp : subgrp.getPdpInstances()) { + add(pdp.getInstanceId()); + } + } + } + + /** + * Adds a PDP to the tracker and starts its timer. If a timer is already running, the + * old timer is cancelled. + * + * @param pdpName name of the PDP + */ + public void add(String pdpName) { + synchronized (modifyLock) { + Timer timer = pdp2timer.remove(pdpName); + if (timer != null) { + timer.cancel(); + } + + timer = timers.register(pdpName, this::handleTimeout); + pdp2timer.put(pdpName, timer); + } + } + + /** + * Handles a timeout. Removes the PDP from {@link #pdp2timer}. + * + * @param pdpName name of the PDP whose timer has expired + */ + private void handleTimeout(String pdpName) { + synchronized (modifyLock) { + // remove timer - no need to cancel it, as TimerManager does that + pdp2timer.remove(pdpName); + + try { + requestMap.removeFromGroups(pdpName); + + } catch (PfModelException e) { + logger.warn("unable to remove PDP {} from its group/subgroup", pdpName, e); + } + } + } +} diff --git a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java index 84fe353b..1776772a 100644 --- a/main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java +++ b/main/src/main/java/org/onap/policy/pap/main/parameters/PdpParameters.java @@ -22,6 +22,7 @@ package org.onap.policy.pap.main.parameters; import lombok.Getter; import org.onap.policy.common.parameters.ParameterGroupImpl; +import org.onap.policy.common.parameters.annotations.Min; import org.onap.policy.common.parameters.annotations.NotBlank; import org.onap.policy.common.parameters.annotations.NotNull; @@ -32,6 +33,10 @@ import org.onap.policy.common.parameters.annotations.NotNull; @NotBlank @Getter public class PdpParameters extends ParameterGroupImpl { + + @Min(1) + private long heartBeatMs; + private PdpUpdateParameters updateParameters; private PdpStateChangeParameters stateChangeParameters; diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index 1b7281ca..e1ad80e2 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -24,7 +24,6 @@ package org.onap.policy.pap.main.startstop; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; - import org.onap.policy.common.endpoints.event.comm.TopicEndpoint; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; @@ -39,6 +38,7 @@ import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; import org.onap.policy.pap.main.PolicyPapRuntimeException; import org.onap.policy.pap.main.comm.PdpHeartbeatListener; import org.onap.policy.pap.main.comm.PdpModifyRequestMap; +import org.onap.policy.pap.main.comm.PdpTracker; import org.onap.policy.pap.main.comm.Publisher; import org.onap.policy.pap.main.comm.TimerManager; import org.onap.policy.pap.main.parameters.PapParameterGroup; @@ -57,12 +57,12 @@ public class PapActivator extends ServiceManagerContainer { private static final String[] MSG_TYPE_NAMES = { "messageName" }; private static final String[] REQ_ID_NAMES = { "response", "responseTo" }; - private final PapParameterGroup papParameterGroup; - /** - * The PAP REST API server. + * Max number of heat beats that can be missed before PAP removes a PDP. */ - private PapRestServer restServer; + private static final int MAX_MISSED_HEARTBEATS = 3; + + private final PapParameterGroup papParameterGroup; /** * Listens for messages on the topic, decodes them into a {@link PdpStatus} message, and then dispatches them to @@ -110,7 +110,10 @@ public class PapActivator extends ServiceManagerContainer { final AtomicReference<Publisher> pdpPub = new AtomicReference<>(); final AtomicReference<TimerManager> pdpUpdTimers = new AtomicReference<>(); final AtomicReference<TimerManager> pdpStChgTimers = new AtomicReference<>(); + final AtomicReference<TimerManager> heartBeatTimers = new AtomicReference<>(); final AtomicReference<PolicyModelsProviderFactoryWrapper> daoFactory = new AtomicReference<>(); + final AtomicReference<PdpModifyRequestMap> requestMap = new AtomicReference<>(); + final AtomicReference<PapRestServer> restServer = new AtomicReference<>(); // @formatter:off addAction("PAP parameters", @@ -153,6 +156,14 @@ public class PapActivator extends ServiceManagerContainer { }, () -> pdpPub.get().stop()); + addAction("PDP heart beat timers", + () -> { + long maxWaitHeartBeatMs = MAX_MISSED_HEARTBEATS * pdpParams.getHeartBeatMs(); + heartBeatTimers.set(new TimerManager("heart beat", maxWaitHeartBeatMs)); + startThread(heartBeatTimers.get()); + }, + () -> heartBeatTimers.get().stop()); + addAction("PDP update timers", () -> { pdpUpdTimers.set(new TimerManager("update", pdpParams.getUpdateParameters().getMaxWaitMs())); @@ -172,7 +183,8 @@ public class PapActivator extends ServiceManagerContainer { () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_LOCK)); addAction("PDP modification requests", - () -> Registry.register(PapConstants.REG_PDP_MODIFY_MAP, new PdpModifyRequestMap( + () -> { + requestMap.set(new PdpModifyRequestMap( new PdpModifyRequestMapParams() .setDaoFactory(daoFactory.get()) .setModifyLock(pdpUpdateLock) @@ -180,16 +192,26 @@ public class PapActivator extends ServiceManagerContainer { .setPublisher(pdpPub.get()) .setResponseDispatcher(reqIdDispatcher) .setStateChangeTimers(pdpStChgTimers.get()) - .setUpdateTimers(pdpUpdTimers.get()))), + .setUpdateTimers(pdpUpdTimers.get()))); + Registry.register(PapConstants.REG_PDP_MODIFY_MAP, requestMap.get()); + }, () -> Registry.unregister(PapConstants.REG_PDP_MODIFY_MAP)); - addAction("Create REST server", - () -> restServer = new PapRestServer(papParameterGroup.getRestServerParameters()), - () -> restServer = null); + addAction("PDP heart beat tracker", + () -> Registry.register(PapConstants.REG_PDP_TRACKER, PdpTracker.builder() + .daoFactory(daoFactory.get()) + .timers(heartBeatTimers.get()) + .modifyLock(pdpUpdateLock) + .requestMap(requestMap.get()) + .build()), + () -> Registry.unregister(PapConstants.REG_PDP_TRACKER)); addAction("REST server", - () -> restServer.start(), - () -> restServer.stop()); + () -> { + restServer.set(new PapRestServer(papParameterGroup.getRestServerParameters())); + restServer.get().start(); + }, + () -> restServer.get().stop()); // @formatter:on } |