aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java')
-rw-r--r--src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java215
1 files changed, 215 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
new file mode 100644
index 0000000..002f835
--- /dev/null
+++ b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java
@@ -0,0 +1,215 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * son-handler
+ * ================================================================================
+ * Copyright (C) 2019 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.dcaegen2.services.sonhms.dmaap;
+
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.cambria.client.CambriaClient;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
+import com.att.nsa.cambria.client.CambriaConsumer;
+import com.att.nsa.cambria.client.CambriaTopicManager;
+import org.onap.dcaegen2.services.sonhms.Configuration;
+import org.onap.dcaegen2.services.sonhms.NewNotification;
+import org.onap.dcaegen2.services.sonhms.Topic;
+import org.onap.dcaegen2.services.sonhms.dao.DmaapNotificationsRepository;
+import org.onap.dcaegen2.services.sonhms.entity.DmaapNotifications;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapClient {
+
+ @Autowired
+ private DmaapNotificationsRepository dmaapNotificationsRepository;
+ private Configuration configuration;
+ private static Logger log = LoggerFactory.getLogger(DmaapClient.class);
+ private static final String CONSUMER = "CONSUMER";
+ private static final String PRODUCER = "PRODUCER";
+ private static final String DESCRIPTION = "api keys for OOF PCI use case";
+ private static final int PARTITION_COUNT = 1;
+ private static final int REPLICATION_COUNT = 1;
+ private NewNotification newNotification;
+ private CambriaTopicManager topicManager;
+
+ public class NotificationCallback {
+ DmaapClient dmaapClient;
+
+ public NotificationCallback(DmaapClient dmaapClient) {
+ this.dmaapClient = dmaapClient;
+ }
+
+ public void activateCallBack(String msg) {
+ handleNotification(msg);
+ }
+
+ private void handleNotification(String msg) {
+ DmaapNotifications dmaapNotification = new DmaapNotifications();
+ dmaapNotification.setNotification(msg);
+ if (log.isDebugEnabled()) {
+ log.debug(dmaapNotification.toString());
+ }
+ dmaapNotificationsRepository.save(dmaapNotification);
+ newNotification.setNewNotif(true);
+ }
+ }
+
+ /**
+ * init dmaap client.
+ */
+ public void initClient(NewNotification newNotification) {
+ log.debug("initializing client");
+ configuration = Configuration.getInstance();
+ if (log.isDebugEnabled()) {
+ log.debug(configuration.toString());
+ }
+ this.newNotification = newNotification;
+
+ createAndConfigureTopics();
+ startClient();
+ }
+
+ /**
+ * create and configures topics.
+ */
+ private void createAndConfigureTopics() {
+
+ try {
+ topicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(configuration.getServers())
+ .authenticatedBy(configuration.getManagerApiKey(), configuration.getManagerSecretKey()));
+ } catch (GeneralSecurityException | IOException e) {
+ log.debug("exception during creating topic", e);
+ }
+ List<Topic> topics = configuration.getTopics();
+
+ for (Topic topic : topics) {
+ Set<String> topicsInDmaap = getAllTopicsFromDmaap();
+
+ createTopic(topic, topicsInDmaap);
+ subscribeToTopic(topic.getName(), topic.getProducer(), PRODUCER);
+ subscribeToTopic(topic.getName(), topic.getConsumer(), CONSUMER);
+
+ }
+
+ topicManager.close();
+
+ }
+
+ /**
+ * create topic.
+ */
+ private void createTopic(Topic topic, Set<String> topicsInDmaap) {
+ if (topicsInDmaap.contains(topic.getName())) {
+ log.debug("topic exists in dmaap");
+ } else {
+ try {
+ topicManager.createTopic(topic.getName(), DESCRIPTION, PARTITION_COUNT, REPLICATION_COUNT);
+ } catch (HttpException | IOException e) {
+ log.debug("error while creating topic: {}", e);
+ }
+ }
+ }
+
+ /**
+ * get all topics from dmaap.
+ */
+ private Set<String> getAllTopicsFromDmaap() {
+ Set<String> topics = new HashSet<>();
+ try {
+ topics = topicManager.getTopics();
+ } catch (IOException e) {
+ log.debug("IOException while fetching topics");
+ }
+ return topics;
+
+ }
+
+ /**
+ * start dmaap client.
+ */
+ private synchronized void startClient() {
+
+ ScheduledExecutorService executorPool;
+ CambriaConsumer cambriaConsumer = null;
+
+ try {
+ cambriaConsumer = new ConsumerBuilder()
+ .authenticatedBy(configuration.getPcimsApiKey(), configuration.getPcimsSecretKey())
+ .knownAs(configuration.getCg(), configuration.getCid()).onTopic(configuration.getSdnrTopic())
+ .usingHosts(configuration.getServers()).withSocketTimeout(configuration.getPollingTimeout() * 1000)
+ .build();
+
+ // create notification consumers for SNDR and policy
+ NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer,
+ new NotificationCallback(this));
+
+ // start notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ log.debug("exception during starting client", e);
+ }
+
+ }
+
+ /**
+ * subscribe to topic.
+ */
+ private void subscribeToTopic(String topicName, String subscriberApiKey, String subscriberType) {
+ if (subscriberType.equals(PRODUCER)) {
+ try {
+ topicManager.allowProducer(topicName, subscriberApiKey);
+ } catch (HttpException | IOException e) {
+ log.debug("error while subscribing to a topic: {}", e);
+ }
+ } else if (subscriberType.equals(CONSUMER)) {
+ try {
+ topicManager.allowConsumer(topicName, subscriberApiKey);
+ } catch (HttpException | IOException e) {
+ log.debug("error while subscribing to a topic: {}", e);
+ }
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T extends CambriaClient> T buildCambriaClient(
+ CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client)
+ throws MalformedURLException, GeneralSecurityException {
+ return (T) client.build();
+ }
+
+}