aboutsummaryrefslogtreecommitdiffstats
path: root/participant/participant-intermediary/src/main/java
diff options
context:
space:
mode:
authorrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-06-11 11:23:44 +0100
committerrameshiyer27 <ramesh.murugan.iyer@est.tech>2024-06-11 13:46:56 +0100
commit5d48bd15e1d799ba4419a8b6d960a089335b9852 (patch)
treed13c4559267335c3cb91e3b05a34c5ae945a0fbe /participant/participant-intermediary/src/main/java
parentb174e37eb1a41e9997c9455edacc36667e0c5c1a (diff)
Add Sync topic for participant Intermediary
Add new sync topic config for Intermediary Add sync topic listener Refactor IntermediaryActivator for processing multiple topic source Issue-ID: POLICY-5030 Change-Id: Idce9839a85571a92048e589bd82ce33699add640 Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Diffstat (limited to 'participant/participant-intermediary/src/main/java')
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java5
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java49
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java42
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java8
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java13
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java6
-rw-r--r--participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java44
8 files changed, 160 insertions, 12 deletions
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
index ec14e6cd7..4ed89dce8 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantAckListener.java
@@ -64,4 +64,9 @@ public abstract class ParticipantAckListener<T extends ParticipantAckMessage> ex
public ScoListener<T> getScoListener() {
return this;
}
+
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
index bf2892aa8..87eb5c8a1 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantListener.java
@@ -61,4 +61,9 @@ public abstract class ParticipantListener<T extends ParticipantMessage> extends
public ScoListener<T> getScoListener() {
return this;
}
+
+ @Override
+ public boolean isDefaultTopic() {
+ return true;
+ }
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java
new file mode 100644
index 000000000..0b359f98a
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/comm/ParticipantSyncListener.java
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.comm;
+
+import org.onap.policy.clamp.acm.participant.intermediary.handler.ParticipantHandler;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantMessageType;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ParticipantSyncListener extends ParticipantListener<ParticipantSync> {
+
+ /**
+ * Constructs the object.
+ *
+ * @param participantHandler the handler for managing the state of the participant
+ */
+ public ParticipantSyncListener(ParticipantHandler participantHandler) {
+ super(ParticipantSync.class, participantHandler, participantHandler::handleParticipantSync);
+ }
+
+ @Override
+ public String getType() {
+ return ParticipantMessageType.PARTICIPANT_SYNC_MSG.name();
+ }
+
+ @Override
+ public boolean isDefaultTopic() {
+ return false;
+ }
+}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
index a77d5242a..2c54a22dd 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/IntermediaryActivator.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,6 +28,7 @@ import java.util.Timer;
import java.util.TimerTask;
import lombok.Getter;
import org.onap.policy.clamp.acm.participant.intermediary.parameters.ParticipantParameters;
+import org.onap.policy.clamp.acm.participant.intermediary.parameters.Topics;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
@@ -55,6 +56,9 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
@Getter
private final MessageTypeDispatcher msgDispatcher;
+ @Getter
+ private final MessageTypeDispatcher syncMsgDispatcher;
+
/**
* Instantiate the activator for participant.
*
@@ -75,22 +79,32 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ syncMsgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+
// @formatter:off
addAction("Topic endpoint management",
() -> TopicEndpointManager.getManager().start(),
() -> TopicEndpointManager.getManager().shutdown());
- listeners.forEach(listener ->
- addAction("Listener " + listener.getClass().getSimpleName(),
+ listeners.stream().filter(Listener::isDefaultTopic)
+ .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
() -> msgDispatcher.register(listener.getType(), listener.getScoListener()),
() -> msgDispatcher.unregister(listener.getType())));
+ listeners.stream().filter(l -> ! l.isDefaultTopic())
+ .forEach(listener -> addAction("Listener " + listener.getClass().getSimpleName(),
+ () -> syncMsgDispatcher.register(listener.getType(), listener.getScoListener()),
+ () -> syncMsgDispatcher.unregister(listener.getType())));
+
publishers.forEach(publisher ->
addAction("Publisher " + publisher.getClass().getSimpleName(),
() -> publisher.active(topicSinks),
publisher::stop));
- addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
+ var topics = parameters.getIntermediaryParameters().getTopics();
+
+ addAction("Topic Message Dispatcher", () -> this.registerMsgDispatcher(topics),
+ () -> this.unregisterMsgDispatcher(topics));
// @formatter:on
}
@@ -133,18 +147,26 @@ public class IntermediaryActivator extends ServiceManagerContainer implements Cl
/**
* Registers the dispatcher with the topic source(s).
*/
- private void registerMsgDispatcher() {
- for (final TopicSource source : topicSources) {
- source.register(msgDispatcher);
+ private void registerMsgDispatcher(Topics topics) {
+ for (final var source : topicSources) {
+ if (source.getTopic().equals(topics.getOperationTopic())) {
+ source.register(msgDispatcher);
+ } else if (source.getTopic().equals(topics.getSyncTopic())) {
+ source.register(syncMsgDispatcher);
+ }
}
}
/**
* Unregisters the dispatcher from the topic source(s).
*/
- private void unregisterMsgDispatcher() {
- for (final TopicSource source : topicSources) {
- source.unregister(msgDispatcher);
+ private void unregisterMsgDispatcher(Topics topics) {
+ for (final var source : topicSources) {
+ if (source.getTopic().equals(topics.getOperationTopic())) {
+ source.unregister(msgDispatcher);
+ } else if (source.getTopic().equals(topics.getSyncTopic())) {
+ source.unregister(syncMsgDispatcher);
+ }
}
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
index 56ed55441..27585cf20 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/Listener.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021 Nordix Foundation.
+ * Copyright (C) 2021,2024 Nordix Foundation.
* Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -38,4 +38,10 @@ public interface Listener<T> {
* @return listener to register
*/
ScoListener<T> getScoListener();
+
+ /**
+ * Check if default topic.
+ * @return true if default topic
+ */
+ boolean isDefaultTopic();
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
index 54a05912a..7ac58ae6c 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/handler/ParticipantHandler.java
@@ -39,6 +39,7 @@ import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRe
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantRestart;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatus;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantStatusReq;
+import org.onap.policy.clamp.models.acm.messages.kafka.participant.ParticipantSync;
import org.onap.policy.clamp.models.acm.messages.kafka.participant.PropertiesUpdate;
import org.onap.policy.clamp.models.acm.messages.rest.instantiation.DeployOrder;
import org.slf4j.Logger;
@@ -209,6 +210,18 @@ public class ParticipantHandler {
}
/**
+ * Handle a ParticipantSync message.
+ *
+ * @param participantSyncMsg the participantSync message
+ */
+ @Timed(value = "listener.participant_sync_msg", description = "PARTICIPANT_SYNC messages received")
+ public void handleParticipantSync(ParticipantSync participantSyncMsg) {
+ LOGGER.debug("ParticipantSync message received for participantId {}",
+ participantSyncMsg.getParticipantId());
+ acDefinitionHandler.handleParticipantRestart(participantSyncMsg);
+ }
+
+ /**
* Dispatch a heartbeat for this participant.
*/
public void sendHeartbeat() {
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
index 54774044b..1c36ad17f 100644
--- a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/ParticipantIntermediaryParameters.java
@@ -1,6 +1,6 @@
/*-
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2023 Nordix Foundation.
+ * Copyright (C) 2021-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -60,4 +60,8 @@ public class ParticipantIntermediaryParameters {
@Valid
private List<ParticipantSupportedElementType> participantSupportedElementTypes;
+ @NotNull
+ @Valid
+ private Topics topics;
+
}
diff --git a/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java
new file mode 100644
index 000000000..ddf72052f
--- /dev/null
+++ b/participant/participant-intermediary/src/main/java/org/onap/policy/clamp/acm/participant/intermediary/parameters/Topics.java
@@ -0,0 +1,44 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.participant.intermediary.parameters;
+
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotNull;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Class to hold topic names for operation and synchronization.
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+public class Topics {
+
+ @NotNull
+ @Valid
+ private String operationTopic;
+
+ @NotNull
+ @Valid
+ private String syncTopic;
+}