summaryrefslogtreecommitdiffstats
path: root/runtime-acm/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'runtime-acm/src/main')
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java18
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java7
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java6
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java34
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java18
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java19
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java4
-rw-r--r--runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java101
-rw-r--r--runtime-acm/src/main/resources/application.yaml13
9 files changed, 199 insertions, 21 deletions
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
index 0d9de205e..a3e55c3f7 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/MessageDispatcherActivator.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -24,9 +24,12 @@ package org.onap.policy.clamp.acm.runtime.config.messaging;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
import lombok.Getter;
import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
+import org.onap.policy.common.endpoints.event.comm.Topic;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -65,8 +68,14 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
topicSources = TopicEndpointManager.getManager()
.addTopicSources(acRuntimeParameterGroup.getTopicParameterGroup().getTopicSources());
+ var topics = acRuntimeParameterGroup.getTopics();
+
msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ var topicMap = topicSinks.stream()
+ .collect(Collectors.toMap(Topic::getTopic, UnaryOperator.identity()));
+
+
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
@@ -74,7 +83,8 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
- () -> publisher.active(topicSinks),
+ () -> publisher.active(publisher.isDefaultTopic() ? topicMap.get(topics.getOperationTopic())
+ : topicMap.get(topics.getSyncTopic())),
publisher::stop));
listeners.forEach(listener ->
@@ -90,7 +100,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
* Registers the dispatcher with the topic source(s).
*/
private void registerMsgDispatcher() {
- for (final TopicSource source : topicSources) {
+ for (final var source : topicSources) {
source.register(msgDispatcher);
}
}
@@ -99,7 +109,7 @@ public class MessageDispatcherActivator extends ServiceManagerContainer implemen
* Unregisters the dispatcher from the topic source(s).
*/
private void unregisterMsgDispatcher() {
- for (final TopicSource source : topicSources) {
+ for (final var source : topicSources) {
source.unregister(msgDispatcher);
}
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
index a7acc47b3..a76a09d99 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/config/messaging/Publisher.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
package org.onap.policy.clamp.acm.runtime.config.messaging;
import java.util.List;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
/**
@@ -28,7 +29,9 @@ import org.onap.policy.common.endpoints.event.comm.TopicSink;
*/
public interface Publisher {
- void active(List<TopicSink> topicSinks);
+ void active(TopicSink topicSink);
void stop();
+
+ boolean isDefaultTopic();
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
index a30b531a4..a0b6fe13e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/AcRuntimeParameterGroup.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021,2023 Nordix Foundation.
+ * Copyright (C) 2021,2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -50,4 +50,8 @@ public class AcRuntimeParameterGroup {
@Valid
@NotNull
private AcmParameters acmParameters = new AcmParameters();
+
+ @Valid
+ @NotNull
+ private Topics topics;
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java
new file mode 100644
index 000000000..d485a24ba
--- /dev/null
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/main/parameters/Topics.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.main.parameters;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.validation.annotation.Validated;
+
+@Getter
+@Setter
+@Validated
+@AllArgsConstructor
+public class Topics {
+
+ private String operationTopic;
+ private String syncTopic;
+}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
index 246d1c13f..5014f7dc3 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantAckPublisher.java
@@ -22,7 +22,9 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import jakarta.ws.rs.core.Response.Status;
import java.util.List;
+import java.util.Optional;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantAckMessage;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
@@ -47,11 +49,8 @@ public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMe
@Override
- public void active(List<TopicSink> topicSinks) {
- if (topicSinks.size() != 1) {
- throw new IllegalArgumentException("Topic Sink must be one");
- }
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ public void active(TopicSink topicSink) {
+ this.topicSinkClient = new TopicSinkClient(topicSink);
active = true;
}
@@ -59,4 +58,13 @@ public abstract class AbstractParticipantAckPublisher<E extends ParticipantAckMe
public void stop() {
active = false;
}
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
index d17cd7301..5afb7eba4 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/AbstractParticipantPublisher.java
@@ -22,12 +22,15 @@ package org.onap.policy.clamp.acm.runtime.supervision.comm;
import jakarta.ws.rs.core.Response.Status;
import java.util.List;
+import java.util.Optional;
import org.onap.policy.clamp.acm.runtime.config.messaging.Publisher;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
import org.onap.policy.clamp.common.acm.exception.AutomationCompositionRuntimeException;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessage;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+
public abstract class AbstractParticipantPublisher<E extends ParticipantMessage> implements Publisher {
private TopicSinkClient topicSinkClient;
@@ -47,11 +50,8 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage>
@Override
- public void active(List<TopicSink> topicSinks) {
- if (topicSinks.size() != 1) {
- throw new IllegalArgumentException("Topic Sink must be one");
- }
- this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ public void active(TopicSink topicSink) {
+ this.topicSinkClient = new TopicSinkClient(topicSink);
active = true;
}
@@ -59,4 +59,13 @@ public abstract class AbstractParticipantPublisher<E extends ParticipantMessage>
public void stop() {
active = false;
}
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
index 50fa6d11f..4f28eab8e 100644
--- a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantRestartPublisher.java
@@ -86,12 +86,12 @@ public class ParticipantRestartPublisher extends AbstractParticipantPublisher<Pa
super.send(message);
}
- private List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
+ protected List<ParticipantDefinition> prepareParticipantRestarting(UUID participantId,
AutomationCompositionDefinition acmDefinition) {
var acElements = AcmUtils.extractAcElementsFromServiceTemplate(acmDefinition.getServiceTemplate(),
acRuntimeParameterGroup.getAcmParameters().getToscaElementName());
- // list of entry entry filtered by participantId
+ // list of entry filtered by participantId
List<Entry<String, ToscaNodeTemplate>> elementList = new ArrayList<>();
Map<ToscaConceptIdentifier, UUID> supportedElementMap = new HashMap<>();
for (var elementEntry : acElements) {
diff --git a/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
new file mode 100644
index 000000000..ae7eda1ee
--- /dev/null
+++ b/runtime-acm/src/main/java/org/onap/policy/clamp/acm/runtime/supervision/comm/ParticipantSyncPublisher.java
@@ -0,0 +1,101 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.runtime.supervision.comm;
+
+import io.micrometer.core.annotation.Timed;
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import org.onap.policy.clamp.acm.runtime.main.parameters.AcRuntimeParameterGroup;
+import org.onap.policy.clamp.acm.runtime.main.parameters.Topics;
+import org.onap.policy.clamp.models.acm.concepts.AutomationComposition;
+import org.onap.policy.clamp.models.acm.concepts.AutomationCompositionDefinition;
+import org.onap.policy.clamp.models.acm.concepts.ParticipantRestartAc;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
+import org.onap.policy.clamp.models.acm.utils.AcmUtils;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class ParticipantSyncPublisher extends ParticipantRestartPublisher {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ParticipantSyncPublisher.class);
+
+ private final AcRuntimeParameterGroup acRuntimeParameterGroup;
+
+ public ParticipantSyncPublisher(AcRuntimeParameterGroup acRuntimeParameterGroup) {
+ super(acRuntimeParameterGroup);
+ this.acRuntimeParameterGroup = acRuntimeParameterGroup;
+ }
+
+
+ /**
+ * Send sync msg to Participant.
+ *
+ * @param participantId the ParticipantId
+ * @param acmDefinition the AutomationComposition Definition
+ * @param automationCompositions the list of automationCompositions
+ */
+ @Override
+ @Timed(value = "publisher.participant_sync_msg", description = "Participant Sync published")
+ public void send(UUID participantId, AutomationCompositionDefinition acmDefinition,
+ List<AutomationComposition> automationCompositions) {
+
+ var message = new ParticipantSync();
+ message.setParticipantId(participantId);
+ message.setCompositionId(acmDefinition.getCompositionId());
+ message.setMessageId(UUID.randomUUID());
+ message.setTimestamp(Instant.now());
+ message.setState(acmDefinition.getState());
+ message.setParticipantDefinitionUpdates(prepareParticipantRestarting(participantId, acmDefinition));
+ var toscaServiceTemplateFragment = AcmUtils.getToscaServiceTemplateFragment(acmDefinition.getServiceTemplate());
+
+ for (var automationComposition : automationCompositions) {
+ var syncAc = new ParticipantRestartAc();
+ syncAc.setAutomationCompositionId(automationComposition.getInstanceId());
+ for (var element : automationComposition.getElements().values()) {
+ if (participantId.equals(element.getParticipantId())) {
+ var acElementSync = AcmUtils.createAcElementRestart(element);
+ acElementSync.setToscaServiceTemplateFragment(toscaServiceTemplateFragment);
+ syncAc.getAcElementList().add(acElementSync);
+ }
+ }
+ message.getAutomationcompositionList().add(syncAc);
+ }
+
+ LOGGER.debug("Participant Sync sent {}", message);
+ super.send(message);
+ }
+
+ /**
+ * Is default topic.
+ * @return true if default
+ */
+ @Override
+ public boolean isDefaultTopic() {
+ return false;
+ }
+
+}
diff --git a/runtime-acm/src/main/resources/application.yaml b/runtime-acm/src/main/resources/application.yaml
index d93418e5e..58e590b14 100644
--- a/runtime-acm/src/main/resources/application.yaml
+++ b/runtime-acm/src/main/resources/application.yaml
@@ -40,20 +40,29 @@ server:
path: /error
runtime:
+ topics:
+ operationTopic: policy-acruntime-participant
+ syncTopic: acm-ppnt-sync
participantParameters:
heartBeatMs: 20000
maxStatusWaitMs: 200000
topicParameterGroup:
topicSources:
-
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP
fetchTimeout: 15000
topicSinks:
-
- topic: policy-acruntime-participant
+ topic: ${runtime.topics.operationTopic}
+ servers:
+ - ${topicServer:kafka:9092}
+ topicCommInfrastructure: NOOP
+
+ -
+ topic: ${runtime.topics.syncTopic}
servers:
- ${topicServer:kafka:9092}
topicCommInfrastructure: NOOP