diff options
author | ramya <ramya.ravichandran5@wipro.com> | 2019-03-11 17:34:41 +0530 |
---|---|---|
committer | ramya <ramya.ravichandran5@wipro.com> | 2019-03-11 17:37:12 +0530 |
commit | 3cd4023832810e8b65f85a9db807c8bafec38d82 (patch) | |
tree | 13175e409ca3b29eccffacd75538ded25cfdb518 /src/main/java/com/wipro/www/sonhms/dmaap | |
parent | 79f9e63b3e531ae7ca4eaa7224ab8a740ad578cb (diff) |
code coverage for M3
Change-Id: Ib421ce2ac9357a3210d99293497f9e304f8cd2f5
Issue-ID: DCAEGEN2-1259
Signed-off-by: Ramya Ravichandran<ramya.ravichandran5@wipro.com>
Diffstat (limited to 'src/main/java/com/wipro/www/sonhms/dmaap')
4 files changed, 0 insertions, 384 deletions
diff --git a/src/main/java/com/wipro/www/sonhms/dmaap/DmaapClient.java b/src/main/java/com/wipro/www/sonhms/dmaap/DmaapClient.java deleted file mode 100644 index 051b00e..0000000 --- a/src/main/java/com/wipro/www/sonhms/dmaap/DmaapClient.java +++ /dev/null @@ -1,215 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * pcims - * ================================================================================ - * Copyright (C) 2018 Wipro Limited. - * ============================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - ******************************************************************************/ - -package com.wipro.www.sonhms.dmaap; - -import com.att.nsa.apiClient.http.HttpException; -import com.att.nsa.cambria.client.CambriaClient; -import com.att.nsa.cambria.client.CambriaClientBuilders; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; -import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder; -import com.att.nsa.cambria.client.CambriaConsumer; -import com.att.nsa.cambria.client.CambriaTopicManager; -import com.wipro.www.sonhms.Configuration; -import com.wipro.www.sonhms.NewNotification; -import com.wipro.www.sonhms.Topic; -import com.wipro.www.sonhms.dao.DmaapNotificationsRepository; -import com.wipro.www.sonhms.entity.DmaapNotifications; - - -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Component -public class DmaapClient { - - @Autowired - private DmaapNotificationsRepository dmaapNotificationsRepository; - private Configuration configuration; - private static Logger log = LoggerFactory.getLogger(DmaapClient.class); - private static final String CONSUMER = "CONSUMER"; - private static final String PRODUCER = "PRODUCER"; - private static final String DESCRIPTION = "api keys for OOF PCI use case"; - private static final int PARTITION_COUNT = 1; - private static final int REPLICATION_COUNT = 1; - private NewNotification newNotification; - private CambriaTopicManager topicManager; - - public class NotificationCallback { - DmaapClient dmaapClient; - - public NotificationCallback(DmaapClient dmaapClient) { - this.dmaapClient = dmaapClient; - } - - public void activateCallBack(String msg) { - handleNotification(msg); - } - - private void handleNotification(String msg) { - DmaapNotifications dmaapNotification = new DmaapNotifications(); - dmaapNotification.setNotification(msg); - if (log.isDebugEnabled()) { - log.debug(dmaapNotification.toString()); - } - dmaapNotificationsRepository.save(dmaapNotification); - newNotification.setNewNotif(true); - } - } - - /** - * init dmaap client. - */ - public void initClient(NewNotification newNotification) { - log.debug("initializing client"); - configuration = Configuration.getInstance(); - if (log.isDebugEnabled()) { - log.debug(configuration.toString()); - } - this.newNotification = newNotification; - - createAndConfigureTopics(); - startClient(); - } - - /** - * create and configures topics. - */ - private void createAndConfigureTopics() { - - try { - topicManager = buildCambriaClient(new TopicManagerBuilder().usingHosts(configuration.getServers()) - .authenticatedBy(configuration.getManagerApiKey(), configuration.getManagerSecretKey())); - } catch (GeneralSecurityException | IOException e) { - log.debug("exception during creating topic", e); - } - List<Topic> topics = configuration.getTopics(); - - for (Topic topic : topics) { - Set<String> topicsInDmaap = getAllTopicsFromDmaap(); - - createTopic(topic, topicsInDmaap); - subscribeToTopic(topic.getName(), topic.getProducer(), PRODUCER); - subscribeToTopic(topic.getName(), topic.getConsumer(), CONSUMER); - - } - - topicManager.close(); - - } - - /** - * create topic. - */ - private void createTopic(Topic topic, Set<String> topicsInDmaap) { - if (topicsInDmaap.contains(topic.getName())) { - log.debug("topic exists in dmaap"); - } else { - try { - topicManager.createTopic(topic.getName(), DESCRIPTION, PARTITION_COUNT, REPLICATION_COUNT); - } catch (HttpException | IOException e) { - log.debug("error while creating topic: {}", e); - } - } - } - - /** - * get all topics from dmaap. - */ - private Set<String> getAllTopicsFromDmaap() { - Set<String> topics = new HashSet<>(); - try { - topics = topicManager.getTopics(); - } catch (IOException e) { - log.debug("IOException while fetching topics"); - } - return topics; - - } - - /** - * start dmaap client. - */ - private synchronized void startClient() { - - ScheduledExecutorService executorPool; - CambriaConsumer cambriaConsumer = null; - - try { - cambriaConsumer = new ConsumerBuilder() - .authenticatedBy(configuration.getPcimsApiKey(), configuration.getPcimsSecretKey()) - .knownAs(configuration.getCg(), configuration.getCid()).onTopic(configuration.getSdnrTopic()) - .usingHosts(configuration.getServers()).withSocketTimeout(configuration.getPollingTimeout() * 1000) - .build(); - - // create notification consumers for SNDR and policy - NotificationConsumer notificationConsumer = new NotificationConsumer(cambriaConsumer, - new NotificationCallback(this)); - - // start notification consumer threads - executorPool = Executors.newScheduledThreadPool(10); - executorPool.scheduleAtFixedRate(notificationConsumer, 0, configuration.getPollingInterval(), - TimeUnit.SECONDS); - } catch (MalformedURLException | GeneralSecurityException e) { - log.debug("exception during starting client", e); - } - - } - - /** - * subscribe to topic. - */ - private void subscribeToTopic(String topicName, String subscriberApiKey, String subscriberType) { - if (subscriberType.equals(PRODUCER)) { - try { - topicManager.allowProducer(topicName, subscriberApiKey); - } catch (HttpException | IOException e) { - log.debug("error while subscribing to a topic: {}", e); - } - } else if (subscriberType.equals(CONSUMER)) { - try { - topicManager.allowConsumer(topicName, subscriberApiKey); - } catch (HttpException | IOException e) { - log.debug("error while subscribing to a topic: {}", e); - } - } - - } - - @SuppressWarnings("unchecked") - private static <T extends CambriaClient> T buildCambriaClient( - CambriaClientBuilders.AbstractAuthenticatedManagerBuilder<? extends CambriaClient> client) - throws MalformedURLException, GeneralSecurityException { - return (T) client.build(); - } - -} diff --git a/src/main/java/com/wipro/www/sonhms/dmaap/NotificationConsumer.java b/src/main/java/com/wipro/www/sonhms/dmaap/NotificationConsumer.java deleted file mode 100644 index 1bdbac2..0000000 --- a/src/main/java/com/wipro/www/sonhms/dmaap/NotificationConsumer.java +++ /dev/null @@ -1,59 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * pcims - * ================================================================================ - * Copyright (C) 2018 Wipro Limited. - * ============================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - ******************************************************************************/ - -package com.wipro.www.sonhms.dmaap; - -import com.att.nsa.cambria.client.CambriaConsumer; -import com.wipro.www.sonhms.dmaap.DmaapClient.NotificationCallback; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class NotificationConsumer implements Runnable { - - private static Logger log = LoggerFactory.getLogger(NotificationConsumer.class); - private CambriaConsumer cambriaConsumer; - private NotificationCallback notificationCallback; - - /** - * Parameterized Constructor. - */ - public NotificationConsumer(CambriaConsumer cambriaConsumer, NotificationCallback notificationCallback) { - super(); - this.cambriaConsumer = cambriaConsumer; - this.notificationCallback = notificationCallback; - } - - @Override - public void run() { - try { - Iterable<String> msgs = cambriaConsumer.fetch(); - for (String msg : msgs) { - log.debug(msg); - notificationCallback.activateCallBack(msg); - } - } catch (IOException e) { - log.debug("exception when fetching msgs from dmaap", e); - } - - } -} diff --git a/src/main/java/com/wipro/www/sonhms/dmaap/NotificationProducer.java b/src/main/java/com/wipro/www/sonhms/dmaap/NotificationProducer.java deleted file mode 100644 index 7dd505f..0000000 --- a/src/main/java/com/wipro/www/sonhms/dmaap/NotificationProducer.java +++ /dev/null @@ -1,57 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * pcims - * ================================================================================ - * Copyright (C) 2018 Wipro Limited. - * ============================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - ******************************************************************************/ - -package com.wipro.www.sonhms.dmaap; - -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.List; - -public class NotificationProducer { - - private List<String> servers; - private String apiKey; - private String secret; - - /** - * Parameterised constructor. - */ - public NotificationProducer(List<String> servers, String apiKey, String secret) { - super(); - this.servers = servers; - this.apiKey = apiKey; - this.secret = secret; - } - - /** - * sends notification to dmaap. - */ - public int sendNotification(String topic, String msg) throws GeneralSecurityException, IOException { - CambriaBatchingPublisher cambriaBatchingPublisher = null; - cambriaBatchingPublisher = new PublisherBuilder().usingHosts(servers).onTopic(topic) - .authenticatedBy(apiKey, secret).build(); - return cambriaBatchingPublisher.send("", msg); - - } - -} diff --git a/src/main/java/com/wipro/www/sonhms/dmaap/PolicyDmaapClient.java b/src/main/java/com/wipro/www/sonhms/dmaap/PolicyDmaapClient.java deleted file mode 100644 index f1491b4..0000000 --- a/src/main/java/com/wipro/www/sonhms/dmaap/PolicyDmaapClient.java +++ /dev/null @@ -1,53 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * pcims - * ================================================================================ - * Copyright (C) 2018 Wipro Limited. - * ============================================================================== - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - ******************************************************************************/ - -package com.wipro.www.sonhms.dmaap; - -import com.wipro.www.sonhms.Configuration; - -import java.io.IOException; -import java.security.GeneralSecurityException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class PolicyDmaapClient { - - private static Logger log = LoggerFactory.getLogger(PolicyDmaapClient.class); - - /** - * Method stub for sending notification to policy. - */ - public boolean sendNotificationToPolicy(String msg) { - - Configuration configuration = Configuration.getInstance(); - NotificationProducer notificationProducer = new NotificationProducer(configuration.getServers(), - configuration.getPcimsApiKey(), configuration.getPcimsSecretKey()); - try { - int result = notificationProducer.sendNotification(configuration.getPolicyTopic(), msg); - log.debug("result: {}", result); - } catch (GeneralSecurityException | IOException e) { - log.debug("exception when sending notification to policy", e); - return false; - } - return true; - } -} |