summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLiam Fallon <liam.fallon@est.tech>2022-08-29 20:42:38 +0000
committerGerrit Code Review <gerrit@onap.org>2022-08-29 20:42:38 +0000
commitde9211996ad934d231c4950debc2bd49094d547b (patch)
treeb238f35dfdc68fd4b847604fd14a548fa1f06433
parent71c9053f4b27fd1b8f30fd358c9b465b057eaea3 (diff)
parentdf9dc5ca6f5a6fed5af6c322a582d70d8114b21b (diff)
Merge "Add new topic for publishing events in Acm Element Impl"
-rw-r--r--models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java4
-rw-r--r--models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java4
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java44
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java2
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java16
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java10
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java2
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java3
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java4
9 files changed, 67 insertions, 22 deletions
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java
index 0d13bcda5..dad92afa9 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/DmaapConfig.java
@@ -26,7 +26,9 @@ import lombok.Data;
public class DmaapConfig {
private String server;
- private String topic;
+ private String listenerTopic;
+
+ private String publisherTopic;
private Integer fetchTimeout;
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java
index d1f064823..c86f7f225 100644
--- a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java
+++ b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/rest/element/ElementConfig.java
@@ -26,11 +26,11 @@ import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
@Data
public class ElementConfig {
- private ToscaConceptIdentifier elementId;
+ private ToscaConceptIdentifier receiverId;
private ElementType elementType;
- private Integer timerSec;
+ private Integer timerMs;
private DmaapConfig topicParameterGroup;
}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java
new file mode 100644
index 000000000..21394401c
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.main.parameters;
+
+import java.util.List;
+import lombok.Data;
+import org.onap.policy.clamp.models.acm.messages.rest.element.DmaapConfig;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+
+@Data
+public class ElementTopicParameters extends TopicParameters {
+
+ /**
+ * Constructor.
+ * @param parameters DmaapConfig
+ */
+ public ElementTopicParameters(DmaapConfig parameters) {
+ super();
+ this.setTopic(parameters.getListenerTopic());
+ this.setServers(List.of(parameters.getServer()));
+ this.setFetchTimeout(parameters.getFetchTimeout());
+ this.setTopicCommInfrastructure(parameters.getTopicCommInfrastructure());
+ this.setUseHttps(parameters.isUseHttps());
+ }
+
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
index 18f3a6fdd..f374f50d1 100644
--- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
@@ -60,6 +60,6 @@ public class BridgeService extends AbstractElementService {
@Override
public void active(ElementConfig elementConfig) {
- receiver = elementConfig.getElementId();
+ receiver = elementConfig.getReceiverId();
}
}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
index 4ffed197c..78ddad681 100644
--- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
@@ -26,11 +26,11 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.element.handler.MessageActivator;
import org.onap.policy.clamp.acm.element.handler.MessageHandler;
+import org.onap.policy.clamp.acm.element.main.parameters.ElementTopicParameters;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -52,16 +52,14 @@ public class ConfigService {
* @param elementConfig the configuration
*/
public void activateElement(@NonNull ElementConfig elementConfig) {
- var topicParameters = new TopicParameters();
- topicParameters.setTopic(elementConfig.getTopicParameterGroup().getTopic());
- topicParameters.setServers(List.of(elementConfig.getTopicParameterGroup().getServer()));
- topicParameters.setFetchTimeout(elementConfig.getTopicParameterGroup().getFetchTimeout());
- topicParameters.setTopicCommInfrastructure(elementConfig.getTopicParameterGroup().getTopicCommInfrastructure());
- topicParameters.setUseHttps(elementConfig.getTopicParameterGroup().isUseHttps());
+ var listenerTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup());
+
+ var publisherTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup());
+ publisherTopicParameters.setTopic(elementConfig.getTopicParameterGroup().getPublisherTopic());
var parameters = new TopicParameterGroup();
- parameters.setTopicSinks(List.of(topicParameters));
- parameters.setTopicSources(List.of(topicParameters));
+ parameters.setTopicSinks(List.of(publisherTopicParameters));
+ parameters.setTopicSources(List.of(listenerTopicParameters));
if (!parameters.isValid()) {
throw new AutomationCompositionRuntimeException(Response.Status.BAD_REQUEST,
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
index 589397db7..479fa44b6 100644
--- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
@@ -75,12 +75,12 @@ public class StarterService extends AbstractElementService implements AutoClosea
if (timerPool != null) {
throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService alredy actived!");
}
- receiver = elementConfig.getElementId();
+ receiver = elementConfig.getReceiverId();
timerPool = new ScheduledThreadPoolExecutor(1);
timerPool.setRemoveOnCancelPolicy(true);
- future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
- elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+ future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(),
+ elementConfig.getTimerMs(), TimeUnit.MILLISECONDS);
}
private void sendMessage() {
@@ -99,8 +99,8 @@ public class StarterService extends AbstractElementService implements AutoClosea
if (future != null) {
future.cancel(true);
}
- future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
- elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+ future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(),
+ elementConfig.getTimerMs(), TimeUnit.MILLISECONDS);
}
@Override
diff --git a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java
index cc62d8d15..aae8d5893 100644
--- a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java
+++ b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/BridgeServiceTest.java
@@ -47,7 +47,7 @@ class BridgeServiceTest {
assertThat(bridgeService.getType()).isEqualTo(ElementType.BRIDGE);
var elementConfig = new ElementConfig();
- elementConfig.setElementId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
+ elementConfig.setReceiverId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
bridgeService.active(elementConfig);
bridgeService.handleMessage(new ElementStatus());
diff --git a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java
index 785673bae..156f7d3fc 100644
--- a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java
+++ b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/ConfigServiceTest.java
@@ -39,7 +39,8 @@ class ConfigServiceTest {
var elementConfig = new ElementConfig();
elementConfig.setTopicParameterGroup(new DmaapConfig());
elementConfig.getTopicParameterGroup().setTopicCommInfrastructure("dmaap");
- elementConfig.getTopicParameterGroup().setTopic("topic");
+ elementConfig.getTopicParameterGroup().setListenerTopic("topic");
+ elementConfig.getTopicParameterGroup().setPublisherTopic("topic");
elementConfig.getTopicParameterGroup().setServer("localhost");
elementConfig.getTopicParameterGroup().setFetchTimeout(1000);
diff --git a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java
index ee58a352c..28af70de7 100644
--- a/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java
+++ b/participant/participant-impl/participant-impl-acelement/src/test/java/org/onap/policy/clamp/acm/element/service/StarterServiceTest.java
@@ -47,8 +47,8 @@ class StarterServiceTest {
assertThat(starterService.getType()).isEqualTo(ElementType.STARTER);
var elementConfig = new ElementConfig();
- elementConfig.setTimerSec(100);
- elementConfig.setElementId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
+ elementConfig.setTimerMs(100);
+ elementConfig.setReceiverId(new ToscaConceptIdentifier("onap.policy.clamp.ac.element2", "1.0.0"));
starterService.active(elementConfig);
verify(messagePublisher, timeout(200).atLeastOnce()).publishMsg(any(ElementMessage.class));
starterService.deactivate();