From c8d0fd08038a956d2588e540d8c31c46acf21377 Mon Sep 17 00:00:00 2001 From: Sirisha_Manchikanti Date: Wed, 14 Jul 2021 23:20:30 +0100 Subject: Handle participant register,deregister,update messages Send ParticipantRegister, ParticipantDeregister and ParticipantUpdateAck from participant to controlloop runtime Receive ParticipantRegisterAck, ParticipantDeregisterAck and ParticipantUpdate messages from controlloop runtime Issue-ID: POLICY-3414 Signed-off-by: Sirisha_Manchikanti Change-Id: Ifb3c4fa64bdd2d50ec2302b35f342847f4994c5d --- .../src/main/resources/config/application.yaml | 4 - .../src/test/resources/application_test.properties | 3 - .../src/main/resources/config/application.yaml | 4 - .../src/test/resources/application_test.properties | 3 - .../src/main/resources/config/application.yaml | 4 - .../policy/endtoend/ParticipantMessagesTest.java | 149 +++++++++++++++++++++ .../policy/endtoend/ParticipantPolicyTest.java | 2 + .../policy/main/utils/TestListenerUtils.java | 41 ++++++ .../src/test/resources/application_test.properties | 3 - .../src/main/resources/config/application.yaml | 4 - .../src/test/resources/application_test.properties | 3 - .../api/ParticipantIntermediaryApi.java | 11 +- .../api/impl/ParticipantIntermediaryApiImpl.java | 10 ++ .../intermediary/comm/MessageSender.java | 38 +++++- .../intermediary/comm/ParticipantAckListener.java | 57 ++++++++ .../comm/ParticipantDeregisterAckListener.java | 42 ++++++ .../comm/ParticipantMessagePublisher.java | 93 +++++++++++++ .../comm/ParticipantRegisterAckListener.java | 42 ++++++ .../comm/ParticipantStatusPublisher.java | 57 -------- .../comm/ParticipantUpdateListener.java | 41 ++++++ .../intermediary/config/BeanFactory.java | 10 +- .../handler/IntermediaryActivator.java | 29 +++- .../intermediary/handler/ParticipantHandler.java | 99 ++++++++++++-- 23 files changed, 643 insertions(+), 106 deletions(-) create mode 100644 participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java delete mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java create mode 100644 participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java diff --git a/participant/participant-impl/participant-impl-dcae/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-dcae/src/main/resources/config/application.yaml index 36b9f846a..44ba5b3e8 100644 --- a/participant/participant-impl/participant-impl-dcae/src/main/resources/config/application.yaml +++ b/participant/participant-impl/participant-impl-dcae/src/main/resources/config/application.yaml @@ -46,10 +46,6 @@ participant: topic: POLICY-CLRUNTIME-PARTICIPANT servers[0]: ${topicServer:message-router} topicCommInfrastructure: dmaap - topicSinks[1]: - topic: POLICY-NOTIFICATION - servers[0]: ${topicServer:message-router} - topicCommInfrastructure: dmaap checkCount: 10 secCount: 10 jsonBodyConsulPath: src/main/resources/parameters/consul.json diff --git a/participant/participant-impl/participant-impl-dcae/src/test/resources/application_test.properties b/participant/participant-impl/participant-impl-dcae/src/test/resources/application_test.properties index d585dd3c9..2c775c28d 100644 --- a/participant/participant-impl/participant-impl-dcae/src/test/resources/application_test.properties +++ b/participant/participant-impl/participant-impl-dcae/src/test/resources/application_test.properties @@ -34,8 +34,5 @@ participant.intermediaryParameters.clampControlLoopTopics.topicSources[0].fetchT participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].topic=POLICY-CLRUNTIME-PARTICIPANT participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].servers[0]=localhost participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].topicCommInfrastructure=dmaap -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].topic=POLICY-NOTIFICATION -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].servers[0]=localhost -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].topicCommInfrastructure=dmaap participant.checkCount=10 diff --git a/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml index 63ec8a295..3be694375 100644 --- a/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml +++ b/participant/participant-impl/participant-impl-kubernetes/src/main/resources/config/application.yaml @@ -24,10 +24,6 @@ participant: topic: POLICY-CLRUNTIME-PARTICIPANT servers[0]: ${topicServer:message-router} topicCommInfrastructure: dmaap - topicSinks[1]: - topic: POLICY-NOTIFICATION - servers[0]: ${topicServer:message-router} - topicCommInfrastructure: dmaap management: endpoints: diff --git a/participant/participant-impl/participant-impl-kubernetes/src/test/resources/application_test.properties b/participant/participant-impl/participant-impl-kubernetes/src/test/resources/application_test.properties index 188623af0..5c61535a7 100644 --- a/participant/participant-impl/participant-impl-kubernetes/src/test/resources/application_test.properties +++ b/participant/participant-impl/participant-impl-kubernetes/src/test/resources/application_test.properties @@ -21,6 +21,3 @@ participant.intermediaryParameters.clampControlLoopTopics.topicSources[0].fetchT participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].topic=POLICY-CLRUNTIME-PARTICIPANT participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].servers[0]=localhost participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].topicCommInfrastructure=dmaap -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].topic=POLICY-NOTIFICATION -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].servers[0]=localhost -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].topicCommInfrastructure=dmaap diff --git a/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml index 7bf4f623a..d4c7d7561 100644 --- a/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml +++ b/participant/participant-impl/participant-impl-policy/src/main/resources/config/application.yaml @@ -27,7 +27,3 @@ participant: topic: POLICY-CLRUNTIME-PARTICIPANT servers[0]: ${topicServer:message-router} topicCommInfrastructure: dmaap - topicSinks[1]: - topic: POLICY-NOTIFICATION - servers[0]: ${topicServer:message-router} - topicCommInfrastructure: dmaap diff --git a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java new file mode 100644 index 000000000..4b4558b89 --- /dev/null +++ b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantMessagesTest.java @@ -0,0 +1,149 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.policy.endtoend; + +import java.time.Instant; +import java.util.Collections; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.clamp.controlloop.participant.policy.main.utils.TestListenerUtils; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +@TestPropertySource(locations = {"classpath:application_test.properties"}) +class ParticipantMessagesTest { + + private static final Object lockit = new Object(); + private static final CommInfrastructure INFRA = CommInfrastructure.NOOP; + private static final String TOPIC = "my-topic"; + + @Autowired + private ParticipantHandler participantHandler; + + @Test + void testSendParticipantRegisterMessage() throws Exception { + final ParticipantRegister participantRegisterMsg = new ParticipantRegister(); + participantRegisterMsg.setParticipantId(getParticipantId()); + participantRegisterMsg.setTimestamp(Instant.now()); + participantRegisterMsg.setParticipantType(getParticipantType()); + + synchronized (lockit) { + ParticipantMessagePublisher participantMessagePublisher = + new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class))); + participantMessagePublisher.sendParticipantRegister(participantRegisterMsg); + } + } + + @Test + void testReceiveParticipantRegisterAckMessage() throws Exception { + final ParticipantRegisterAck participantRegisterAckMsg = new ParticipantRegisterAck(); + participantRegisterAckMsg.setMessage("ParticipantRegisterAck message"); + participantRegisterAckMsg.setResponseTo(UUID.randomUUID()); + participantRegisterAckMsg.setResult(true); + + synchronized (lockit) { + ParticipantRegisterAckListener participantRegisterAckListener = + new ParticipantRegisterAckListener(participantHandler); + participantRegisterAckListener.onTopicEvent(INFRA, TOPIC, null, participantRegisterAckMsg); + } + } + + @Test + void testSendParticipantDeregisterMessage() throws Exception { + final ParticipantDeregister participantDeregisterMsg = new ParticipantDeregister(); + participantDeregisterMsg.setParticipantId(getParticipantId()); + participantDeregisterMsg.setTimestamp(Instant.now()); + participantDeregisterMsg.setParticipantType(getParticipantType()); + + synchronized (lockit) { + ParticipantMessagePublisher participantMessagePublisher = + new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class))); + participantMessagePublisher.sendParticipantDeregister(participantDeregisterMsg); + } + } + + @Test + void testReceiveParticipantDeregisterAckMessage() throws Exception { + final ParticipantDeregisterAck participantDeregisterAckMsg = new ParticipantDeregisterAck(); + participantDeregisterAckMsg.setMessage("ParticipantDeregisterAck message"); + participantDeregisterAckMsg.setResponseTo(UUID.randomUUID()); + participantDeregisterAckMsg.setResult(true); + + synchronized (lockit) { + ParticipantDeregisterAckListener participantDeregisterAckListener = + new ParticipantDeregisterAckListener(participantHandler); + participantDeregisterAckListener.onTopicEvent(INFRA, TOPIC, null, participantDeregisterAckMsg); + } + } + + @Test + void testReceiveParticipantUpdateMessage() throws Exception { + ParticipantUpdate participantUpdateMsg = TestListenerUtils.createParticipantUpdateMsg(); + + synchronized (lockit) { + ParticipantUpdateListener participantUpdateListener = new ParticipantUpdateListener(participantHandler); + participantUpdateListener.onTopicEvent(INFRA, TOPIC, null, participantUpdateMsg); + } + } + + @Test + void testSendParticipantUpdateAckMessage() throws Exception { + final ParticipantUpdateAck participantUpdateAckMsg = new ParticipantUpdateAck(); + participantUpdateAckMsg.setMessage("ParticipantUpdateAck message"); + participantUpdateAckMsg.setResponseTo(UUID.randomUUID()); + participantUpdateAckMsg.setResult(true); + + synchronized (lockit) { + ParticipantMessagePublisher participantMessagePublisher = + new ParticipantMessagePublisher(Collections.singletonList(Mockito.mock(TopicSink.class))); + participantMessagePublisher.sendParticipantUpdateAck(participantUpdateAckMsg); + } + } + + private ToscaConceptIdentifier getParticipantId() { + return new ToscaConceptIdentifier("org.onap.PM_Policy", "1.0.0"); + } + + private ToscaConceptIdentifier getParticipantType() { + return new ToscaConceptIdentifier("org.onap.policy.controlloop.PolicyControlLoopParticipant", "2.3.1"); + } +} diff --git a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantPolicyTest.java b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantPolicyTest.java index 6b8323971..45674f4c8 100644 --- a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantPolicyTest.java +++ b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/endtoend/ParticipantPolicyTest.java @@ -28,8 +28,10 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.clamp.controlloop.participant.policy.main.utils.TestListenerUtils; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; diff --git a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/main/utils/TestListenerUtils.java b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/main/utils/TestListenerUtils.java index 794b9ff69..d439c9daf 100644 --- a/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/main/utils/TestListenerUtils.java +++ b/participant/participant-impl/participant-impl-policy/src/test/java/org/onap/policy/clamp/controlloop/participant/policy/main/utils/TestListenerUtils.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.UUID; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; @@ -38,6 +39,7 @@ import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.Parti import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; import org.onap.policy.clamp.controlloop.participant.policy.main.parameters.CommonTestData; import org.onap.policy.common.utils.coder.Coder; import org.onap.policy.common.utils.coder.CoderException; @@ -192,6 +194,45 @@ public class TestListenerUtils { return clUpdateMsg; } + /** + * Method to create participantUpdateMsg. + * + * @return ParticipantUpdate message + */ + public static ParticipantUpdate createParticipantUpdateMsg() { + final ParticipantUpdate participantUpdateMsg = new ParticipantUpdate(); + ToscaConceptIdentifier participantId = new ToscaConceptIdentifier("org.onap.PM_Policy", "1.0.0"); + ToscaConceptIdentifier participantType = new ToscaConceptIdentifier( + "org.onap.policy.controlloop.PolicyControlLoopParticipant", "2.3.1"); + + participantUpdateMsg.setParticipantId(participantId); + participantUpdateMsg.setTimestamp(Instant.now()); + participantUpdateMsg.setParticipantType(participantType); + participantUpdateMsg.setTimestamp(Instant.ofEpochMilli(3000)); + participantUpdateMsg.setMessageId(UUID.randomUUID()); + + ToscaServiceTemplate toscaServiceTemplate = new ToscaServiceTemplate(); + toscaServiceTemplate.setName("serviceTemplate"); + toscaServiceTemplate.setDerivedFrom("parentServiceTemplate"); + toscaServiceTemplate.setDescription("Description of serviceTemplate"); + toscaServiceTemplate.setVersion("1.2.3"); + + ControlLoopElementDefinition clDefinition = new ControlLoopElementDefinition(); + clDefinition.setId(UUID.randomUUID()); + clDefinition.setControlLoopElementToscaServiceTemplate(toscaServiceTemplate); + Map commonPropertiesMap = Map.of("Prop1", "PropValue"); + clDefinition.setCommonPropertiesMap(commonPropertiesMap); + + Map controlLoopElementDefinitionMap = + Map.of(UUID.randomUUID(), clDefinition); + + Map> + participantDefinitionUpdateMap = Map.of(participantId, controlLoopElementDefinitionMap); + participantUpdateMsg.setParticipantDefinitionUpdateMap(participantDefinitionUpdateMap); + + return participantUpdateMsg; + } + /** * Method to create ParticipantHealthCheck message. * diff --git a/participant/participant-impl/participant-impl-policy/src/test/resources/application_test.properties b/participant/participant-impl/participant-impl-policy/src/test/resources/application_test.properties index 2f260825b..70d52b413 100644 --- a/participant/participant-impl/participant-impl-policy/src/test/resources/application_test.properties +++ b/participant/participant-impl/participant-impl-policy/src/test/resources/application_test.properties @@ -22,6 +22,3 @@ participant.intermediaryParameters.clampControlLoopTopics.topicSources[0].fetchT participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].topic=POLICY-CLRUNTIME-PARTICIPANT participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].servers[0]=localhost participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].topicCommInfrastructure=dmaap -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].topic=POLICY-NOTIFICATION -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].servers[0]=localhost -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].topicCommInfrastructure=dmaap diff --git a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml index 82e3b89f1..5a9cf1497 100644 --- a/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml +++ b/participant/participant-impl/participant-impl-simulator/src/main/resources/config/application.yaml @@ -31,7 +31,3 @@ participant: topic: POLICY-CLRUNTIME-PARTICIPANT servers[0]: ${topicServer:message-router} topicCommInfrastructure: dmaap - topicSinks[1]: - topic: POLICY-NOTIFICATION - servers[0]: ${topicServer:message-router} - topicCommInfrastructure: dmaap diff --git a/participant/participant-impl/participant-impl-simulator/src/test/resources/application_test.properties b/participant/participant-impl/participant-impl-simulator/src/test/resources/application_test.properties index 2b30c4ffd..f162367f8 100644 --- a/participant/participant-impl/participant-impl-simulator/src/test/resources/application_test.properties +++ b/participant/participant-impl/participant-impl-simulator/src/test/resources/application_test.properties @@ -23,6 +23,3 @@ participant.intermediaryParameters.clampControlLoopTopics.topicSources[0].fetchT participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].topic=POLICY-CLRUNTIME-PARTICIPANT participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].servers[0]=localhost participant.intermediaryParameters.clampControlLoopTopics.topicSinks[0].topicCommInfrastructure=dmaap -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].topic=POLICY-NOTIFICATION -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].servers[0]=localhost -participant.intermediaryParameters.clampControlLoopTopics.topicSinks[1].topicCommInfrastructure=dmaap diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java index a87299bdc..7e448dc15 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/ParticipantIntermediaryApi.java @@ -45,6 +45,16 @@ public interface ParticipantIntermediaryApi { */ void registerControlLoopElementListener(ControlLoopElementListener controlLoopElementListener); + /** + * Send participant register message to controlloop runtime. + */ + void sendParticipantRegister(); + + /** + * Send participant deregister message to controlloop runtime. + */ + void sendParticipantDeregister(); + /** * Get participants loops from the intermediary API. * @@ -114,5 +124,4 @@ public interface ParticipantIntermediaryApi { * @param elementStatistics the updated statistics */ void updateControlLoopElementStatistics(UUID id, ClElementStatistics elementStatistics); - } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java index 838f47544..9652f1a8d 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/api/impl/ParticipantIntermediaryApiImpl.java @@ -62,6 +62,16 @@ public class ParticipantIntermediaryApiImpl implements ParticipantIntermediaryAp participantHandler.getControlLoopHandler().registerControlLoopElementListener(controlLoopElementListener); } + @Override + public void sendParticipantRegister() { + participantHandler.sendParticipantRegister(); + } + + @Override + public void sendParticipantDeregister() { + participantHandler.sendParticipantDeregister(); + } + @Override public List getParticipants(String name, String version) { return List.of(participantHandler.getParticipant(name, version)); diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java index 6926bc30b..1bfce1374 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/MessageSender.java @@ -30,9 +30,12 @@ import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoop import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoops; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; import org.onap.policy.models.base.PfModelException; @@ -47,7 +50,7 @@ public class MessageSender extends TimerTask implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(MessageSender.class); private final ParticipantHandler participantHandler; - private final ParticipantStatusPublisher publisher; + private final ParticipantMessagePublisher publisher; private ScheduledExecutorService timerPool; /** @@ -57,7 +60,7 @@ public class MessageSender extends TimerTask implements Closeable { * @param publisher the publisher to use for sending messages * @param interval time interval to send Participant Status periodic messages */ - public MessageSender(ParticipantHandler participantHandler, ParticipantStatusPublisher publisher, + public MessageSender(ParticipantHandler participantHandler, ParticipantMessagePublisher publisher, long interval) { this.participantHandler = participantHandler; this.publisher = publisher; @@ -127,11 +130,38 @@ public class MessageSender extends TimerTask implements Closeable { status.setControlLoops(controlLoops); - publisher.send(status); + publisher.sendParticipantStatus(status); } /** - * Update ControlLoopElement statistics. The control loop elements listening will be + * Send a ParticipantRegister message for this participant. + * + * @param message the participantRegister message + */ + public void sendParticipantRegister(ParticipantRegister message) { + publisher.sendParticipantRegister(message); + } + + /** + * Send a ParticipantDeregister message for this participant. + * + * @param message the participantDeRegister message + */ + public void sendParticipantDeregister(ParticipantDeregister message) { + publisher.sendParticipantDeregister(message); + } + + /** + * Send a ParticipantUpdateAck message for this participant update. + * + * @param message the participantUpdateAck message + */ + public void sendParticipantUpdateAck(ParticipantUpdateAck message) { + publisher.sendParticipantUpdateAck(message); + } + + /** + * Update ControlLoopElement statistics. The control loop elements listening will be * notified to retrieve statistics from respective controlloop elements, and controlloopelements * data on the handler will be updated. * diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java new file mode 100644 index 000000000..262b21630 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantAckListener.java @@ -0,0 +1,57 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.util.function.Consumer; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantAckMessage; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; + +/** + * Abstract Listener for Participant Ack messages sent by runtime. + */ +public abstract class ParticipantAckListener extends ScoListener { + + private final ParticipantHandler participantHandler; + private final Consumer consumer; + + /** + * Constructs the object. + * + * @param clazz class of message this handles + * @param participantHandler ParticipantHandler + * @param consumer function that handles the message + */ + protected ParticipantAckListener(Class clazz, ParticipantHandler participantHandler, Consumer consumer) { + super(clazz); + this.participantHandler = participantHandler; + this.consumer = consumer; + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, T message) { + consumer.accept(message); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java new file mode 100644 index 000000000..e20f481f8 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantDeregisterAckListener.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.springframework.stereotype.Component; + +/** + * Listener for Participant Deregister Ack messages sent by runtime. + * + */ +@Component +public class ParticipantDeregisterAckListener extends ParticipantAckListener { + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ParticipantDeregisterAckListener(final ParticipantHandler participantHandler) { + super(ParticipantDeregisterAck.class, participantHandler, participantHandler::handleParticipantDeregisterAck); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java new file mode 100644 index 000000000..9e1b84620 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantMessagePublisher.java @@ -0,0 +1,93 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import java.util.List; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to send Participant Status messages to clamp using TopicSinkClient. + * + */ +public class ParticipantMessagePublisher { + private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantMessagePublisher.class); + + private final TopicSinkClient topicSinkClient; + + /** + * Constructor for instantiating ParticipantMessagePublisher. + * + * @param topicSinks the topic sinks + */ + public ParticipantMessagePublisher(List topicSinks) { + if (topicSinks.size() != 1) { + throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1"); + } + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + } + + /** + * Method to send Participant Status message to clamp on demand. + * + * @param participantStatus the Participant Status + */ + public void sendParticipantStatus(final ParticipantStatus participantStatus) { + topicSinkClient.send(participantStatus); + LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); + } + + /** + * Method to send Participant Status message to clamp on demand. + * + * @param participantRegister the Participant Status + */ + public void sendParticipantRegister(final ParticipantRegister participantRegister) { + topicSinkClient.send(participantRegister); + LOGGER.debug("Sent Participant Register message to CLAMP - {}", participantRegister); + } + + /** + * Method to send Participant Status message to clamp on demand. + * + * @param participantDeregister the Participant Status + */ + public void sendParticipantDeregister(final ParticipantDeregister participantDeregister) { + topicSinkClient.send(participantDeregister); + LOGGER.debug("Sent Participant Deregister message to CLAMP - {}", participantDeregister); + } + + /** + * Method to send Participant Update Ack message to runtime. + * + * @param participantUpdateAck the Participant Update Ack + */ + public void sendParticipantUpdateAck(final ParticipantUpdateAck participantUpdateAck) { + topicSinkClient.send(participantUpdateAck); + LOGGER.debug("Sent Participant Update Ack message to CLAMP - {}", participantUpdateAck); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java new file mode 100644 index 000000000..a15a2a850 --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantRegisterAckListener.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.springframework.stereotype.Component; + +/** + * Listener for Participant Register Ack messages sent by runtime. + * + */ +@Component +public class ParticipantRegisterAckListener extends ParticipantAckListener { + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ParticipantRegisterAckListener(final ParticipantHandler participantHandler) { + super(ParticipantRegisterAck.class, participantHandler, participantHandler::handleParticipantRegisterAck); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java deleted file mode 100644 index 78b998453..000000000 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantStatusPublisher.java +++ /dev/null @@ -1,57 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2021 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.policy.clamp.controlloop.participant.intermediary.comm; - -import java.util.List; -import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStatus; -import org.onap.policy.common.endpoints.event.comm.TopicSink; -import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is used to send Participant Status messages to clamp using TopicSinkClient. - * - */ -public class ParticipantStatusPublisher { - private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantStatusPublisher.class); - - private final TopicSinkClient topicSinkClient; - - /** - * Constructor for instantiating ParticipantStatusPublisher. - * - * @param topicSinks the topic sinks - */ - public ParticipantStatusPublisher(List topicSinks) { - this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); - } - - /** - * Method to send Participant Status message to clamp on demand. - * - * @param participantStatus the Participant Status - */ - public void send(final ParticipantStatus participantStatus) { - topicSinkClient.send(participantStatus); - LOGGER.debug("Sent Participant Status message to CLAMP - {}", participantStatus); - } -} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java new file mode 100644 index 000000000..42bd52d9a --- /dev/null +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/comm/ParticipantUpdateListener.java @@ -0,0 +1,41 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.intermediary.comm; + +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.onap.policy.clamp.controlloop.participant.intermediary.handler.ParticipantHandler; +import org.springframework.stereotype.Component; + +/** + * Listener for Participant Update messages sent by runtime. + */ +@Component +public class ParticipantUpdateListener extends ParticipantListener { + + /** + * Constructs the object. + * + * @param participantHandler the handler for managing the state of the participant + */ + public ParticipantUpdateListener(final ParticipantHandler participantHandler) { + super(ParticipantUpdate.class, participantHandler, participantHandler::handleParticipantUpdate); + } +} diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java index dc7d87eec..e363504a5 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/config/BeanFactory.java @@ -21,7 +21,7 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.config; import java.util.List; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; @@ -36,16 +36,16 @@ public class BeanFactory { private static final String[] MSG_TYPE_NAMES = {"messageType"}; /** - * create ParticipantStatusPublisher. + * create ParticipantMessagePublisher. * * @param parameters the ParticipantParameters - * @return ParticipantStatusPublisher + * @return ParticipantMessagePublisher */ @Bean - public ParticipantStatusPublisher publisher(final ParticipantParameters parameters) { + public ParticipantMessagePublisher publisher(final ParticipantParameters parameters) { List topicSinks = TopicEndpointManager.getManager() .addTopicSinks(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSinks()); - return new ParticipantStatusPublisher(topicSinks); + return new ParticipantMessagePublisher(topicSinks); } @Bean diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java index 2d789d40d..0aa536746 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/IntermediaryActivator.java @@ -24,10 +24,14 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessageType; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopStateChangeListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ControlLoopUpdateListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantDeregisterAckListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantHealthCheckListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantRegisterAckListener; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStateChangeListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantUpdateListener; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSource; @@ -50,14 +54,18 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl // Topics from which the participant receives and to which the participant sends messages private List topicSources; + ParticipantIntermediaryApi participantIntermediaryApi; + /** * Instantiate the activator for participant. * * @param applicationContext ApplicationContext * @param parameters the ParticipantParameters */ - public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters) { + public IntermediaryActivator(final ApplicationContext applicationContext, final ParticipantParameters parameters, + ParticipantIntermediaryApi participantIntermediaryApi) { this.applicationContext = applicationContext; + this.participantIntermediaryApi = participantIntermediaryApi; topicSources = TopicEndpointManager.getManager() .addTopicSources(parameters.getIntermediaryParameters().getClampControlLoopTopics().getTopicSources()); @@ -81,6 +89,7 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl public void handleContextRefreshEvent(ContextRefreshedEvent ctxRefreshedEvent) { if (!isAlive()) { start(); + sendParticipantRegister(); } } @@ -92,10 +101,19 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl @EventListener public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { if (isAlive()) { + sendParticipantDeregister(); stop(); } } + private void sendParticipantRegister() { + participantIntermediaryApi.sendParticipantRegister(); + } + + private void sendParticipantDeregister() { + participantIntermediaryApi.sendParticipantDeregister(); + } + /** * Registers the dispatcher with the topic source(s). */ @@ -114,6 +132,15 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl msgDispatcher.register(ParticipantMessageType.PARTICIPANT_CONTROL_LOOP_UPDATE.name(), applicationContext.getBean(ControlLoopUpdateListener.class)); + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_REGISTER_ACK.name(), + applicationContext.getBean(ParticipantRegisterAckListener.class)); + + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_DEREGISTER_ACK.name(), + applicationContext.getBean(ParticipantDeregisterAckListener.class)); + + msgDispatcher.register(ParticipantMessageType.PARTICIPANT_UPDATE.name(), + applicationContext.getBean(ParticipantUpdateListener.class)); + for (final TopicSource source : topicSources) { source.register(msgDispatcher); } diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java index 1c54658fa..a8913c1f0 100644 --- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java +++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/controlloop/participant/intermediary/handler/ParticipantHandler.java @@ -21,22 +21,32 @@ package org.onap.policy.clamp.controlloop.participant.intermediary.handler; import java.io.Closeable; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Objects; +import java.util.UUID; import lombok.Getter; import lombok.Setter; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElementDefinition; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.Participant; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantHealthStatus; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantState; import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ParticipantStatistics; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopStateChange; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantControlLoopUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantDeregisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantHealthCheck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantMessage; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegister; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantRegisterAck; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseDetails; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantResponseStatus; import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantStateChange; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdate; +import org.onap.policy.clamp.controlloop.models.messages.dmaap.participant.ParticipantUpdateAck; import org.onap.policy.clamp.controlloop.participant.intermediary.comm.MessageSender; -import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantStatusPublisher; +import org.onap.policy.clamp.controlloop.participant.intermediary.comm.ParticipantMessagePublisher; import org.onap.policy.clamp.controlloop.participant.intermediary.parameters.ParticipantParameters; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.slf4j.Logger; @@ -63,13 +73,15 @@ public class ParticipantHandler implements Closeable { @Setter private ParticipantHealthStatus healthStatus = ParticipantHealthStatus.UNKNOWN; + private final Map clElementDefsOnThisParticipant = new LinkedHashMap<>(); + /** * Constructor, set the participant ID and sender. * * @param parameters the parameters of the participant * @param publisher the publisher for sending responses to messages */ - public ParticipantHandler(ParticipantParameters parameters, ParticipantStatusPublisher publisher) { + public ParticipantHandler(ParticipantParameters parameters, ParticipantMessagePublisher publisher) { this.participantType = parameters.getIntermediaryParameters().getParticipantType(); this.participantId = parameters.getIntermediaryParameters().getParticipantId(); this.sender = @@ -249,20 +261,87 @@ public class ParticipantHandler implements Closeable { /** * Check if a participant message applies to this participant handler. * - * @param partipantMsg the message to check + * @param participantMsg the message to check * @return true if it applies, false otherwise */ - public boolean canHandle(ParticipantMessage partipantMsg) { - return partipantMsg.appliesTo(participantType, participantId); + public boolean appliesTo(ParticipantMessage participantMsg) { + return participantMsg.appliesTo(participantType, participantId); } /** - * Check if a participant message applies to this participant handler. + * Method to send ParticipantRegister message to controlloop runtime. + */ + public void sendParticipantRegister() { + var participantRegister = new ParticipantRegister(); + participantRegister.setParticipantId(participantId); + participantRegister.setParticipantType(participantType); + + sender.sendParticipantRegister(participantRegister); + } + + /** + * Handle a participantRegister Ack message. * - * @param partipantMsg the message to check - * @return true if it applies, false otherwise + * @param participantRegisterAckMsg the participantRegisterAck message + */ + public void handleParticipantRegisterAck(ParticipantRegisterAck participantRegisterAckMsg) { + LOGGER.debug("ParticipantRegisterAck message received as responseTo {}", + participantRegisterAckMsg.getResponseTo()); + } + + /** + * Method to send ParticipantDeregister message to controlloop runtime. + */ + public void sendParticipantDeregister() { + var participantDeregister = new ParticipantDeregister(); + participantDeregister.setParticipantId(participantId); + participantDeregister.setParticipantType(participantType); + + sender.sendParticipantDeregister(participantDeregister); + } + + /** + * Handle a participantDeregister Ack message. + * + * @param participantDeregisterAckMsg the participantDeregisterAck message */ - public boolean appliesTo(ParticipantMessage partipantMsg) { - return partipantMsg.appliesTo(participantType, participantId); + public void handleParticipantDeregisterAck(ParticipantDeregisterAck participantDeregisterAckMsg) { + LOGGER.debug("ParticipantDeregisterAck message received as responseTo {}", + participantDeregisterAckMsg.getResponseTo()); + } + + /** + * Handle a ParticipantUpdate message. + * + * @param participantUpdateMsg the ParticipantUpdate message + */ + public void handleParticipantUpdate(ParticipantUpdate participantUpdateMsg) { + LOGGER.debug("ParticipantUpdate message received for participantId {}", + participantUpdateMsg.getParticipantId()); + + if (!participantUpdateMsg.appliesTo(participantType, participantId)) { + return; + } + + Map clDefinitionMap = + participantUpdateMsg.getParticipantDefinitionUpdateMap().get(participantUpdateMsg.getParticipantId()); + + for (ControlLoopElementDefinition element : clDefinitionMap.values()) { + clElementDefsOnThisParticipant.put(element.getId(), element); + } + + sendParticipantUpdateAck(participantUpdateMsg.getMessageId()); + } + + /** + * Method to send ParticipantUpdateAck message to controlloop runtime. + */ + public void sendParticipantUpdateAck(UUID messageId) { + var participantUpdateAck = new ParticipantUpdateAck(); + participantUpdateAck.setResponseTo(messageId); + participantUpdateAck.setMessage("Participant Update Ack message"); + participantUpdateAck.setResult(true); + + sender.sendParticipantUpdateAck(participantUpdateAck); } } -- cgit 1.2.3-korg