diff options
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java')
-rw-r--r-- | sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java | 155 |
1 files changed, 155 insertions, 0 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java new file mode 100644 index 000000000..249eb612e --- /dev/null +++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java @@ -0,0 +1,155 @@ +/* + * ============LICENSE_START======================================================================== + * ONAP : ccsdk feature sdnr wt mountpoint-registrar + * ================================================================================================= + * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved. + * Copyright (C) 2021 Samsung Electronics Intellectual Property. All rights reserved. + * ================================================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * ============LICENSE_END========================================================================== + */ + +package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; +import java.util.Properties; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig; +import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class StrimziKafkaVESMsgConsumerImpl + implements StrimziKafkaVESMsgConsumer, StrimziKafkaVESMsgValidator { + + private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaVESMsgConsumerImpl.class); + private static final String DEFAULT_SDNRUSER = "admin"; + private static final String DEFAULT_SDNRPASSWD = "admin"; + + private final String name = this.getClass().getSimpleName(); + private VESMsgKafkaConsumer consumer = null; + private boolean running = false; + private boolean ready = false; + private int fetchPause = 5000; // Default pause between fetch - 5 seconds + protected final GeneralConfig generalConfig; + + protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) { + this.generalConfig = generalConfig; + } + + /* + * Thread to fetch messages from the Kafka topic. Waits for the messages to + * arrive on the topic until a certain timeout and returns. If no data arrives + * on the topic, sleeps for a certain time period before checking again + */ + @Override + public void run() { + + if (ready) { + running = true; + while (running) { + try { + boolean noData = true; + List<String> consumerResponse = null; + consumerResponse = consumer.poll(); + for (String msg : consumerResponse) { + noData = false; + LOG.debug("{} received ActualMessage from Kafka VES Message topic {}", name, msg); + if (isMessageValid(msg)) { + processMsg(msg); + } + } + + if (noData) { + pauseThread(); + } + } catch (InterruptedException e) { + LOG.warn("Caught exception reading from Kafka Message Topic", e); + Thread.currentThread().interrupt(); + } catch (JsonProcessingException jsonProcessingException) { + LOG.warn("Failed to convert message to JsonNode: {}", jsonProcessingException.getMessage()); + } catch (InvalidMessageException invalidMessageException) { + LOG.warn("Message is invalid because of: {}", invalidMessageException.getMessage()); + } catch (Exception e) { + LOG.error("Caught exception reading from Kafka Message Topic", e); + running = false; + } + } + } + } + + @Override + public boolean isMessageValid(String message) { + return true; + } + + protected JsonNode convertMessageToJsonNode(String message) throws JsonProcessingException { + return new ObjectMapper().readTree(message); + } + + /* + * Create a Kafka consumer by specifying properties containing information such as + * topic name, timeout, URL etc + */ + @Override + public void init(Properties strimziKafkaProperties, Properties consumerProperties) { + + try { + this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties); + this.consumer.subscribe(consumerProperties.getProperty("topic")); + ready = true; + } catch (Exception e) { + LOG.error("Error initializing Kafka Message consumer from file {} {}", consumerProperties, e); + } + } + + private void pauseThread() throws InterruptedException { + if (fetchPause > 0) { + LOG.debug("No data received from fetch. Pausing {} ms before retry", fetchPause); + Thread.sleep(fetchPause); + } else { + LOG.debug("No data received from fetch. No fetch pause specified - retrying immediately"); + } + } + + @Override + public boolean isReady() { + return ready; + } + + @Override + public boolean isRunning() { + return running; + } + + /* + * public String getProperty(String name) { return properties.getProperty(name, + * ""); } + */ + @Override + public void stopConsumer() { + running = false; + } + + public String getBaseUrl() { + return generalConfig.getBaseUrl(); + } + + public String getSDNRUser() { + return generalConfig.getSDNRUser() != null ? generalConfig.getSDNRUser() : DEFAULT_SDNRUSER; + } + + public String getSDNRPasswd() { + return generalConfig.getSDNRPasswd() != null ? generalConfig.getSDNRPasswd() : DEFAULT_SDNRPASSWD; + } +} |