diff options
Diffstat (limited to 'src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java')
-rw-r--r-- | src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java | 255 |
1 files changed, 82 insertions, 173 deletions
diff --git a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java index 002f835..91e7117 100644 --- a/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java +++ b/src/main/java/org/onap/dcaegen2/services/sonhms/dmaap/DmaapClient.java @@ -21,25 +21,16 @@ package org.onap.dcaegen2.services.sonhms.dmaap; -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.cambria.client.CambriaClient; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder; import com.att.nsa.cambria.client.CambriaConsumer; -import com.att.nsa.cambria.client.CambriaTopicManager; + import org.onap.dcaegen2.services.sonhms.Configuration; import org.onap.dcaegen2.services.sonhms.NewNotification; -import org.onap.dcaegen2.services.sonhms.Topic; import org.onap.dcaegen2.services.sonhms.dao.DmaapNotificationsRepository; import org.onap.dcaegen2.services.sonhms.entity.DmaapNotifications; +import org.onap.dcaegen2.services.sonhms.utils.DmaapUtils; + -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -52,164 +43,82 @@ import org.springframework.stereotype.Component; @Component public class DmaapClient { - @Autowired - private DmaapNotificationsRepository dmaapNotificationsRepository; - private Configuration configuration; - private static Logger log = LoggerFactory.getLogger(DmaapClient.class); - private static final String CONSUMER = "CONSUMER"; - private static final String PRODUCER = "PRODUCER"; - private static final String DESCRIPTION = "api keys for OOF PCI use case"; - private static final int PARTITION_COUNT = 1; - private static final int REPLICATION_COUNT = 1; - private NewNotification newNotification; - private CambriaTopicManager topicManager; - - public class NotificationCallback { - DmaapClient dmaapClient; - - public NotificationCallback(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; - } - - public void activateCallBack(String msg) { - handleNotification(msg); - } - - private void handleNotification(String msg) { - DmaapNotifications dmaapNotification = new DmaapNotifications(); - dmaapNotification.setNotification(msg); - if (log.isDebugEnabled()) { - log.debug(dmaapNotification.toString()); - } - dmaapNotificationsRepository.save(dmaapNotification); - newNotification.setNewNotif(true); - } - } - - /** - * init dmaap client. - */ - public void initClient(NewNotification newNotification) { - log.debug("initializing client"); - configuration = Configuration.getInstance(); - if (log.isDebugEnabled()) { - log.debug(configuration.toString()); - } - this.newNotification = newNotification; - - createAndConfigureTopics(); - startClient(); - } - - /** - * create and configures topics. - */ - private void createAndConfigureTopics() { - - try { - topicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(configuration.getServers()) - .authenticatedBy(configuration.getManagerApiKey(), configuration.getManagerSecretKey())); - } catch (GeneralSecurityException | IOException e) { - log.debug("exception during creating topic", e); - } - List<Topic> topics = configuration.getTopics(); - - for (Topic topic : topics) { - Set<String> topicsInDmaap = getAllTopicsFromDmaap(); - - createTopic(topic, topicsInDmaap); - subscribeToTopic(topic.getName(), topic.getProducer(), PRODUCER); - subscribeToTopic(topic.getName(), topic.getConsumer(), CONSUMER); - - } - - topicManager.close(); - - } - - /** - * create topic. - */ - private void createTopic(Topic topic, Set<String> topicsInDmaap) { - if (topicsInDmaap.contains(topic.getName())) { - log.debug("topic exists in dmaap"); - } else { - try { - topicManager.createTopic(topic.getName(), DESCRIPTION, PARTITION_COUNT, REPLICATION_COUNT); - } catch (HttpException | IOException e) { - log.debug("error while creating topic: {}", e); - } - } - } - - /** - * get all topics from dmaap. - */ - private Set<String> getAllTopicsFromDmaap() { - Set<String> topics = new HashSet<>(); - try { - topics = topicManager.getTopics(); - } catch (IOException e) { - log.debug("IOException while fetching topics"); - } - return topics; - - } - - /** - * start dmaap client. - */ - private synchronized void startClient() { - - ScheduledExecutorService executorPool; - CambriaConsumer cambriaConsumer = null; - - try { - cambriaConsumer = new ConsumerBuilder() - .authenticatedBy(configuration.getPcimsApiKey(), configuration.getPcimsSecretKey()) - .knownAs(configuration.getCg(), configuration.getCid()).onTopic(configuration.getSdnrTopic()) - .usingHosts(configuration.getServers()).withSocketTimeout(configuration.getPollingTimeout() * 1000) - .build(); - - // create notification consumers for SNDR and policy - NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer, - new NotificationCallback(this)); - - // start notification consumer threads - executorPool = Executors.newScheduledThreadPool(10); - executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(), - TimeUnit.SECONDS); - } catch (MalformedURLException | GeneralSecurityException e) { - log.debug("exception during starting client", e); - } - - } - - /** - * subscribe to topic. - */ - private void subscribeToTopic(String topicName, String subscriberApiKey, String subscriberType) { - if (subscriberType.equals(PRODUCER)) { - try { - topicManager.allowProducer(topicName, subscriberApiKey); - } catch (HttpException | IOException e) { - log.debug("error while subscribing to a topic: {}", e); - } - } else if (subscriberType.equals(CONSUMER)) { - try { - topicManager.allowConsumer(topicName, subscriberApiKey); - } catch (HttpException | IOException e) { - log.debug("error while subscribing to a topic: {}", e); - } - } - - } - - @SuppressWarnings("unchecked") - private static <T extends CambriaClient> T buildCambriaClient( - CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) - throws MalformedURLException, GeneralSecurityException { - return (T) client.build(); - } - + @Autowired + private DmaapNotificationsRepository dmaapNotificationsRepository; + private Configuration configuration; + private static Logger log = LoggerFactory.getLogger(DmaapClient.class); + + @Autowired + private NewNotification newNotification; + private DmaapUtils dmaapUtils; + + public class NotificationCallback { + DmaapClient dmaapClient; + + public NotificationCallback(DmaapClient dmaapClient) { + this.dmaapClient = dmaapClient; + } + + public void activateCallBack(String msg) { + handleNotification(msg); + } + + private void handleNotification(String msg) { + DmaapNotifications dmaapNotification = new DmaapNotifications(); + dmaapNotification.setNotification(msg); + if (log.isDebugEnabled()) { + log.debug(dmaapNotification.toString()); + } + dmaapNotificationsRepository.save(dmaapNotification); + newNotification.setNewNotif(true); + } + } + + /** + * init dmaap client. + */ + public void initClient() { + log.debug("initializing client"); + configuration = Configuration.getInstance(); + if (log.isDebugEnabled()) { + log.debug(configuration.toString()); + } + + startClient(); + } + + + /** + * start dmaap client. + */ + @SuppressWarnings("unchecked") + private synchronized void startClient() { + + Map<String,Object> streamSubscribes= Configuration.getInstance().getStreamsSubscribes(); + String sdnrTopicUrl =((Map<String,String>)((Map<String,Object>)streamSubscribes.get("nbr_list_change_topic")).get("dmaap_info")).get("topic_url"); + String[] sdnrTopicSplit=sdnrTopicUrl.split("\\/"); + String sdnrTopic=sdnrTopicSplit[sdnrTopicSplit.length-1]; + ScheduledExecutorService executorPool; + CambriaConsumer cambriaConsumer = null; + + cambriaConsumer = dmaapUtils.buildConsumer(configuration, sdnrTopic ); + /* + * cambriaConsumer = new ConsumerBuilder() + * .authenticatedBy(configuration.getPcimsApiKey(), + * configuration.getPcimsSecretKey()) .knownAs(configuration.getCg(), + * configuration.getCid()).onTopic(configuration.getSdnrTopic()) + * .usingHosts(configuration.getServers()).withSocketTimeout(configuration. + * getPollingTimeout() * 1000) .build(); + */ + + // create notification consumers for SNDR and policy + NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer, + new NotificationCallback(this)); + + // start notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); + + } + } |