aboutsummaryrefslogtreecommitdiffstats
path: root/participant
diff options
context:
space:
mode:
Diffstat (limited to 'participant')
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java143
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java115
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java45
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java71
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java47
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java65
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java105
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java38
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java36
-rw-r--r--participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java110
10 files changed, 775 insertions, 0 deletions
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java
new file mode 100644
index 000000000..ac3c72e52
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java
@@ -0,0 +1,143 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.handler;
+
+import java.io.IOException;
+import java.util.List;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessageType;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class activates the Kafka together with all its handlers.
+ */
+@Component
+public class MessageActivator extends ServiceManagerContainer implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MessageActivator.class);
+
+ private static final String[] MSG_TYPE_NAMES = { "messageType" };
+
+ // Topics from which the AC element receives and sends messages
+ private List<TopicSink> topicSinks;
+ private List<TopicSource> topicSources;
+
+ private final MessageListener listener;
+ private final MessagePublisher publisher;
+
+ private MessageTypeDispatcher msgDispatcher;
+
+ /**
+ * Constructor.
+ *
+ * @param listener MessageListener
+ * @param publisher MessagePublisher
+ */
+ public MessageActivator(MessageListener listener, MessagePublisher publisher) {
+ super();
+ this.listener = listener;
+ this.publisher = publisher;
+ msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+ }
+
+ /**
+ * Activate publisher and listener messages.
+ *
+ * @param parameters TopicParameterGroup
+ */
+ public void activate(final TopicParameterGroup parameters) {
+ topicSinks = TopicEndpointManager.getManager().addTopicSinks(parameters.getTopicSinks());
+ topicSources = TopicEndpointManager.getManager().addTopicSources(parameters.getTopicSources());
+
+ // @formatter:off
+ addAction("Topic endpoint management",
+ () -> TopicEndpointManager.getManager().start(),
+ () -> TopicEndpointManager.getManager().shutdown());
+
+ addAction("Message Publisher",
+ () -> publisher.active(topicSinks), publisher::stop);
+
+
+ addAction("Message Listener",
+ () -> msgDispatcher.register(ElementMessageType.STATUS.name(), listener),
+ () -> msgDispatcher.unregister(ElementMessageType.STATUS.name()));
+
+ addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
+ // @formatter:on
+
+ start();
+ LOGGER.info("Kafka configuration initialised successfully");
+ }
+
+ /**
+ * Handle ContextClosedEvent.
+ *
+ * @param ctxClosedEvent ContextClosedEvent
+ */
+ @EventListener
+ public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+ deactivate();
+ }
+
+ /**
+ * Deactivate publisher and listener messages.
+ */
+ public void deactivate() {
+ if (isAlive()) {
+ stop();
+ }
+ }
+
+ /**
+ * Registers the dispatcher with the topic source(s).
+ */
+ private void registerMsgDispatcher() {
+ for (final TopicSource source : topicSources) {
+ source.register(msgDispatcher);
+ }
+ }
+
+ /**
+ * Unregisters the dispatcher from the topic source(s).
+ */
+ private void unregisterMsgDispatcher() {
+ for (final TopicSource source : topicSources) {
+ source.unregister(msgDispatcher);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (isAlive()) {
+ super.shutdown();
+ LOGGER.info("Kafka configuration is uninitialised.");
+ }
+ }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java
new file mode 100644
index 000000000..540c133f0
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.handler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.core.Response;
+import lombok.Getter;
+import lombok.NonNull;
+import org.onap.policy.clamp.acm.element.main.parameters.AcElement;
+import org.onap.policy.clamp.acm.element.service.ElementService;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageHandler {
+
+ private ElementType elementType;
+ private ToscaConceptIdentifier elementId;
+
+ private Map<ElementType, ElementService> map = new HashMap<>();
+
+ @Getter
+ private List<ElementMessage> messages = new ArrayList<>();
+
+ /**
+ * Constructor.
+ *
+ * @param acElement AcElement
+ * @param elementServices ElementService list
+ */
+ public MessageHandler(AcElement acElement, List<ElementService> elementServices) {
+ elementId = acElement.getElementId();
+ elementServices.stream().forEach(elementService -> map.put(elementService.getType(), elementService));
+ }
+
+ /**
+ * Active Element Service.
+ *
+ * @param elementConfig ElementConfig
+ */
+ public void active(@NonNull ElementConfig elementConfig) {
+ this.elementType = elementConfig.getElementType();
+ getActiveService().active(elementConfig);
+ }
+
+ /**
+ * Update configuration.
+ *
+ * @param elementConfig ElementConfig
+ */
+ public void update(@NonNull ElementConfig elementConfig) {
+ if (elementType == null) {
+ throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementType not defined!");
+ }
+ if (!elementType.equals(elementConfig.getElementType())) {
+ throw new PfModelRuntimeException(Response.Status.CONFLICT, "wrong ElementType!");
+ }
+ getActiveService().update(elementConfig);
+ }
+
+ /**
+ * Get Active Service.
+ *
+ * @return ElementService
+ */
+ public ElementService getActiveService() {
+ if (elementType == null) {
+ throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementType not defined!");
+ }
+ var service = map.get(elementType);
+ if (service == null) {
+ throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementService not found!");
+ }
+ return service;
+ }
+
+ public void handleMessage(ElementMessage message) {
+ messages.add(message);
+ getActiveService().handleMessage(message);
+ }
+
+ public boolean appliesTo(final ToscaConceptIdentifier elementId) {
+ return this.elementId.equals(elementId);
+ }
+
+ public void deactivateElement() {
+ getActiveService().deactivate();
+ elementType = null;
+ }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java
new file mode 100644
index 000000000..9fe4f0661
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.handler;
+
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageListener extends ScoListener<ElementStatus> {
+
+ private final MessageHandler handler;
+
+ public MessageListener(MessageHandler handler) {
+ super(ElementStatus.class);
+ this.handler = handler;
+ }
+
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, ElementStatus message) {
+ if (handler.appliesTo(message.getElementId())) {
+ handler.handleMessage(message);
+ }
+ }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java
new file mode 100644
index 000000000..ef0a72f1f
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java
@@ -0,0 +1,71 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.handler;
+
+import java.util.List;
+import javax.ws.rs.core.Response;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessagePublisher {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
+ private static final String NOT_ACTIVE_TEXT = "Not Active!";
+
+ private boolean active = false;
+ private TopicSinkClient topicSinkClient;
+
+ /**
+ * Constructor for instantiating MessagePublisher.
+ *
+ * @param topicSinks the topic sinks
+ */
+ public void active(List<TopicSink> topicSinks) {
+ if (topicSinks.size() != 1) {
+ throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1");
+ }
+ this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+ active = true;
+ }
+
+ /**
+ * Method to send message.
+ *
+ * @param msg the acknowledgement message
+ */
+ public void publishMsg(final ElementMessage msg) {
+ if (!active) {
+ throw new PfModelRuntimeException(Response.Status.CONFLICT, NOT_ACTIVE_TEXT);
+ }
+
+ topicSinkClient.send(msg);
+ LOGGER.debug("Sent message {}", msg);
+ }
+
+ public void stop() {
+ active = false;
+ }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java
new file mode 100644
index 000000000..7c28c4bf9
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+
+public abstract class AbstractElementService implements ElementService {
+
+ @Override
+ public void update(ElementConfig elementConfig) {
+ // Not needs
+ }
+
+ @Override
+ public void handleMessage(ElementMessage message) {
+ // Not needs
+ }
+
+ @Override
+ public void active(ElementConfig elementConfig) {
+ // Not needs
+ }
+
+ @Override
+ public void deactivate() {
+ // Not needs
+ }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
new file mode 100644
index 000000000..18f3a6fdd
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import org.onap.policy.clamp.acm.element.handler.MessagePublisher;
+import org.onap.policy.clamp.acm.element.main.parameters.AcElement;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.springframework.stereotype.Service;
+
+/**
+ * Bridge Service.
+ */
+@Service
+public class BridgeService extends AbstractElementService {
+
+ private final MessagePublisher messagePublisher;
+ private ToscaConceptIdentifier receiver;
+ private ToscaConceptIdentifier elementId;
+
+ public BridgeService(MessagePublisher messagePublisher, AcElement acElement) {
+ this.messagePublisher = messagePublisher;
+ this.elementId = acElement.getElementId();
+ }
+
+ @Override
+ public ElementType getType() {
+ return ElementType.BRIDGE;
+ }
+
+ @Override
+ public void handleMessage(ElementMessage messageFrom) {
+ var messageTo = new ElementStatus();
+ messageTo.setElementId(receiver);
+ // Add Tracking
+ messageTo.setMessage(messageFrom.getMessage() + ", bridge: " + elementId);
+ messagePublisher.publishMsg(messageTo);
+ }
+
+ @Override
+ public void active(ElementConfig elementConfig) {
+ receiver = elementConfig.getElementId();
+ }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
new file mode 100644
index 000000000..f542be201
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import java.util.List;
+import javax.ws.rs.core.Response;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.onap.policy.clamp.acm.element.handler.MessageActivator;
+import org.onap.policy.clamp.acm.element.handler.MessageHandler;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+public class ConfigService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConfigService.class);
+
+ private ElementConfig elementConfig = new ElementConfig();
+
+ private final MessageHandler handler;
+ private final MessageActivator messageActivator;
+
+ /**
+ * Activate messages and service and create the element configuration.
+ *
+ * @param elementConfig the configuration
+ */
+ public void activateElement(@NonNull ElementConfig elementConfig) {
+ var topicParameters = new TopicParameters();
+ topicParameters.setTopic(elementConfig.getTopicParameterGroup().getTopic());
+ topicParameters.setServers(List.of(elementConfig.getTopicParameterGroup().getServer()));
+ topicParameters.setFetchTimeout(elementConfig.getTopicParameterGroup().getFetchTimeout());
+ topicParameters.setTopicCommInfrastructure(elementConfig.getTopicParameterGroup().getTopicCommInfrastructure());
+ topicParameters.setUseHttps(elementConfig.getTopicParameterGroup().isUseHttps());
+
+ var parameters = new TopicParameterGroup();
+ parameters.setTopicSinks(List.of(topicParameters));
+ parameters.setTopicSources(List.of(topicParameters));
+
+ if (!parameters.isValid()) {
+ throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+ "Validation failed for topic parameter group. Kafka config not activated");
+ }
+
+ if (messageActivator.isAlive()) {
+ throw new PfModelRuntimeException(Response.Status.CONFLICT,
+ "Service Manager already running, cannot add Topic endpoint management");
+ }
+
+ handler.active(elementConfig);
+ messageActivator.activate(parameters);
+ this.elementConfig = elementConfig;
+
+ LOGGER.info("Messages and service activated");
+ }
+
+ /**
+ * Fetch element configuration.
+ *
+ * @return element configuration present
+ */
+ public ElementConfig getElementConfig() {
+ return elementConfig;
+ }
+
+ /**
+ * Deactivate messages and service and delete the element config.
+ */
+ public void deleteConfig() {
+ handler.deactivateElement();
+ messageActivator.deactivate();
+ elementConfig = new ElementConfig();
+ LOGGER.info("Messages and service deactivated");
+ }
+
+ public List<ElementMessage> getMessages() {
+ return handler.getMessages();
+ }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java
new file mode 100644
index 000000000..00b4d8a2c
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+
+public interface ElementService {
+
+ ElementType getType();
+
+ public void active(ElementConfig elementConfig);
+
+ public void update(ElementConfig elementConfig);
+
+ void handleMessage(ElementMessage message);
+
+ void deactivate();
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java
new file mode 100644
index 000000000..61c1c7812
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+import org.springframework.stereotype.Service;
+
+/**
+ * Sink Service.
+ */
+@Service
+public class SinkService extends AbstractElementService {
+
+ @Override
+ public ElementType getType() {
+ return ElementType.SINK;
+ }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
new file mode 100644
index 000000000..589397db7
--- /dev/null
+++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
@@ -0,0 +1,110 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.core.Response;
+import org.onap.policy.clamp.acm.element.handler.MessagePublisher;
+import org.onap.policy.clamp.acm.element.main.parameters.AcElement;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.springframework.stereotype.Service;
+
+/**
+ * Starter Service.
+ */
+@Service
+public class StarterService extends AbstractElementService implements AutoCloseable {
+
+ private ScheduledThreadPoolExecutor timerPool;
+ private ScheduledFuture<?> future;
+ private ToscaConceptIdentifier receiver;
+ private ToscaConceptIdentifier elementId;
+
+ private final MessagePublisher messagePublisher;
+
+ public StarterService(MessagePublisher messagePublisher, AcElement acElement) {
+ this.messagePublisher = messagePublisher;
+ this.elementId = acElement.getElementId();
+ }
+
+ @Override
+ public ElementType getType() {
+ return ElementType.STARTER;
+ }
+
+ /**
+ * Deactivate Scheduled ThreadPool Executor.
+ */
+ @Override
+ public void deactivate() {
+ if (timerPool != null) {
+ if (future != null) {
+ future.cancel(true);
+ }
+ timerPool.shutdown();
+ timerPool = null;
+ }
+ }
+
+ @Override
+ public void active(ElementConfig elementConfig) {
+ if (timerPool != null) {
+ throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService alredy actived!");
+ }
+ receiver = elementConfig.getElementId();
+
+ timerPool = new ScheduledThreadPoolExecutor(1);
+ timerPool.setRemoveOnCancelPolicy(true);
+ future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
+ elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+ }
+
+ private void sendMessage() {
+ var messasge = new ElementStatus();
+ messasge.setElementId(receiver);
+ // Add Tracking
+ messasge.setMessage("starter: " + elementId);
+ messagePublisher.publishMsg(messasge);
+ }
+
+ @Override
+ public void update(ElementConfig elementConfig) {
+ if (timerPool == null) {
+ throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService not actived!");
+ }
+ if (future != null) {
+ future.cancel(true);
+ }
+ future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
+ elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close() throws Exception {
+ deactivate();
+ }
+}