diff options
author | FrancescoFioraEst <francesco.fiora@est.tech> | 2022-08-04 10:35:08 +0100 |
---|---|---|
committer | FrancescoFioraEst <francesco.fiora@est.tech> | 2022-08-09 14:24:28 +0100 |
commit | eafce4b03a77e729e1eb7fce7368e206b7fc9dce (patch) | |
tree | c48f0c1dc5b6a9cdfd3c624d5e246dfdf1d7d353 /participant/participant-impl/participant-impl-acelement/src/main/java | |
parent | c0202ca94ae499b88061efbbc06ffde53218a7a5 (diff) |
Create Message Handler and Activator for the test microservice
Issue-ID: POLICY-4319
Change-Id: If096467ad717fdeaf70e6a9079c531a201e6cec7
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
Diffstat (limited to 'participant/participant-impl/participant-impl-acelement/src/main/java')
10 files changed, 775 insertions, 0 deletions
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java new file mode 100644 index 000000000..ac3c72e52 --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java @@ -0,0 +1,143 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.handler; + +import java.io.IOException; +import java.util.List; +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessageType; +import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.utils.services.ServiceManagerContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +/** + * This class activates the Kafka together with all its handlers. + */ +@Component +public class MessageActivator extends ServiceManagerContainer implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(MessageActivator.class); + + private static final String[] MSG_TYPE_NAMES = { "messageType" }; + + // Topics from which the AC element receives and sends messages + private List<TopicSink> topicSinks; + private List<TopicSource> topicSources; + + private final MessageListener listener; + private final MessagePublisher publisher; + + private MessageTypeDispatcher msgDispatcher; + + /** + * Constructor. + * + * @param listener MessageListener + * @param publisher MessagePublisher + */ + public MessageActivator(MessageListener listener, MessagePublisher publisher) { + super(); + this.listener = listener; + this.publisher = publisher; + msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES); + } + + /** + * Activate publisher and listener messages. + * + * @param parameters TopicParameterGroup + */ + public void activate(final TopicParameterGroup parameters) { + topicSinks = TopicEndpointManager.getManager().addTopicSinks(parameters.getTopicSinks()); + topicSources = TopicEndpointManager.getManager().addTopicSources(parameters.getTopicSources()); + + // @formatter:off + addAction("Topic endpoint management", + () -> TopicEndpointManager.getManager().start(), + () -> TopicEndpointManager.getManager().shutdown()); + + addAction("Message Publisher", + () -> publisher.active(topicSinks), publisher::stop); + + + addAction("Message Listener", + () -> msgDispatcher.register(ElementMessageType.STATUS.name(), listener), + () -> msgDispatcher.unregister(ElementMessageType.STATUS.name())); + + addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher); + // @formatter:on + + start(); + LOGGER.info("Kafka configuration initialised successfully"); + } + + /** + * Handle ContextClosedEvent. + * + * @param ctxClosedEvent ContextClosedEvent + */ + @EventListener + public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) { + deactivate(); + } + + /** + * Deactivate publisher and listener messages. + */ + public void deactivate() { + if (isAlive()) { + stop(); + } + } + + /** + * Registers the dispatcher with the topic source(s). + */ + private void registerMsgDispatcher() { + for (final TopicSource source : topicSources) { + source.register(msgDispatcher); + } + } + + /** + * Unregisters the dispatcher from the topic source(s). + */ + private void unregisterMsgDispatcher() { + for (final TopicSource source : topicSources) { + source.unregister(msgDispatcher); + } + } + + @Override + public void close() throws IOException { + if (isAlive()) { + super.shutdown(); + LOGGER.info("Kafka configuration is uninitialised."); + } + } +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java new file mode 100644 index 000000000..540c133f0 --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java @@ -0,0 +1,115 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.handler; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.ws.rs.core.Response; +import lombok.Getter; +import lombok.NonNull; +import org.onap.policy.clamp.acm.element.main.parameters.AcElement; +import org.onap.policy.clamp.acm.element.service.ElementService; +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType; +import org.onap.policy.models.base.PfModelRuntimeException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.springframework.stereotype.Component; + +@Component +public class MessageHandler { + + private ElementType elementType; + private ToscaConceptIdentifier elementId; + + private Map<ElementType, ElementService> map = new HashMap<>(); + + @Getter + private List<ElementMessage> messages = new ArrayList<>(); + + /** + * Constructor. + * + * @param acElement AcElement + * @param elementServices ElementService list + */ + public MessageHandler(AcElement acElement, List<ElementService> elementServices) { + elementId = acElement.getElementId(); + elementServices.stream().forEach(elementService -> map.put(elementService.getType(), elementService)); + } + + /** + * Active Element Service. + * + * @param elementConfig ElementConfig + */ + public void active(@NonNull ElementConfig elementConfig) { + this.elementType = elementConfig.getElementType(); + getActiveService().active(elementConfig); + } + + /** + * Update configuration. + * + * @param elementConfig ElementConfig + */ + public void update(@NonNull ElementConfig elementConfig) { + if (elementType == null) { + throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementType not defined!"); + } + if (!elementType.equals(elementConfig.getElementType())) { + throw new PfModelRuntimeException(Response.Status.CONFLICT, "wrong ElementType!"); + } + getActiveService().update(elementConfig); + } + + /** + * Get Active Service. + * + * @return ElementService + */ + public ElementService getActiveService() { + if (elementType == null) { + throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementType not defined!"); + } + var service = map.get(elementType); + if (service == null) { + throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementService not found!"); + } + return service; + } + + public void handleMessage(ElementMessage message) { + messages.add(message); + getActiveService().handleMessage(message); + } + + public boolean appliesTo(final ToscaConceptIdentifier elementId) { + return this.elementId.equals(elementId); + } + + public void deactivateElement() { + getActiveService().deactivate(); + elementType = null; + } +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java new file mode 100644 index 000000000..9fe4f0661 --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java @@ -0,0 +1,45 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.handler; + +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.listeners.ScoListener; +import org.onap.policy.common.utils.coder.StandardCoderObject; +import org.springframework.stereotype.Component; + +@Component +public class MessageListener extends ScoListener<ElementStatus> { + + private final MessageHandler handler; + + public MessageListener(MessageHandler handler) { + super(ElementStatus.class); + this.handler = handler; + } + + @Override + public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, ElementStatus message) { + if (handler.appliesTo(message.getElementId())) { + handler.handleMessage(message); + } + } +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java new file mode 100644 index 000000000..ef0a72f1f --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java @@ -0,0 +1,71 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.handler; + +import java.util.List; +import javax.ws.rs.core.Response; +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage; +import org.onap.policy.common.endpoints.event.comm.TopicSink; +import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; +import org.onap.policy.models.base.PfModelRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component +public class MessagePublisher { + private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class); + private static final String NOT_ACTIVE_TEXT = "Not Active!"; + + private boolean active = false; + private TopicSinkClient topicSinkClient; + + /** + * Constructor for instantiating MessagePublisher. + * + * @param topicSinks the topic sinks + */ + public void active(List<TopicSink> topicSinks) { + if (topicSinks.size() != 1) { + throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1"); + } + this.topicSinkClient = new TopicSinkClient(topicSinks.get(0)); + active = true; + } + + /** + * Method to send message. + * + * @param msg the acknowledgement message + */ + public void publishMsg(final ElementMessage msg) { + if (!active) { + throw new PfModelRuntimeException(Response.Status.CONFLICT, NOT_ACTIVE_TEXT); + } + + topicSinkClient.send(msg); + LOGGER.debug("Sent message {}", msg); + } + + public void stop() { + active = false; + } +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java new file mode 100644 index 000000000..7c28c4bf9 --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java @@ -0,0 +1,47 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.service; + +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig; + +public abstract class AbstractElementService implements ElementService { + + @Override + public void update(ElementConfig elementConfig) { + // Not needs + } + + @Override + public void handleMessage(ElementMessage message) { + // Not needs + } + + @Override + public void active(ElementConfig elementConfig) { + // Not needs + } + + @Override + public void deactivate() { + // Not needs + } +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java new file mode 100644 index 000000000..18f3a6fdd --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.service; + +import org.onap.policy.clamp.acm.element.handler.MessagePublisher; +import org.onap.policy.clamp.acm.element.main.parameters.AcElement; +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage; +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.springframework.stereotype.Service; + +/** + * Bridge Service. + */ +@Service +public class BridgeService extends AbstractElementService { + + private final MessagePublisher messagePublisher; + private ToscaConceptIdentifier receiver; + private ToscaConceptIdentifier elementId; + + public BridgeService(MessagePublisher messagePublisher, AcElement acElement) { + this.messagePublisher = messagePublisher; + this.elementId = acElement.getElementId(); + } + + @Override + public ElementType getType() { + return ElementType.BRIDGE; + } + + @Override + public void handleMessage(ElementMessage messageFrom) { + var messageTo = new ElementStatus(); + messageTo.setElementId(receiver); + // Add Tracking + messageTo.setMessage(messageFrom.getMessage() + ", bridge: " + elementId); + messagePublisher.publishMsg(messageTo); + } + + @Override + public void active(ElementConfig elementConfig) { + receiver = elementConfig.getElementId(); + } +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java new file mode 100644 index 000000000..f542be201 --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java @@ -0,0 +1,105 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.service; + +import java.util.List; +import javax.ws.rs.core.Response; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.onap.policy.clamp.acm.element.handler.MessageActivator; +import org.onap.policy.clamp.acm.element.handler.MessageHandler; +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig; +import org.onap.policy.common.endpoints.parameters.TopicParameterGroup; +import org.onap.policy.common.endpoints.parameters.TopicParameters; +import org.onap.policy.models.base.PfModelRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class ConfigService { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigService.class); + + private ElementConfig elementConfig = new ElementConfig(); + + private final MessageHandler handler; + private final MessageActivator messageActivator; + + /** + * Activate messages and service and create the element configuration. + * + * @param elementConfig the configuration + */ + public void activateElement(@NonNull ElementConfig elementConfig) { + var topicParameters = new TopicParameters(); + topicParameters.setTopic(elementConfig.getTopicParameterGroup().getTopic()); + topicParameters.setServers(List.of(elementConfig.getTopicParameterGroup().getServer())); + topicParameters.setFetchTimeout(elementConfig.getTopicParameterGroup().getFetchTimeout()); + topicParameters.setTopicCommInfrastructure(elementConfig.getTopicParameterGroup().getTopicCommInfrastructure()); + topicParameters.setUseHttps(elementConfig.getTopicParameterGroup().isUseHttps()); + + var parameters = new TopicParameterGroup(); + parameters.setTopicSinks(List.of(topicParameters)); + parameters.setTopicSources(List.of(topicParameters)); + + if (!parameters.isValid()) { + throw new PfModelRuntimeException(Response.Status.BAD_REQUEST, + "Validation failed for topic parameter group. Kafka config not activated"); + } + + if (messageActivator.isAlive()) { + throw new PfModelRuntimeException(Response.Status.CONFLICT, + "Service Manager already running, cannot add Topic endpoint management"); + } + + handler.active(elementConfig); + messageActivator.activate(parameters); + this.elementConfig = elementConfig; + + LOGGER.info("Messages and service activated"); + } + + /** + * Fetch element configuration. + * + * @return element configuration present + */ + public ElementConfig getElementConfig() { + return elementConfig; + } + + /** + * Deactivate messages and service and delete the element config. + */ + public void deleteConfig() { + handler.deactivateElement(); + messageActivator.deactivate(); + elementConfig = new ElementConfig(); + LOGGER.info("Messages and service deactivated"); + } + + public List<ElementMessage> getMessages() { + return handler.getMessages(); + } +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java new file mode 100644 index 000000000..00b4d8a2c --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.service; + +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType; + +public interface ElementService { + + ElementType getType(); + + public void active(ElementConfig elementConfig); + + public void update(ElementConfig elementConfig); + + void handleMessage(ElementMessage message); + + void deactivate(); +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java new file mode 100644 index 000000000..61c1c7812 --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java @@ -0,0 +1,36 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.service; + +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType; +import org.springframework.stereotype.Service; + +/** + * Sink Service. + */ +@Service +public class SinkService extends AbstractElementService { + + @Override + public ElementType getType() { + return ElementType.SINK; + } +} diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java new file mode 100644 index 000000000..589397db7 --- /dev/null +++ b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java @@ -0,0 +1,110 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2022 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.acm.element.service; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.core.Response; +import org.onap.policy.clamp.acm.element.handler.MessagePublisher; +import org.onap.policy.clamp.acm.element.main.parameters.AcElement; +import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig; +import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType; +import org.onap.policy.models.base.PfModelRuntimeException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.springframework.stereotype.Service; + +/** + * Starter Service. + */ +@Service +public class StarterService extends AbstractElementService implements AutoCloseable { + + private ScheduledThreadPoolExecutor timerPool; + private ScheduledFuture<?> future; + private ToscaConceptIdentifier receiver; + private ToscaConceptIdentifier elementId; + + private final MessagePublisher messagePublisher; + + public StarterService(MessagePublisher messagePublisher, AcElement acElement) { + this.messagePublisher = messagePublisher; + this.elementId = acElement.getElementId(); + } + + @Override + public ElementType getType() { + return ElementType.STARTER; + } + + /** + * Deactivate Scheduled ThreadPool Executor. + */ + @Override + public void deactivate() { + if (timerPool != null) { + if (future != null) { + future.cancel(true); + } + timerPool.shutdown(); + timerPool = null; + } + } + + @Override + public void active(ElementConfig elementConfig) { + if (timerPool != null) { + throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService alredy actived!"); + } + receiver = elementConfig.getElementId(); + + timerPool = new ScheduledThreadPoolExecutor(1); + timerPool.setRemoveOnCancelPolicy(true); + future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(), + elementConfig.getTimerSec(), TimeUnit.MILLISECONDS); + } + + private void sendMessage() { + var messasge = new ElementStatus(); + messasge.setElementId(receiver); + // Add Tracking + messasge.setMessage("starter: " + elementId); + messagePublisher.publishMsg(messasge); + } + + @Override + public void update(ElementConfig elementConfig) { + if (timerPool == null) { + throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService not actived!"); + } + if (future != null) { + future.cancel(true); + } + future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(), + elementConfig.getTimerSec(), TimeUnit.MILLISECONDS); + } + + @Override + public void close() throws Exception { + deactivate(); + } +} |