summaryrefslogtreecommitdiffstats
path: root/participant/participant-impl/participant-impl-acelement/src/main
diff options
context:
space:
mode:
authorLiam Fallon <liam.fallon@est.tech>2022-08-29 20:42:38 +0000
committerGerrit Code Review <gerrit@onap.org>2022-08-29 20:42:38 +0000
commitde9211996ad934d231c4950debc2bd49094d547b (patch)
treeb238f35dfdc68fd4b847604fd14a548fa1f06433 /participant/participant-impl/participant-impl-acelement/src/main
parent71c9053f4b27fd1b8f30fd358c9b465b057eaea3 (diff)
parentdf9dc5ca6f5a6fed5af6c322a582d70d8114b21b (diff)
Merge "Add new topic for publishing events in Acm Element Impl"
Diffstat (limited to 'participant/participant-impl/participant-impl-acelement/src/main')
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java44
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java2
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java16
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java10
4 files changed, 57 insertions, 15 deletions
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java
new file mode 100644
index 000000000..21394401c
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/main/parameters/ElementTopicParameters.java
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.main.parameters;
+
+import java.util.List;
+import lombok.Data;
+import org.onap.policy.clamp.models.acm.messages.rest.element.DmaapConfig;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+
+@Data
+public class ElementTopicParameters extends TopicParameters {
+
+ /**
+ * Constructor.
+ * @param parameters DmaapConfig
+ */
+ public ElementTopicParameters(DmaapConfig parameters) {
+ super();
+ this.setTopic(parameters.getListenerTopic());
+ this.setServers(List.of(parameters.getServer()));
+ this.setFetchTimeout(parameters.getFetchTimeout());
+ this.setTopicCommInfrastructure(parameters.getTopicCommInfrastructure());
+ this.setUseHttps(parameters.isUseHttps());
+ }
+
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
index 18f3a6fdd..f374f50d1 100644
--- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
@@ -60,6 +60,6 @@ public class BridgeService extends AbstractElementService {
@Override
public void active(ElementConfig elementConfig) {
- receiver = elementConfig.getElementId();
+ receiver = elementConfig.getReceiverId();
}
}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
index 4ffed197c..78ddad681 100644
--- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
@@ -26,11 +26,11 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.onap.policy.clamp.acm.element.handler.MessageActivator;
import org.onap.policy.clamp.acm.element.handler.MessageHandler;
+import org.onap.policy.clamp.acm.element.main.parameters.ElementTopicParameters;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
-import org.onap.policy.common.endpoints.parameters.TopicParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@@ -52,16 +52,14 @@ public class ConfigService {
* @param elementConfig the configuration
*/
public void activateElement(@NonNull ElementConfig elementConfig) {
- var topicParameters = new TopicParameters();
- topicParameters.setTopic(elementConfig.getTopicParameterGroup().getTopic());
- topicParameters.setServers(List.of(elementConfig.getTopicParameterGroup().getServer()));
- topicParameters.setFetchTimeout(elementConfig.getTopicParameterGroup().getFetchTimeout());
- topicParameters.setTopicCommInfrastructure(elementConfig.getTopicParameterGroup().getTopicCommInfrastructure());
- topicParameters.setUseHttps(elementConfig.getTopicParameterGroup().isUseHttps());
+ var listenerTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup());
+
+ var publisherTopicParameters = new ElementTopicParameters(elementConfig.getTopicParameterGroup());
+ publisherTopicParameters.setTopic(elementConfig.getTopicParameterGroup().getPublisherTopic());
var parameters = new TopicParameterGroup();
- parameters.setTopicSinks(List.of(topicParameters));
- parameters.setTopicSources(List.of(topicParameters));
+ parameters.setTopicSinks(List.of(publisherTopicParameters));
+ parameters.setTopicSources(List.of(listenerTopicParameters));
if (!parameters.isValid()) {
throw new AutomationCompositionRuntimeException(Response.Status.BAD_REQUEST,
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
index 589397db7..479fa44b6 100644
--- a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
@@ -75,12 +75,12 @@ public class StarterService extends AbstractElementService implements AutoClosea
if (timerPool != null) {
throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService alredy actived!");
}
- receiver = elementConfig.getElementId();
+ receiver = elementConfig.getReceiverId();
timerPool = new ScheduledThreadPoolExecutor(1);
timerPool.setRemoveOnCancelPolicy(true);
- future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
- elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+ future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(),
+ elementConfig.getTimerMs(), TimeUnit.MILLISECONDS);
}
private void sendMessage() {
@@ -99,8 +99,8 @@ public class StarterService extends AbstractElementService implements AutoClosea
if (future != null) {
future.cancel(true);
}
- future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
- elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+ future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerMs(),
+ elementConfig.getTimerMs(), TimeUnit.MILLISECONDS);
}
@Override