diff options
Diffstat (limited to 'src/main/java/com/wipro/www/sonhms/dmaap/DmaapClient.java')
-rw-r--r-- | src/main/java/com/wipro/www/sonhms/dmaap/DmaapClient.java | 215 |
1 files changed, 0 insertions, 215 deletions
diff --git a/src/main/java/com/wipro/www/sonhms/dmaap/DmaapClient.java b/src/main/java/com/wipro/www/sonhms/dmaap/DmaapClient.java deleted file mode 100644 index 051b00e..0000000 --- a/src/main/java/com/wipro/www/sonhms/dmaap/DmaapClient.java +++ /dev/null @@ -1,215 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * pcims - * ================================================================================ - * Copyright (C) 2018 Wipro Limited. - * ============================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - ******************************************************************************/ - -package com.wipro.www.sonhms.dmaap; - -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.cambria.client.CambriaClient; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.att.nsa.cambria.client.CambriaTopicManager; -import com.wipro.www.sonhms.Configuration; -import com.wipro.www.sonhms.NewNotification; -import com.wipro.www.sonhms.Topic; -import com.wipro.www.sonhms.dao.DmaapNotificationsRepository; -import com.wipro.www.sonhms.entity.DmaapNotifications; - - -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class DmaapClient { - - @Autowired - private DmaapNotificationsRepository dmaapNotificationsRepository; - private Configuration configuration; - private static Logger log = LoggerFactory.getLogger(DmaapClient.class); - private static final String CONSUMER = "CONSUMER"; - private static final String PRODUCER = "PRODUCER"; - private static final String DESCRIPTION = "api keys for OOF PCI use case"; - private static final int PARTITION_COUNT = 1; - private static final int REPLICATION_COUNT = 1; - private NewNotification newNotification; - private CambriaTopicManager topicManager; - - public class NotificationCallback { - DmaapClient dmaapClient; - - public NotificationCallback(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; - } - - public void activateCallBack(String msg) { - handleNotification(msg); - } - - private void handleNotification(String msg) { - DmaapNotifications dmaapNotification = new DmaapNotifications(); - dmaapNotification.setNotification(msg); - if (log.isDebugEnabled()) { - log.debug(dmaapNotification.toString()); - } - dmaapNotificationsRepository.save(dmaapNotification); - newNotification.setNewNotif(true); - } - } - - /** - * init dmaap client. - */ - public void initClient(NewNotification newNotification) { - log.debug("initializing client"); - configuration = Configuration.getInstance(); - if (log.isDebugEnabled()) { - log.debug(configuration.toString()); - } - this.newNotification = newNotification; - - createAndConfigureTopics(); - startClient(); - } - - /** - * create and configures topics. - */ - private void createAndConfigureTopics() { - - try { - topicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(configuration.getServers()) - .authenticatedBy(configuration.getManagerApiKey(), configuration.getManagerSecretKey())); - } catch (GeneralSecurityException | IOException e) { - log.debug("exception during creating topic", e); - } - List<Topic> topics = configuration.getTopics(); - - for (Topic topic : topics) { - Set<String> topicsInDmaap = getAllTopicsFromDmaap(); - - createTopic(topic, topicsInDmaap); - subscribeToTopic(topic.getName(), topic.getProducer(), PRODUCER); - subscribeToTopic(topic.getName(), topic.getConsumer(), CONSUMER); - - } - - topicManager.close(); - - } - - /** - * create topic. - */ - private void createTopic(Topic topic, Set<String> topicsInDmaap) { - if (topicsInDmaap.contains(topic.getName())) { - log.debug("topic exists in dmaap"); - } else { - try { - topicManager.createTopic(topic.getName(), DESCRIPTION, PARTITION_COUNT, REPLICATION_COUNT); - } catch (HttpException | IOException e) { - log.debug("error while creating topic: {}", e); - } - } - } - - /** - * get all topics from dmaap. - */ - private Set<String> getAllTopicsFromDmaap() { - Set<String> topics = new HashSet<>(); - try { - topics = topicManager.getTopics(); - } catch (IOException e) { - log.debug("IOException while fetching topics"); - } - return topics; - - } - - /** - * start dmaap client. - */ - private synchronized void startClient() { - - ScheduledExecutorService executorPool; - CambriaConsumer cambriaConsumer = null; - - try { - cambriaConsumer = new ConsumerBuilder() - .authenticatedBy(configuration.getPcimsApiKey(), configuration.getPcimsSecretKey()) - .knownAs(configuration.getCg(), configuration.getCid()).onTopic(configuration.getSdnrTopic()) - .usingHosts(configuration.getServers()).withSocketTimeout(configuration.getPollingTimeout() * 1000) - .build(); - - // create notification consumers for SNDR and policy - NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer, - new NotificationCallback(this)); - - // start notification consumer threads - executorPool = Executors.newScheduledThreadPool(10); - executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(), - TimeUnit.SECONDS); - } catch (MalformedURLException | GeneralSecurityException e) { - log.debug("exception during starting client", e); - } - - } - - /** - * subscribe to topic. - */ - private void subscribeToTopic(String topicName, String subscriberApiKey, String subscriberType) { - if (subscriberType.equals(PRODUCER)) { - try { - topicManager.allowProducer(topicName, subscriberApiKey); - } catch (HttpException | IOException e) { - log.debug("error while subscribing to a topic: {}", e); - } - } else if (subscriberType.equals(CONSUMER)) { - try { - topicManager.allowConsumer(topicName, subscriberApiKey); - } catch (HttpException | IOException e) { - log.debug("error while subscribing to a topic: {}", e); - } - } - - } - - @SuppressWarnings("unchecked") - private static <T extends CambriaClient> T buildCambriaClient( - CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) - throws MalformedURLException, GeneralSecurityException { - return (T) client.build(); - } - -} |