diff options
author | rameshiyer27 <ramesh.murugan.iyer@est.tech> | 2022-08-26 10:56:31 +0100 |
---|---|---|
committer | rameshiyer27 <ramesh.murugan.iyer@est.tech> | 2022-08-26 15:19:59 +0100 |
commit | df9dc5ca6f5a6fed5af6c322a582d70d8114b21b (patch) | |
tree | b79b76c14515810e8fc10398727d251348818ecf | |
parent | e7beba3d152cb8df1809fbe8a644f227a441957e (diff) |
Add new topic for publishing events in Acm Element Impl
Listening and publishing on different topics for better readability of
logs and segregation of events between PDP and AC element.
Issue-ID: POLICY-4332
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Change-Id: I7c44ba1498c73a8bd395ad54eeb09950c584156e
9 files changed, 67 insertions, 22 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java index 0d13bcda5..dad92afa9 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java @@ -26,7 +26,9 @@ import lombok.Data; public class DmaapConfig { private String server; - private String topic; + private String listenerTopic; + + private String publisherTopic; private Integer fetchTimeout; diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java index d1f064823..c86f7f225 100644 --- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java +++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java @@ -26,11 +26,11 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; @Data public class ElementConfig { - private ToscaConceptIdentifier elementId; + private ToscaConceptIdentifier receiverId; private ElementType elementType; - private Integer timerSec; + private Integer timerMs; private DmaapConfig topicParameterGroup; } diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java new file mode 100644 index 000000000..21394401c --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java @@ -0,0 +1,44 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.main.parameters; + +import java.util.List; +import lombok.Data; +import org.onap.policy.clamp.models.acm.messages.rest.element.DmaapConfig; +import org.onap.policy.common.endpoints.parameters.TopicParameters; + +@Data +public class ElementTopicParameters extends TopicParameters { + + /** + * Constructor. + * @param parameters DmaapConfig + */ + public ElementTopicParameters(DmaapConfig parameters) { + super(); + this.setTopic(parameters.getListenerTopic()); + this.setServers(List.of(parameters.getServer())); + this.setFetchTimeout(parameters.getFetchTimeout()); + this.setTopicCommInfrastructure(parameters.getTopicCommInfrastructure()); + this.setUseHttps(parameters.isUseHttps()); + } + +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java index 18f3a6fdd..f374f50d1 100644 --- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java @@ -60,6 +60,6 @@ public class BridgeService extends AbstractElementService { @Override public void active(ElementConfig elementConfig) { - receiver = elementConfig.getElementId(); + receiver = elementConfig.getReceiverId(); } } diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java index f8f9024f8..255974d22 100644 --- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java @@ -26,11 +26,11 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import org.onap.policy.clamp.acm.element.handler.MessageActivator; import org.onap.policy.clamp.acm.element.handler.MessageHandler; +import org.onap.policy.clamp.acm.element.main.parameters.ElementTopicParameters; import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException; import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage; import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig; import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; -import org.onap.policy.common.endpoints.parameters.TopicParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -52,16 +52,14 @@ public class ConfigService { * @param elementConfig the configuration */ public void activateElement(@NonNull ElementConfig elementConfig) { - var topicParameters = new TopicParameters(); - topicParameters.setTopic(elementConfig.getTopicParameterGroup().getTopic()); - topicParameters.setServers(List.of(elementConfig.getTopicParameterGroup().getServer())); - topicParameters.setFetchTimeout(elementConfig.getTopicParameterGroup().getFetchTimeout()); - topicParameters.setTopicCommInfrastructure(elementConfig.getTopicParameterGroup().getTopicCommInfrastructure()); - topicParameters.setUseHttps(elementConfig.getTopicParameterGroup().isUseHttps()); + var listenerTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup()); + + var publisherTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup()); + publisherTopicParameters.setTopic(elementConfig.getTopicParameterGroup().getPublisherTopic()); var parameters = new TopicParameterGroup(); - parameters.setTopicSinks(List.of(topicParameters)); - parameters.setTopicSources(List.of(topicParameters)); + parameters.setTopicSinks(List.of(publisherTopicParameters)); + parameters.setTopicSources(List.of(listenerTopicParameters)); if (!parameters.isValid()) { throw new AutomationCompositionRuntimeException(Response.Status.BAD_REQUEST, diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java index 589397db7..479fa44b6 100644 --- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java @@ -75,12 +75,12 @@ public class StarterService extends AbstractElementService implements AutoClosea if (timerPool != null) { throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService alredy actived!"); } - receiver = elementConfig.getElementId(); + receiver = elementConfig.getReceiverId(); timerPool = new ScheduledThreadPoolExecutor(1); timerPool.setRemoveOnCancelPolicy(true); - future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(), - elementConfig.getTimerSec(), TimeUnit.MILLISECONDS); + future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(), + elementConfig.getTimerMs(), TimeUnit.MILLISECONDS); } private void sendMessage() { @@ -99,8 +99,8 @@ public class StarterService extends AbstractElementService implements AutoClosea if (future != null) { future.cancel(true); } - future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(), - elementConfig.getTimerSec(), TimeUnit.MILLISECONDS); + future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(), + elementConfig.getTimerMs(), TimeUnit.MILLISECONDS); } @Override diff --git a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java index cc62d8d15..aae8d5893 100644 --- a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java +++ b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java @@ -47,7 +47,7 @@ class BridgeServiceTest { assertThat(bridgeService.getType()).isEqualTo(ElementType.BRIDGE); var elementConfig = new ElementConfig(); - elementConfig.setElementId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0")); + elementConfig.setReceiverId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0")); bridgeService.active(elementConfig); bridgeService.handleMessage(new ElementStatus()); diff --git a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java index 785673bae..156f7d3fc 100644 --- a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java +++ b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java @@ -39,7 +39,8 @@ class ConfigServiceTest { var elementConfig = new ElementConfig(); elementConfig.setTopicParameterGroup(new DmaapConfig()); elementConfig.getTopicParameterGroup().setTopicCommInfrastructure("dmaap"); - elementConfig.getTopicParameterGroup().setTopic("topic"); + elementConfig.getTopicParameterGroup().setListenerTopic("topic"); + elementConfig.getTopicParameterGroup().setPublisherTopic("topic"); elementConfig.getTopicParameterGroup().setServer("localhost"); elementConfig.getTopicParameterGroup().setFetchTimeout(1000); diff --git a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java index ee58a352c..28af70de7 100644 --- a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java +++ b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java @@ -47,8 +47,8 @@ class StarterServiceTest { assertThat(starterService.getType()).isEqualTo(ElementType.STARTER); var elementConfig = new ElementConfig(); - elementConfig.setTimerSec(100); - elementConfig.setElementId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0")); + elementConfig.setTimerMs(100); + elementConfig.setReceiverId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0")); starterService.active(elementConfig); verify(messagePublisher, timeout(200).atLeastOnce()).publishMsg(any(ElementMessage.class)); starterService.deactivate(); |