diff options
Diffstat (limited to 'main')
3 files changed, 375 insertions, 1 deletions
diff --git a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java index 7d4cb7b4..209dc612 100644 --- a/main/src/main/java/org/onap/policy/pap/main/PapConstants.java +++ b/main/src/main/java/org/onap/policy/pap/main/PapConstants.java @@ -25,7 +25,7 @@ import org.onap.policy.common.utils.network.NetworkUtil; /** * Names of various items contained in the Registry. */ -public class PapConstants { +public final class PapConstants { // Registry keys public static final String REG_PAP_ACTIVATOR = "object:activator/pap"; @@ -38,6 +38,7 @@ public class PapConstants { // topic names public static final String TOPIC_POLICY_PDP_PAP = "POLICY-PDP-PAP"; public static final String TOPIC_POLICY_NOTIFICATION = "POLICY-NOTIFICATION"; + public static final String TOPIC_POLICY_STATISTICS = "POLICY-STATISTICS"; // policy components names public static final String POLICY_API = "api"; diff --git a/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatisticsListener.java b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatisticsListener.java new file mode 100644 index 00000000..7559f0ce --- /dev/null +++ b/main/src/main/java/org/onap/policy/pap/main/comm/PdpStatisticsListener.java @@ -0,0 +1,138 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.TypedMessageListener; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.pdp.concepts.PdpGroupFilter; +import org.onap.policy.models.pdp.concepts.PdpStatistics; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.models.provider.PolicyModelsProvider; +import org.onap.policy.pap.main.PapConstants; +import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; +import org.onap.policy.pap.main.parameters.PdpParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listens for PDP statistics, found within {@link PdpStatus} messages. + */ +public class PdpStatisticsListener implements TypedMessageListener<PdpStatus> { + private static final Logger LOGGER = LoggerFactory.getLogger(PdpStatisticsListener.class); + + /** + * Maximum message age, in milliseconds - anything older than this is discarded. + */ + private final long maxMessageAgeMs; + + /** + * Lock used when updating PDPs. + */ + private final Object updateLock; + + /** + * Factory for PAP DAO. + */ + private final PolicyModelsProviderFactoryWrapper modelProviderWrapper; + + /** + * Constructs the object. + * + * @param params PDP parameters + */ + public PdpStatisticsListener(PdpParameters params) { + maxMessageAgeMs = params.getMaxMessageAgeMs(); + modelProviderWrapper = Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class); + updateLock = Registry.get(PapConstants.REG_PDP_MODIFY_LOCK, Object.class); + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, PdpStatus message) { + long diffms = System.currentTimeMillis() - message.getTimestampMs(); + if (diffms > maxMessageAgeMs) { + long diffsec = TimeUnit.SECONDS.convert(diffms, TimeUnit.MILLISECONDS); + LOGGER.info("discarding statistics message from {} age {}s", message.getName(), diffsec); + return; + } + + if (!validStatistics(message)) { + LOGGER.info("discarding invalid/null statistics message from {}", message.getName()); + return; + } + + synchronized (updateLock) { + try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) { + handleStatistics(message, databaseProvider); + } catch (final Exception exp) { + LOGGER.error("database provider error", exp); + } + } + } + + private void handleStatistics(final PdpStatus message, final PolicyModelsProvider databaseProvider) + throws PfModelException { + + final String pdpType = message.getPdpType(); + final String pdpName = message.getName(); + final PdpGroupFilter filter = + PdpGroupFilter.builder().name(message.getPdpGroup()).groupState(PdpState.ACTIVE).build(); + boolean pdpFound = databaseProvider.getFilteredPdpGroups(filter).stream() + .flatMap(grp -> grp.getPdpSubgroups().stream()) + .filter(subgrp -> subgrp.getPdpType().equals(pdpType)) + .flatMap(subgrp -> subgrp.getPdpInstances().stream()) + .anyMatch(pdp -> pdp.getInstanceId().equals(pdpName)); + if (pdpFound) { + databaseProvider.createPdpStatistics(List.of(message.getStatistics())); + LOGGER.debug("Created PdpStatistics in DB for {}", pdpName); + } + } + + private boolean validStatistics(final PdpStatus message) { + PdpStatistics stats = message.getStatistics(); + if (stats == null) { + return false; + } + + // @formatter:off + return new EqualsBuilder() + .append(PdpState.TERMINATED.equals(message.getState()), false) + .append(message.getPdpGroup() != null, true) + .append(message.getPdpType() != null, true) + .append(message.getName() != null, true) + .append(stats.getPdpGroupName(), message.getPdpGroup()) + .append(stats.getPdpSubGroupName(), message.getPdpType()) + .append(stats.getPdpInstanceId(), message.getName()) + .append(stats.getPolicyDeployCount() >= 0, true) + .append(stats.getPolicyDeployFailCount() >= 0, true) + .append(stats.getPolicyDeploySuccessCount() >= 0, true) + .append(stats.getPolicyExecutedCount() >= 0, true) + .append(stats.getPolicyExecutedFailCount() >= 0, true) + .append(stats.getPolicyExecutedSuccessCount() >= 0, true) + .isEquals(); + // @formatter:on + } +} diff --git a/main/src/test/java/org/onap/policy/pap/main/comm/PdpStatisticsListenerTest.java b/main/src/test/java/org/onap/policy/pap/main/comm/PdpStatisticsListenerTest.java new file mode 100644 index 00000000..c0d1ed0e --- /dev/null +++ b/main/src/test/java/org/onap/policy/pap/main/comm/PdpStatisticsListenerTest.java @@ -0,0 +1,235 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP + * ================================================================================ + * Copyright (C) 2021 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.pap.main.comm; + +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.utils.services.Registry; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.pdp.concepts.Pdp; +import org.onap.policy.models.pdp.concepts.PdpGroup; +import org.onap.policy.models.pdp.concepts.PdpStatistics; +import org.onap.policy.models.pdp.concepts.PdpStatus; +import org.onap.policy.models.pdp.concepts.PdpSubGroup; +import org.onap.policy.models.pdp.enums.PdpState; +import org.onap.policy.models.provider.PolicyModelsProvider; +import org.onap.policy.pap.main.PapConstants; +import org.onap.policy.pap.main.PolicyModelsProviderFactoryWrapper; +import org.onap.policy.pap.main.parameters.PdpParameters; + +@RunWith(MockitoJUnitRunner.class) +public class PdpStatisticsListenerTest { + private static final long MAX_MSG_AGE_MS = 5000; + + private static final String GROUP = "myGroup"; + private static final String PDP_TYPE = "mySubgroup"; + private static final String MY_NAME = "myName"; + private static final CommInfrastructure INFRA = CommInfrastructure.DMAAP; + private static final String TOPIC = "MyTopic"; + + private static final long COUNT = 10; + + @Mock + private PdpParameters params; + + @Mock + private PolicyModelsProviderFactoryWrapper provFactory; + + @Mock + private PolicyModelsProvider provider; + + private PdpStatus msg; + private PdpStatistics stats; + + private PdpStatisticsListener listener; + + + @AfterClass + public static void tearDownAfterClass() { + Registry.newRegistry(); + } + + /** + * Sets up. + */ + @Before + public void setUp() throws PfModelException { + when(params.getMaxMessageAgeMs()).thenReturn(MAX_MSG_AGE_MS); + when(provFactory.create()).thenReturn(provider); + when(provider.getFilteredPdpGroups(any())).thenReturn(List.of()); + + Registry.newRegistry(); + + Registry.register(PapConstants.REG_PAP_DAO_FACTORY, provFactory); + Registry.register(PapConstants.REG_PDP_MODIFY_LOCK, new Object()); + + stats = new PdpStatistics(); + stats.setPdpGroupName(GROUP); + stats.setPdpSubGroupName(PDP_TYPE); + stats.setPdpInstanceId(MY_NAME); + stats.setPolicyDeployCount(COUNT); + stats.setPolicyDeployFailCount(COUNT); + stats.setPolicyDeploySuccessCount(COUNT); + stats.setPolicyExecutedCount(COUNT); + stats.setPolicyExecutedFailCount(COUNT); + stats.setPolicyExecutedSuccessCount(COUNT); + + msg = new PdpStatus(); + msg.setPdpGroup(GROUP); + msg.setPdpType(PDP_TYPE); + msg.setName(MY_NAME); + msg.setStatistics(stats); + msg.setTimestampMs(System.currentTimeMillis()); + + listener = new PdpStatisticsListener(params); + } + + @Test + public void testOnTopicEvent() throws PfModelException { + listener.onTopicEvent(INFRA, TOPIC, msg); + verify(provFactory).create(); + } + + @Test + public void testOnTopicEventOutOfDate() throws PfModelException { + msg.setTimestampMs(System.currentTimeMillis() - MAX_MSG_AGE_MS - 1); + listener.onTopicEvent(INFRA, TOPIC, msg); + + verify(provFactory, never()).create(); + } + + @Test + public void testOnTopicEventDbException() throws PfModelException { + when(provFactory.create()).thenThrow(new RuntimeException("expected exception")); + assertThatCode(() -> listener.onTopicEvent(INFRA, TOPIC, msg)).doesNotThrowAnyException(); + } + + @Test + public void testHandleStatistics() throws PfModelException { + // no matching group + listener.onTopicEvent(INFRA, TOPIC, msg); + verify(provFactory).create(); + verify(provider, never()).createPdpStatistics(any()); + + // matches + final Pdp pdp = new Pdp(); + pdp.setInstanceId(MY_NAME); + + // doesn't match + final Pdp pdp2 = new Pdp(); + pdp2.setInstanceId(MY_NAME + "aaa"); + + // matches, but PDP doesn't match + final PdpSubGroup subgrp = new PdpSubGroup(); + subgrp.setPdpType(PDP_TYPE); + subgrp.setPdpInstances(List.of(pdp2)); + + // doesn't match, but has matching PDP + final PdpSubGroup subgrp2 = new PdpSubGroup(); + subgrp2.setPdpType(PDP_TYPE + "bbb"); + subgrp2.setPdpInstances(List.of(pdp)); + + // has matching subgroup + final PdpGroup group = new PdpGroup(); + group.setPdpSubgroups(List.of(subgrp2, subgrp)); + + // no matching subgroup + final PdpGroup group2 = new PdpGroup(); + group2.setPdpSubgroups(List.of(subgrp2)); + + when(provider.getFilteredPdpGroups(any())).thenReturn(List.of(group2, group)); + + // nothing matches, so nothing should be inserted + listener.onTopicEvent(INFRA, TOPIC, msg); + verify(provider, never()).createPdpStatistics(any()); + + // add a matching pdp to the matching subgroup + subgrp.setPdpInstances(List.of(pdp, pdp)); + + // now it should update the statistics + listener.onTopicEvent(INFRA, TOPIC, msg); + verify(provider).createPdpStatistics(eq(List.of(stats))); + } + + @Test + public void testValidStatistics() throws PfModelException { + validateStats(msg::getState, msg::setState, PdpState.TERMINATED); + validateStats(msg::getStatistics, msg::setStatistics, null); + validateStats(msg::getPdpGroup, msg::setPdpGroup, null); + validateStats(msg::getPdpType, msg::setPdpType, null); + validateStats(msg::getName, msg::setName, null); + validateStats(stats::getPdpGroupName, stats::setPdpGroupName, GROUP + "ccc"); + validateStats(stats::getPdpSubGroupName, stats::setPdpSubGroupName, PDP_TYPE + "ddd"); + validateStats(stats::getPdpInstanceId, stats::setPdpInstanceId, MY_NAME + "eee"); + validateStats(stats::getPolicyDeployCount, stats::setPolicyDeployCount, -1L); + validateStats(stats::getPolicyDeployFailCount, stats::setPolicyDeployFailCount, -1L); + validateStats(stats::getPolicyDeploySuccessCount, stats::setPolicyDeploySuccessCount, -1L); + validateStats(stats::getPolicyExecutedCount, stats::setPolicyExecutedCount, -1L); + validateStats(stats::getPolicyExecutedFailCount, stats::setPolicyExecutedFailCount, -1L); + validateStats(stats::getPolicyExecutedSuccessCount, stats::setPolicyExecutedSuccessCount, -1L); + + // verify that all zeroes are OK + stats = new PdpStatistics(); + stats.setPdpGroupName(GROUP); + stats.setPdpSubGroupName(PDP_TYPE); + stats.setPdpInstanceId(MY_NAME); + + msg.setStatistics(stats); + + listener.onTopicEvent(INFRA, TOPIC, msg); + verify(provFactory).create(); + } + + /** + * Verifies that the provider is never created when one of the message's fields is set + * to an invalid value. + * + * @param <T> value type + * @param getter method to get the current value of the field + * @param setter method to change the field + * @param invalidValue invalid value for the field + * @throws PfModelException if an error occurs + */ + private <T> void validateStats(Supplier<T> getter, Consumer<T> setter, T invalidValue) throws PfModelException { + final T saved = getter.get(); + setter.accept(invalidValue); + + listener.onTopicEvent(INFRA, TOPIC, msg); + verify(provFactory, never()).create(); + + setter.accept(saved); + } +} |