path: root/src/main/java/com/att/dmf/mr/beans
Diffstat (limited to 'src/main/java/com/att/dmf/mr/beans')
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java                  88
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java        288
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPContext.java               104
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java  361
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java       495
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java            231
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java              140
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java               45
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java             51
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/LogDetails.java                 214
-rw-r--r--  src/main/java/com/att/dmf/mr/beans/TopicBean.java                  155
11 files changed, 0 insertions, 2172 deletions
diff --git a/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java b/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java
deleted file mode 100644
index 4f0108f..0000000
--- a/src/main/java/com/att/dmf/mr/beans/ApiKeyBean.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlRootElement;
-
-import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
-/**
- *
- * @author anowarul.islam
- *
- */
-@XmlRootElement
-public class ApiKeyBean implements Serializable {
-
- private static final long serialVersionUID = -8219849086890567740L;
-
- private static final String KEY_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
-
- private String email;
- private String description;
- /**
- * constructor
- */
- public ApiKeyBean() {
- super();
- }
-/**
- *
- * @param email
- * @param description
- */
- public ApiKeyBean(String email, String description) {
- super();
- this.email = email;
- this.description = description;
- }
-
- public String getEmail() {
- return email;
- }
-
- public void setEmail(String email) {
- this.email = email;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public String getKey() {
- return generateKey(16);
- }
-
- public String getSharedSecret() {
- return generateKey(24);
- }
-
- private static String generateKey ( int length ) {
- return uniqueStringGenerator.createKeyUsingAlphabet ( KEY_CHARS, length );
- }
-
-}
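Note: uniqueStringGenerator.createKeyUsingAlphabet comes from the AT&T NSA utility library, so its behavior isn't visible in this diff. Below is a minimal JDK-only sketch of the same idea, assuming uniform random selection from the alphabet; the class and method names are hypothetical.

import java.security.SecureRandom;

public class KeyGenSketch {
    private static final String KEY_CHARS =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
    private static final SecureRandom RANDOM = new SecureRandom();

    // Draw `length` characters uniformly at random from the alphabet,
    // which is presumably what createKeyUsingAlphabet does.
    static String generateKey(int length) {
        final StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(KEY_CHARS.charAt(RANDOM.nextInt(KEY_CHARS.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println("apiKey       = " + generateKey(16)); // getKey()
        System.out.println("sharedSecret = " + generateKey(24)); // getSharedSecret()
    }
}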
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
deleted file mode 100644
index 8cbf64f..0000000
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
-
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.exception.DMaaPResponseCode;
-import com.att.dmf.mr.exception.ErrorResponse;
-import com.att.dmf.mr.utils.Utils;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.metrics.impl.CdmRateTicker;
-
-/**
- * class providing rate information
- *
- * @author anowarul.islam
- *
- */
-@Component
-public class DMaaPCambriaLimiter {
- private final HashMap<String, RateInfo> fRateInfo;
- private final HashMap<String, RateInfoCheck> fRateInfoCheck;
- private final double fMaxEmptyPollsPerMinute;
- private final double fMaxPollsPerMinute;
- private final int fWindowLengthMins;
- private final long fSleepMs;
- private final long fSleepMs1;
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
-
- /**
- * constructor initializes
- *
- * @param settings
- * @throws missingReqdSetting
- * @throws invalidSettingValue
- */
- @Autowired
- public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
- fRateInfo = new HashMap<>();
- fRateInfoCheck = new HashMap<>();
- fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
- CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
- fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
- 30);
- fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
- CambriaConstants.kDefault_RateLimitWindowLength);
- fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
- CambriaConstants.kDefault_SleepMsOnRateLimit);
- fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
- 5000);
-
- }
-
- /**
- * Construct a rate limiter.
- *
- * @param maxEmptyPollsPerMinute
- * Pass <= 0 to deactivate rate limiting.
- * @param windowLengthMins
- */
- public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
- this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
- }
-
- /**
- * Construct a rate limiter
- *
- * @param maxEmptyPollsPerMinute
- * Pass <= 0 to deactivate rate limiting.
- * @param sleepMs
- * @param windowLengthMins
- */
- public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
- fRateInfo = new HashMap<>();
- fRateInfoCheck = new HashMap<>();
- fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
- fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
- fWindowLengthMins = windowLengthMins;
- fSleepMs = Math.max(0, sleepMs);
- fSleepMs1 = Math.max(0, sleepMS1);
- }
-
- /**
- * static method providing the sleep time
- *
- * @param ratePerMinute
- * @return
- */
- public static long getSleepMsForRate(double ratePerMinute) {
- if (ratePerMinute <= 0.0)
- return 0;
- return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
- }
-
- /**
- * Tell the rate limiter about a call to a topic/group/id. If the rate is
- * too high, this call delays its return and throws an exception.
- *
- * @param topic
- * @param consumerGroup
- * @param clientId
- * @throws CambriaApiException
- */
- public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
- // do nothing if rate is configured 0 or less
- if (fMaxEmptyPollsPerMinute <= 0) {
- return;
- }
- // setup rate info for this tuple
- final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
- final double rate = ri.onCall();
- log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
- if (rate > fMaxEmptyPollsPerMinute) {
- try {
- log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
- + ".");
- if (fSleepMs > 0) {
- log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
- + " ms sleep, then responding in error.");
- Thread.sleep(fSleepMs);
-
- } else {
- log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
- }
- } catch (InterruptedException e) {
- log.error("Exception "+ e);
- // ignore
- }
-
-
- ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
- DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
- "This client is making too many requests. Please use a long poll "
- + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
-
- log.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
-
-
- }
-
- /**
- *
- * @param topic
- * @param consumerGroup
- * @param clientId
- * @param sentCount
- */
- public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
- // check for good replies
- if (sentCount > 0) {
- // that was a good send, reset the metric
- getRateInfo(topic, consumerGroup, clientId).reset();
- }
- }
-
- private static class RateInfo {
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
- /**
- * constructor initializes
- *
- * @param label
- * @param windowLengthMinutes
- */
- public RateInfo(String label, int windowLengthMinutes) {
- fLabel = label;
- fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
- windowLengthMinutes, TimeUnit.MINUTES);
- }
-
- public String getLabel() {
- return fLabel;
- }
-
- /**
- * CdmRateTicker is reset
- */
- public void reset() {
- fCallRateSinceLastMsgSend.reset();
- }
-
- /**
- *
- * @return
- */
- public double onCall() {
- fCallRateSinceLastMsgSend.tick();
- return fCallRateSinceLastMsgSend.getRate();
- }
- }
-
-
-
- private static class RateInfoCheck {
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
- /**
- * constructor initializes
- *
- * @param label
- * @param windowLengthMinutes
- */
- public RateInfoCheck(String label, int windowLengthMinutes) {
- fLabel = label;
- fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
- windowLengthMinutes, TimeUnit.MINUTES);
- }
-
- public String getLabel() {
- return fLabel;
- }
-
- /**
- * CdmRateTicker is reset
- */
- public void reset() {
- fCallRateSinceLastMsgSend.reset();
- }
-
- /**
- *
- * @return
- */
- public double onCall() {
- fCallRateSinceLastMsgSend.tick();
- return fCallRateSinceLastMsgSend.getRate();
- }
- }
-
-
-
-
- private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
- final String key = makeKey(topic, consumerGroup, clientId);
- RateInfo ri = fRateInfo.get(key);
- if (ri == null) {
- ri = new RateInfo(key, fWindowLengthMins);
- fRateInfo.put(key, ri);
- }
- return ri;
- }
-
-
-
-
-
-
-
- private String makeKey(String topic, String group, String id) {
- return topic + "::" + group + "::" + id;
- }
-}
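Note: getSleepMsForRate converts an allowed per-minute rate into a penalty sleep of 60000/rate milliseconds, floored at one second (so 30 empty polls/minute yields a 2000 ms sleep). Below is a self-contained sketch of the empty-poll limiting idea, using a JDK deque in place of the NSA CdmRateTicker; the class and method names are hypothetical.

import java.util.ArrayDeque;
import java.util.Deque;

public class EmptyPollLimiterSketch {
    private final double maxEmptyPollsPerMinute;
    private final Deque<Long> ticks = new ArrayDeque<>(); // timestamps of empty replies

    EmptyPollLimiterSketch(double maxEmptyPollsPerMinute) {
        this.maxEmptyPollsPerMinute = maxEmptyPollsPerMinute;
    }

    // Mirrors getSleepMsForRate: 60000/rate ms between events, at least 1 s.
    static long sleepMsForRate(double ratePerMinute) {
        if (ratePerMinute <= 0.0) return 0;
        return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
    }

    // Record an empty poll; return true when the caller should be throttled,
    // roughly what onCall() decides via the rate ticker.
    synchronized boolean onEmptyPoll() {
        final long now = System.currentTimeMillis();
        ticks.addLast(now);
        while (!ticks.isEmpty() && now - ticks.peekFirst() > 60_000) {
            ticks.removeFirst(); // drop ticks outside the one-minute window
        }
        return maxEmptyPollsPerMinute > 0 && ticks.size() > maxEmptyPollsPerMinute;
    }

    // A successful send resets the metric, as in onSend().
    synchronized void onSend() {
        ticks.clear();
    }
}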
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java b/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java
deleted file mode 100644
index a880877..0000000
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPContext.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
-
-import com.att.dmf.mr.utils.ConfigurationReader;
-
-/**
- * DMaaPContext provides and maintains all the configuration, the HTTP request/response,
- * the session, and the consumer request time.
- * @author nilanjana.maity
- *
- */
-public class DMaaPContext {
-
- private ConfigurationReader configReader;
- private HttpServletRequest request;
- private HttpServletResponse response;
- private HttpSession session;
- private String consumerRequestTime;
- static int i=0;
-
- public synchronized static long getBatchID() {
- try{
- final long metricsSendTime = System.currentTimeMillis();
- final Date d = new Date(metricsSendTime);
- final String text = new SimpleDateFormat("ddMMyyyyHHmmss").format(d);
- long dt= Long.valueOf(text)+i;
- i++;
- return dt;
- }
- catch(NumberFormatException ex){
- return 0;
- }
- }
-
- public HttpServletRequest getRequest() {
- return request;
- }
-
- public void setRequest(HttpServletRequest request) {
- this.request = request;
- }
-
- public HttpServletResponse getResponse() {
- return response;
- }
-
- public void setResponse(HttpServletResponse response) {
- this.response = response;
- }
-
- public HttpSession getSession() {
- this.session = request.getSession();
- return session;
- }
-
- public void setSession(HttpSession session) {
- this.session = session;
- }
-
- public ConfigurationReader getConfigReader() {
- return configReader;
- }
-
- public void setConfigReader(ConfigurationReader configReader) {
- this.configReader = configReader;
- }
-
- public String getConsumerRequestTime() {
- return consumerRequestTime;
- }
-
- public void setConsumerRequestTime(String consumerRequestTime) {
- this.consumerRequestTime = consumerRequestTime;
- }
-
-
-}
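Note: getBatchID builds a batch id by rendering the wall clock as ddMMyyyyHHmmss, parsing it back to a long, and adding a counter so ids minted within the same second stay distinct. A sketch of the same scheme follows; swapping the unsynchronized static int for an AtomicInteger is my assumption, not the original's choice.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchIdSketch {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Same scheme as DMaaPContext.getBatchID(): timestamp rendered as
    // ddMMyyyyHHmmss (14 digits, fits in a long), plus a process-wide counter.
    static long nextBatchId() {
        final String text = new SimpleDateFormat("ddMMyyyyHHmmss").format(new Date());
        return Long.parseLong(text) + COUNTER.getAndIncrement();
    }

    public static void main(String[] args) {
        System.out.println(nextBatchId());
    }
}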
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
deleted file mode 100644
index fb0ace0..0000000
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaConsumerFactory.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.backends.Consumer;
-import com.att.dmf.mr.backends.ConsumerFactory;
-import com.att.dmf.mr.backends.MetricsSet;
-import com.att.dmf.mr.backends.kafka.Kafka011Consumer;
-import com.att.dmf.mr.backends.kafka.Kafka011ConsumerUtil;
-import com.att.dmf.mr.backends.kafka.KafkaConsumerCache;
-import com.att.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
-import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
-import com.att.dmf.mr.backends.kafka.LiveLockAvoidance;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.utils.ConfigurationReader;
-import com.att.dmf.mr.utils.Utils;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-
-/**
- * @author nilanjana.maity
- *
- */
-public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
-
-
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
-
-
- /**
- * constructor initialization
- *
- * @param settings
- * @param metrics
- * @param curator
- * @throws missingReqdSetting
- * @throws KafkaConsumerCacheException
- * @throws UnknownHostException
- */
-
- public DMaaPKafkaConsumerFactory(@Qualifier("dMaaPMetricsSet") MetricsSet metrics,
- @Qualifier("curator") CuratorFramework curator,
- @Qualifier("kafkalockavoid") KafkaLiveLockAvoider2 kafkaLiveLockAvoider)
- throws missingReqdSetting, KafkaConsumerCacheException, UnknownHostException {
-
- String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- CambriaConstants.kSetting_ApiNodeIdentifier);
- if (apiNodeId == null) {
-
- apiNodeId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + CambriaConstants.kDefault_Port;
- }
-
- log.info("This Cambria API Node identifies itself as [" + apiNodeId + "].");
- final String mode = CambriaConstants.DMAAP;
-
- fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (null == fkafkaBrokers) {
-
- fkafkaBrokers = "localhost:9092";
- }
-
- boolean kSetting_EnableCache = kDefault_IsCacheEnabled;
- String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "cambria.consumer.cache.enabled");
- if (null != strkSetting_EnableCache)
- kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
-
- final boolean isCacheEnabled = kSetting_EnableCache;
-
-
- fCache = null;
- if (isCacheEnabled) {
- fCache = KafkaConsumerCache.getInstance();
-
- }
- if (fCache != null) {
- fCache.setfMetrics(metrics);
- fCache.setfApiId(apiNodeId);
- fCache.startCache(mode, curator);
- if(kafkaLiveLockAvoider!=null){
- kafkaLiveLockAvoider.startNewWatcherForServer(apiNodeId, makeAvoidanceCallback(apiNodeId));
- fkafkaLiveLockAvoider = kafkaLiveLockAvoider;
- }
- }
- }
-
- /*
- * getConsumerFor
- *
- * @see
- * com.att.dmf.mr.backends.ConsumerFactory#getConsumerFor(java.lang.String,
- * java.lang.String, java.lang.String, int, java.lang.String) This method is
- * used by EventServiceImpl.getEvents() to get a Kafka consumer, either from
- * the Kafka consumer cache or by creating a new connection. It also gets the
- * list of other consumer objects for the same consumer group and sets it on
- * the KafkaConsumer object. This list may be used to work around the
- * poll-rebalancing issue.
- */
- @Override
- public Consumer getConsumerFor(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
- Kafka011Consumer kc;
-
- // To synchronize based on the consumer group.
-
- Object syncObject = synchash.get(topic + consumerGroupName);
- if (null == syncObject) {
- syncObject = new Object();
- synchash.put(topic + consumerGroupName, syncObject);
- }
-
- synchronized (syncObject) {
- try {
- kc = (fCache != null) ? fCache.getConsumerFor(topic, consumerGroupName, consumerId) : null; // consumerId
-
- } catch (KafkaConsumerCacheException e) {
- log.info("######@@@@### Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName
- + "::" + consumerId);
- log.error("####@@@@## Error occured in Kafka Caching" + e + " " + topic + "::" + consumerGroupName
- + "::" + consumerId);
- throw new UnavailableException(e);
- }
-
- // Ideally, if the cache exists, the flow below should be skipped. If the
- // cache didn't exist, then create the consumer for the first time on this
- // node.
- if (kc == null) {
-
- log.info("^Kafka consumer cache value " + topic + "::" + consumerGroupName + "::" + consumerId + " =>"
- + kc);
-
- final InterProcessMutex ipLock = new InterProcessMutex(ConfigurationReader.getCurator(),
- "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
- boolean locked = false;
-
- try {
-
- locked = ipLock.acquire(30, TimeUnit.SECONDS);
- if (!locked) {
-
- log.info("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
- + ", " + consumerGroupName + ", " + consumerId + ") from " + remotehost);
- throw new UnavailableException(
- "Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic
- + ", " + consumerGroupName + ", " + consumerId + ") " + remotehost);
- }
-
- // ConfigurationReader.getCurator().checkExists().forPath("S").
-
- log.info("Creating Kafka consumer for group [" + consumerGroupName + "], consumer [" + consumerId
- + "], on topic [" + topic + "].");
-
- if (fCache != null) {
- fCache.signalOwnership(topic, consumerGroupName, consumerId);
- }
-
- final Properties props = createConsumerConfig(topic,consumerGroupName, consumerId);
- long fCreateTimeMs = System.currentTimeMillis();
- KafkaConsumer<String, String> cc = new KafkaConsumer<>(props);
- kc = new Kafka011Consumer(topic, consumerGroupName, consumerId, cc, fkafkaLiveLockAvoider);
- log.info(" kafka stream created in " + (System.currentTimeMillis() - fCreateTimeMs));
-
- if (fCache != null) {
- fCache.putConsumerFor(topic, consumerGroupName, consumerId, kc); //
- }
-
- } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
- log.info(
- "Kafka consumer couldn't connect to ZK. " + x + " " + consumerGroupName + "/" + consumerId);
- throw new UnavailableException("Couldn't connect to ZK.");
- } catch (KafkaConsumerCacheException e) {
- log.info("Failed to cache consumer (this may have performance implications): " + e.getMessage()
- + " " + consumerGroupName + "/" + consumerId);
- } catch (UnavailableException u) {
- log.info("Failed and in UnavailableException block " + u.getMessage() + " " + consumerGroupName
- + "/" + consumerId);
- throw new UnavailableException("Error while acquiring consumer factory lock " + u.getMessage(), u);
- } catch (Exception e) {
- log.info("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
- + consumerId);
- log.error("Failed and go to Exception block " + e.getMessage() + " " + consumerGroupName + "/"
- + consumerId);
-
- } finally {
- if (locked) {
- try {
- ipLock.release();
- } catch (Exception e) {
- throw new UnavailableException("Error while releasing consumer factory lock" + e, e);
- }
- }
- }
- }
- }
- return kc;
- }
-
- @Override
- public synchronized void destroyConsumer(String topic, String consumerGroup, String clientId) {
- if (fCache != null) {
- fCache.dropConsumer(topic, consumerGroup, clientId);
- }
- }
-
- @Override
- public synchronized Collection<? extends Consumer> getConsumers() {
- return fCache.getConsumers();
- }
-
- @Override
- public synchronized void dropCache() {
- fCache.dropAllConsumers();
- }
-
-
- private KafkaConsumerCache fCache;
- private KafkaLiveLockAvoider2 fkafkaLiveLockAvoider;
- private String fkafkaBrokers;
-
-
-
- private static String makeLongKey(String key, String prefix) {
- return prefix + "." + key;
- }
-
- private void transferSettingIfProvided(Properties target, String key, String prefix) {
- String keyVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, makeLongKey(key, prefix));
-
-
- if (null != keyVal) {
-
- log.info("Setting [" + key + "] to " + keyVal + ".");
- target.put(key, keyVal);
- }
- }
-
- /**
- * Name: createConsumerConfig
- * @param topic
- * @param groupId
- * @param consumerId
- * @return Properties
- *
- * This method creates the Properties required to create a Kafka connection.
- * The group name is rewritten to the format "groupId--topic" so that the same
- * group id used across multiple topics stays unique; otherwise one group id
- * on multiple topics may trigger frequent consumer rebalancing on all of them.
- */
- private Properties createConsumerConfig(String topic ,String groupId, String consumerId) {
- final Properties props = new Properties();
- // fakeGroupName is added to avoid one consumer group spanning multiple topics. Do not change this logic.
- //Fix for CPFMF-644 :
- final String fakeGroupName = groupId + "--" + topic;
- props.put("group.id", fakeGroupName);
- props.put("enable.auto.commit", "false"); // 0.11
- props.put("bootstrap.servers", fkafkaBrokers);
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
- props.put("client.id", consumerId);
-
- // additional settings: start with our defaults, then pull in configured
- // overrides
- populateKafkaInternalDefaultsMap();
- for (String key : KafkaConsumerKeys) {
- transferSettingIfProvided(props, key, "kafka");
- }
-
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- return props;
- }
-
-
- private static final String KafkaConsumerKeys[] = { "bootstrap.servers", "heartbeat.interval.ms",
- "auto.offset.reset", "exclude.internal.topics", "session.timeout.ms", "fetch.max.bytes",
- "auto.commit.interval.ms", "connections.max.idle.ms", "fetch.min.bytes", "isolation.level",
- "request.timeout.ms", "fetch.max.wait.ms", "reconnect.backoff.max.ms",
- "max.partition.fetch.bytes", "reconnect.backoff.ms", "retry.backoff.ms",
- "max.poll.interval.ms", "max.poll.records", "receive.buffer.bytes", "metadata.max.age.ms" };
-
- /**
- * Placeholder for populating internal Kafka defaults (consumer timeout,
- * ZooKeeper timeout, etc.); currently a no-op.
- */
- private static void populateKafkaInternalDefaultsMap() { }
-
- /*
- * The starterIncrement value is just to emulate calling certain consumers;
- * in this test app all the consumers are local.
- *
- */
- private LiveLockAvoidance makeAvoidanceCallback(final String appId) {
-
- return new LiveLockAvoidance() {
-
- @Override
- public String getAppId() {
- return appId;
- }
-
- @Override
- public void handleRebalanceUnlock(String groupName) {
- log.info("FORCE A POLL NOW FOR appId: [{}] group: [{}]", getAppId(), groupName);
- Kafka011ConsumerUtil.forcePollOnConsumer(groupName + "::");
- }
-
- };
-
- }
-
- @SuppressWarnings("rawtypes")
- @Override
- public HashMap getConsumerForKafka011(String topic, String consumerGroupName, String consumerId, int timeoutMs,
- String remotehost) throws UnavailableException, CambriaApiException {
- // TODO Auto-generated method stub
- return null;
- }
-
- private HashMap<String, Object> synchash = new HashMap<String, Object>();
-
-} \ No newline at end of file
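Note: createConsumerConfig above boils down to a handful of required properties plus the group-id rewrite. A minimal sketch against the real kafka-clients API follows, with the CADI/SASL settings omitted; the broker address and names are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigSketch {
    // Build consumer Properties the way createConsumerConfig does: the group
    // id is suffixed with the topic ("groupId--topic") so that one logical
    // group subscribed to many topics does not trigger constant rebalancing
    // across all of them.
    static Properties consumerProps(String brokers, String topic, String groupId, String consumerId) {
        final Properties props = new Properties();
        props.put("group.id", groupId + "--" + topic);
        props.put("bootstrap.servers", brokers);
        props.put("client.id", consumerId);
        props.put("enable.auto.commit", "false"); // offsets are committed by the caller
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(consumerProps("localhost:9092", "demo", "grp", "c0"))) {
            // subscribe/poll as usual
        }
    }
}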
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
deleted file mode 100644
index acf4824..0000000
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.common.KafkaFuture;
-import org.json.JSONObject;
-import org.json.JSONArray;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
-
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.metabroker.Broker;
-import com.att.dmf.mr.metabroker.Broker1;
-import com.att.dmf.mr.metabroker.Topic;
-import com.att.dmf.mr.utils.ConfigurationReader;
-import com.att.dmf.mr.utils.Utils;
-//import org.apache.log4j.Logger;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import com.att.nsa.configs.ConfigDb;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.configs.ConfigPath;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.security.NsaAcl;
-import com.att.nsa.security.NsaAclUtils;
-import com.att.nsa.security.NsaApiKey;
-
-
-/**
- * class performing all topic operations
- *
- * @author anowarul.islam
- *
- */
-//@Component
-public class DMaaPKafkaMetaBroker implements Broker1 {
-
- public DMaaPKafkaMetaBroker() {
- fZk = null;
- fCambriaConfig = null;
- fBaseTopicData = null;
- final Properties props = new Properties ();
- String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (null == fkafkaBrokers) {
-
- fkafkaBrokers = "localhost:9092";
- }
-
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
-
- fKafkaAdminClient=AdminClient.create ( props );
-
- }
-
- //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaMetaBroker.class);
- private final AdminClient fKafkaAdminClient;
-
-
-
- /**
- * DMaaPKafkaMetaBroker constructor initializing
- *
- * @param settings
- * @param zk
- * @param configDb
- */
- public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
- //fSettings = settings;
- fZk = zk;
- fCambriaConfig = configDb;
- fBaseTopicData = configDb.parse("/topics");
- final Properties props = new Properties ();
- String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (null == fkafkaBrokers) {
-
- fkafkaBrokers = "localhost:9092";
- }
-
- if(Utils.isCadiEnabled()){
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='"+Utils.getKafkaproperty()+"';");
- props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
- }
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
- fKafkaAdminClient=AdminClient.create ( props );
-
-
-
- }
-
- public DMaaPKafkaMetaBroker( rrNvReadable settings,
- ZkClient zk, ConfigDb configDb,AdminClient client) {
-
- fZk = zk;
- fCambriaConfig = configDb;
- fBaseTopicData = configDb.parse("/topics");
- fKafkaAdminClient= client;
-
-
-
- }
-
- @Override
- public List<Topic> getAllTopics() throws ConfigDbException {
- log.info("Retrieving list of all the topics.");
- final LinkedList<Topic> result = new LinkedList<Topic>();
- try {
- log.info("Retrieving all topics from root: " + zkTopicsRoot);
- final List<String> topics = fZk.getChildren(zkTopicsRoot);
- for (String topic : topics) {
- result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
- }
- JSONObject dataObj = new JSONObject();
- dataObj.put("topics", new JSONObject());
-
- for (String topic : topics) {
- dataObj.getJSONObject("topics").put(topic, new JSONObject());
- }
- } catch (ZkNoNodeException excp) {
- // a very fresh Kafka doesn't have any topics or a topics node
- log.error("ZK doesn't have a Kafka topics node at " + zkTopicsRoot, excp);
- }
- return result;
- }
-
- @Override
- public Topic getTopic(String topic) throws ConfigDbException {
- if (fZk.exists(zkTopicsRoot + "/" + topic)) {
- return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
- }
- // else: no such topic in kafka
- return null;
- }
-
- /**
- * static method to get a KafkaTopic object
- *
- * @param db
- * @param base
- * @param topic
- * @return
- * @throws ConfigDbException
- */
- public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
- return new KafkaTopic(topic, db, base);
- }
-
- /**
- * creating topic
- */
- @Override
- public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
- log.info("Creating topic: " + topic);
- try {
- log.info("Check if topic [" + topic + "] exist.");
- // first check for existence "our way"
- final Topic t = getTopic(topic);
- if (t != null) {
- log.info("Could not create topic [" + topic + "]. Topic Already exists.");
- throw new TopicExistsException("Could not create topic [" + topic + "]. Topic already exists.");
- }
- } catch (ConfigDbException e1) {
- log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "Couldn't check topic data in config db.");
- }
-
- // we only allow up to 3 replicas. (If we don't test this, we get weird
- // results from the cluster, so test explicitly and fail fast.)
- if (replicas < 1 || replicas > 3) {
- log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
- throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
- "The replica count must be between 1 and 3.");
- }
- if (partitions < 1) {
- log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
- throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
- }
-
- // create via kafka
-
- try
- {
- final NewTopic topicRequest = new NewTopic ( topic, partitions, (short) replicas );
- final CreateTopicsResult ctr = fKafkaAdminClient.createTopics ( Arrays.asList ( topicRequest ) );
- final KafkaFuture<Void> ctrResult = ctr.all ();
- ctrResult.get ();
- // underlying Kafka topic created. now setup our API info
- return createTopicEntry ( topic, desc, ownerApiKey, transactionEnabled );
- }
- catch ( InterruptedException e )
- {
-
- log.warn ( "Execution of describeTopics timed out." );
- throw new ConfigDbException ( e );
- }
- catch ( ExecutionException e )
- {
-
- log.warn ( "Execution of describeTopics failed: " + e.getCause ().getMessage (), e.getCause () );
- throw new ConfigDbException ( e.getCause () );
- }
-
- }
-
- @Override
- public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
- log.info("Deleting topic: " + topic);
- ZkClient zkClient = null;
- try {
- log.info("Loading zookeeper client for topic deletion.");
- // topic creation. (Otherwise, the topic is only partially created
- // in ZK.)
-
-
- fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
- log.info("Zookeeper client loaded successfully. Deleting topic.");
-
- } catch (Exception e) {
- log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
- throw new ConfigDbException(e);
- } finally {
- log.info("Closing zookeeper connection.");
- if (zkClient != null)
- zkClient.close();
- }
-
- // throw new UnsupportedOperationException ( "We can't programmatically
- // delete Kafka topics yet." );
- }
-
- //private final rrNvReadable fSettings;
- private final ZkClient fZk;
- private final ConfigDb fCambriaConfig;
- private final ConfigPath fBaseTopicData;
-
- private static final String zkTopicsRoot = "/brokers/topics";
- private static final JSONObject kEmptyAcl = new JSONObject();
-
- /**
- * method providing a KafkaTopic object associated with an owner and
- * whether transactions are enabled
- *
- * @param name
- * @param desc
- * @param owner
- * @param transactionEnabled
- * @return
- * @throws ConfigDbException
- */
- public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
- throws ConfigDbException {
- return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
- }
-
- /**
- * static method giving a KafkaTopic object
- *
- * @param db
- * @param basePath
- * @param name
- * @param desc
- * @param owner
- * @param transactionEnabled
- * @return
- * @throws ConfigDbException
- */
- public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
- boolean transactionEnabled) throws ConfigDbException {
- final JSONObject o = new JSONObject();
- o.put("owner", owner);
- o.put("description", desc);
- o.put("txenabled", transactionEnabled);
- db.store(basePath.getChild(name), o.toString());
- return new KafkaTopic(name, db, basePath);
- }
-
- /**
- * class performing all user operations, such as checking whether a user is
- * eligible to read or write, and permitting a user to read or write
- *
- * @author anowarul.islam
- *
- */
- public static class KafkaTopic implements Topic {
- /**
- * constructor initializes
- *
- * @param name
- * @param configdb
- * @param baseTopic
- * @throws ConfigDbException
- */
- public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
- fName = name;
- fConfigDb = configdb;
- fBaseTopicData = baseTopic;
-
- String data = fConfigDb.load(fBaseTopicData.getChild(fName));
- if (data == null) {
- data = "{}";
- }
-
- final JSONObject o = new JSONObject(data);
- fOwner = o.optString("owner", "");
- fDesc = o.optString("description", "");
- fTransactionEnabled = o.optBoolean("txenabled", false);// default
- // value is
- // false
- // if this topic has an owner, it needs both read/write ACLs. If there's no
- // owner (or it's empty), null is okay -- this is for existing or implicitly
- // created topics.
- JSONObject readers = o.optJSONObject ( "readers" );
- if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
- fReaders = fromJson ( readers );
-
- JSONObject writers = o.optJSONObject ( "writers" );
- if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
- fWriters = fromJson ( writers );
- }
-
- private NsaAcl fromJson(JSONObject o) {
- NsaAcl acl = new NsaAcl();
- if (o != null) {
- JSONArray a = o.optJSONArray("allowed");
- if (a != null) {
- for (int i = 0; i < a.length(); ++i) {
- String user = a.getString(i);
- acl.add(user);
- }
- }
- }
- return acl;
- }
-
- @Override
- public String getName() {
- return fName;
- }
-
- @Override
- public String getOwner() {
- return fOwner;
- }
-
- @Override
- public String getDescription() {
- return fDesc;
- }
-
- @Override
- public NsaAcl getReaderAcl() {
- return fReaders;
- }
-
- @Override
- public NsaAcl getWriterAcl() {
- return fWriters;
- }
-
- @Override
- public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
- NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user );
- }
-
- @Override
- public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
- NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user );
- }
-
- @Override
- public void permitWritesFromUser(String pubId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
- updateAcl(asUser, false, true, pubId);
- }
-
- @Override
- public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
- updateAcl(asUser, false, false, pubId);
- }
-
- @Override
- public void permitReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
- updateAcl(asUser, true, true, consumerId);
- }
-
- @Override
- public void denyReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
- updateAcl(asUser, true, false, consumerId);
- }
-
- private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
- throws ConfigDbException, AccessDeniedException{
- try
- {
- final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
- // we have to assume we have current data, or load it again. for the expected use
- // case, assuming we can overwrite the data is fine.
- final JSONObject o = new JSONObject ();
- o.put ( "owner", fOwner );
- o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
- o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
- fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
-
- log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
- }
- catch ( ConfigDbException x )
- {
- throw x;
- }
- catch ( AccessDeniedException x )
- {
- throw x;
- }
-
- }
-
- private JSONObject safeSerialize(NsaAcl acl) {
- return acl == null ? null : acl.serialize();
- }
-
- private final String fName;
- private final ConfigDb fConfigDb;
- private final ConfigPath fBaseTopicData;
- private final String fOwner;
- private final String fDesc;
- private final NsaAcl fReaders;
- private final NsaAcl fWriters;
- private boolean fTransactionEnabled;
-
- public boolean isTransactionEnabled() {
- return fTransactionEnabled;
- }
-
- @Override
- public Set<String> getOwners() {
- final TreeSet<String> owners = new TreeSet<String> ();
- owners.add ( fOwner );
- return owners;
- }
- }
-
-}
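Note: createTopic validates the partition and replica counts, then drives the Kafka AdminClient and blocks on the returned future before writing the topic's ownership record to the config db. A trimmed sketch of just the Kafka half follows, using the real admin API; the broker address and topic name are assumptions.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreateSketch {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker

        try (AdminClient admin = AdminClient.create(props)) {
            // Same validation as createTopic(): 1..3 replicas, at least 1 partition.
            final int partitions = 3;
            final short replicas = 1;
            final NewTopic request = new NewTopic("demo.topic", partitions, replicas);
            // all() returns a KafkaFuture<Void>; get() blocks until the brokers ack,
            // mirroring ctr.all().get() in createTopic().
            admin.createTopics(Collections.singletonList(request)).all().get();
        }
    }
}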
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java b/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
deleted file mode 100644
index 4c9532b..0000000
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPMetricsSet.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.att.dmf.mr.CambriaApiVersionInfo;
-import com.att.dmf.mr.backends.MetricsSet;
-import com.att.mr.apiServer.metrics.cambria.DMaaPMetricsSender;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.metrics.impl.CdmConstant;
-import com.att.nsa.metrics.impl.CdmCounter;
-import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl;
-import com.att.nsa.metrics.impl.CdmMovingAverage;
-import com.att.nsa.metrics.impl.CdmRateTicker;
-import com.att.nsa.metrics.impl.CdmSimpleMetric;
-import com.att.nsa.metrics.impl.CdmStringConstant;
-import com.att.nsa.metrics.impl.CdmTimeSince;
-
-/*@Component("dMaaPMetricsSet")*/
-/**
- * Metrics related information
- *
- * @author anowarul.islam
- *
- */
-public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet {
-
- private final CdmStringConstant fVersion;
- private final CdmConstant fStartTime;
- private final CdmTimeSince fUpTime;
-
- private final CdmCounter fRecvTotal;
- private final CdmRateTicker fRecvEpsInstant;
- private final CdmRateTicker fRecvEpsShort;
- private final CdmRateTicker fRecvEpsLong;
-
- private final CdmCounter fSendTotal;
- private final CdmRateTicker fSendEpsInstant;
- private final CdmRateTicker fSendEpsShort;
- private final CdmRateTicker fSendEpsLong;
-
- private final CdmCounter fKafkaConsumerCacheMiss;
- private final CdmCounter fKafkaConsumerCacheHit;
-
- private final CdmCounter fKafkaConsumerClaimed;
- private final CdmCounter fKafkaConsumerTimeout;
-
- private final CdmSimpleMetric fFanOutRatio;
-
- private final HashMap<String, CdmRateTicker> fPathUseRates;
- private final HashMap<String, CdmMovingAverage> fPathAvgs;
-
- private rrNvReadable fSettings;
-
- private final ScheduledExecutorService fScheduler;
-
- /**
- * Constructor initialization
- *
- * @param cs
- */
-
- public DMaaPMetricsSet(rrNvReadable cs) {
-
- fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
- super.putItem("version", fVersion);
-
- final long startTime = System.currentTimeMillis();
- final Date d = new Date(startTime);
- final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d);
- fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text);
- super.putItem("startTime", fStartTime);
-
- fUpTime = new CdmTimeSince("seconds since start");
- super.putItem("upTime", fUpTime);
-
- fRecvTotal = new CdmCounter("Total events received since start");
- super.putItem("recvTotalEvents", fRecvTotal);
-
- fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
- super.putItem("recvEpsInstant", fRecvEpsInstant);
-
- fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
- super.putItem("recvEpsShort", fRecvEpsShort);
-
- fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
- super.putItem("recvEpsLong", fRecvEpsLong);
-
- fSendTotal = new CdmCounter("Total events sent since start");
- super.putItem("sendTotalEvents", fSendTotal);
-
- fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
- super.putItem("sendEpsInstant", fSendEpsInstant);
-
- fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
- super.putItem("sendEpsShort", fSendEpsShort);
-
- fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
- super.putItem("sendEpsLong", fSendEpsLong);
-
- fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses");
- super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss);
-
- fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits");
- super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit);
-
- fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed");
- super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed);
-
- fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout");
- super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout);
-
- // FIXME: CdmLevel is not exactly a great choice
- fFanOutRatio = new CdmSimpleMetric() {
- @Override
- public String getRawValueString() {
- return getRawValue().toString();
- }
-
- @Override
- public Number getRawValue() {
- final double s = fSendTotal.getValue();
- final double r = fRecvTotal.getValue();
- return r == 0.0 ? 0.0 : s / r;
- }
-
- @Override
- public String summarize() {
- return getRawValueString() + " sends per recv";
- }
-
- };
- super.putItem("fanOut", fFanOutRatio);
-
- // these are added to the metrics catalog as they're discovered
- fPathUseRates = new HashMap<String, CdmRateTicker>();
- fPathAvgs = new HashMap<String, CdmMovingAverage>();
-
- fScheduler = Executors.newScheduledThreadPool(1);
- }
-
- @Override
- public void setupCambriaSender() {
- DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap");
- }
-
- @Override
- public void onRouteComplete(String name, long durationMs) {
- CdmRateTicker ticker = fPathUseRates.get(name);
- if (ticker == null) {
- ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS);
- fPathUseRates.put(name, ticker);
- super.putItem("pathUse_" + name, ticker);
- }
- ticker.tick();
-
- CdmMovingAverage durs = fPathAvgs.get(name);
- if (durs == null) {
- durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES);
- fPathAvgs.put(name, durs);
- super.putItem("pathDurationMs_" + name, durs);
- }
- durs.tick(durationMs);
- }
-
- @Override
- public void publishTick(int amount) {
- if (amount > 0) {
- fRecvTotal.bumpBy(amount);
- fRecvEpsInstant.tick(amount);
- fRecvEpsShort.tick(amount);
- fRecvEpsLong.tick(amount);
- }
- }
-
- @Override
- public void consumeTick(int amount) {
- if (amount > 0) {
- fSendTotal.bumpBy(amount);
- fSendEpsInstant.tick(amount);
- fSendEpsShort.tick(amount);
- fSendEpsLong.tick(amount);
- }
- }
-
- @Override
- public void onKafkaConsumerCacheMiss() {
- fKafkaConsumerCacheMiss.bump();
- }
-
- @Override
- public void onKafkaConsumerCacheHit() {
- fKafkaConsumerCacheHit.bump();
- }
-
- @Override
- public void onKafkaConsumerClaimed() {
- fKafkaConsumerClaimed.bump();
- }
-
- @Override
- public void onKafkaConsumerTimeout() {
- fKafkaConsumerTimeout.bump();
- }
-
-}
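Note: the fanOut gauge above is derived on read: total sends divided by total receives, reported as 0 until the first receive. A JDK-only sketch of that derived metric, without the Cdm classes; the class name is hypothetical.

import java.util.concurrent.atomic.LongAdder;

public class FanOutSketch {
    private final LongAdder recvTotal = new LongAdder();
    private final LongAdder sendTotal = new LongAdder();

    // Counterparts of publishTick()/consumeTick(): only positive amounts count.
    void publishTick(int amount) { if (amount > 0) recvTotal.add(amount); }
    void consumeTick(int amount) { if (amount > 0) sendTotal.add(amount); }

    // Sends per receive; 0.0 when nothing has been received yet, as in the bean.
    double fanOutRatio() {
        final double r = recvTotal.doubleValue();
        return r == 0.0 ? 0.0 : sendTotal.doubleValue() / r;
    }
}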
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
deleted file mode 100644
index 963ff2d..0000000
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPNsaApiDb.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.security.Key;
-
-
-import org.springframework.beans.factory.annotation.Autowired;
-
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDb;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.configs.confimpl.EncryptingLayer;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.db.BaseNsaApiDbImpl;
-import com.att.nsa.security.db.EncryptingApiDbImpl;
-import com.att.nsa.security.db.NsaApiDb;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
-import com.att.nsa.util.rrConvertor;
-
-/**
- *
- * @author anowarul.islam
- *
- */
-public class DMaaPNsaApiDb {
-
-
- private DMaaPZkConfigDb cdb;
-
- //private static final Logger log = Logger
-
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
-
-/**
- *
- * Constructor initialization
- * @param settings
- * @param cdb
- */
- @Autowired
- public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
-
- this.setCdb(cdb);
- }
- /**
- * Builds the API key database; it is encrypted only when both the key
- * and the init vector are present in the configuration.
- * @param settings
- * @param cdb
- * @return an NsaApiDb, encrypting if cambria.secureConfig.key/.iv are set
- * @throws ConfigDbException
- * @throws missingReqdSetting
- */
- public static NsaApiDb<NsaSimpleApiKey> buildApiKeyDb(
- rrNvReadable settings, ConfigDb cdb) throws ConfigDbException,
- missingReqdSetting {
- // Cambria uses an encrypted api key db when a key and IV are configured
- final String keyBase64 = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "cambria.secureConfig.key");
- final String initVectorBase64 = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "cambria.secureConfig.iv");
- // if neither value was provided, don't encrypt the api key db
- if (keyBase64 == null && initVectorBase64 == null) {
- log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
- return new BaseNsaApiDbImpl<>(cdb,
- new NsaSimpleApiKeyFactory());
- } else if (keyBase64 == null) {
- // key and IV must be supplied together; only the IV was found
- throw new missingReqdSetting("cambria.secureConfig.key");
- } else if (initVectorBase64 == null) {
- // key and IV must be supplied together; only the key was found
- throw new missingReqdSetting("cambria.secureConfig.iv");
- } else {
- log.info("This server is configured to use an encrypted API key database.");
- final Key key = EncryptingLayer.readSecretKey(keyBase64);
- final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
- return new EncryptingApiDbImpl<>(cdb,
- new NsaSimpleApiKeyFactory(), key, iv);
- }
- }
-
- /**
- * @return
- * returns cdb
- */
- public DMaaPZkConfigDb getCdb() {
- return cdb;
- }
- /**
- * @param cdb
- * set cdb
- */
- public void setCdb(DMaaPZkConfigDb cdb) {
- this.cdb = cdb;
- }
-
-}
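A minimal usage sketch, assuming a populated rrNvReadable and ConfigDb are already at hand (the variable names here are hypothetical):

    // Builds the API key db; it comes back encrypted only when both
    // cambria.secureConfig.key and cambria.secureConfig.iv are set.
    // Throws missingReqdSetting if exactly one of the two is configured.
    final NsaApiDb<NsaSimpleApiKey> apiKeyDb = DMaaPNsaApiDb.buildApiKeyDb(settings, configDb);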
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java
deleted file mode 100644
index 78a7426..0000000
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPZkClient.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.dmf.mr.utils.ConfigurationReader;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-
-/**
- * Created for Zookeeper client which will read configuration and settings parameter
- * @author nilanjana.maity
- *
- */
-public class DMaaPZkClient extends ZkClient {
-
- /**
- * Builds a ZkClient against ConfigurationReader's main zookeeper
- * connection string; the injected settings are used only for wiring.
- * @param settings
- */
- public DMaaPZkClient(@Qualifier("propertyReader") rrNvReadable settings) {
- super(ConfigurationReader.getMainZookeeperConnectionString());
- }
-}
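A hypothetical Spring Java-config wiring sketch for this client; the bean name mirrors the @Qualifier value used elsewhere in this package:

    // Assumption: Java-config wiring. The zookeeper connection string is
    // resolved inside the constructor via ConfigurationReader, not from settings.
    @Bean(name = "dMaaPZkClient")
    public DMaaPZkClient dMaaPZkClient(@Qualifier("propertyReader") rrNvReadable settings) {
        return new DMaaPZkClient(settings);
    }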
diff --git a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
deleted file mode 100644
index 5aa25fa..0000000
--- a/src/main/java/com/att/dmf/mr/beans/DMaaPZkConfigDb.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.dmf.mr.utils.ConfigurationReader;
-import com.att.nsa.configs.confimpl.ZkConfigDb;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-
-/**
- * Provide the zookeeper config db connection
- * @author nilanjana.maity
- *
- */
-public class DMaaPZkConfigDb extends ZkConfigDb {
- /**
- * Provides the zookeeper config db using ConfigurationReader's connection
- * string and config root; zk and settings are injected for wiring only.
- * @param zk
- * @param settings
- */
- public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
- @Qualifier("propertyReader") rrNvReadable settings) {
- super(ConfigurationReader.getMainZookeeperConnectionString(), ConfigurationReader.getMainZookeeperConnectionSRoot());
- }
-
-}
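And the matching sketch for the config db bean, again hypothetical wiring on top of the constructor shown above:

    // Assumption: Java-config wiring, reusing the dMaaPZkClient bean.
    @Bean(name = "dMaaPZkConfigDb")
    public DMaaPZkConfigDb dMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
                                           @Qualifier("propertyReader") rrNvReadable settings) {
        return new DMaaPZkConfigDb(zk, settings);
    }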
diff --git a/src/main/java/com/att/dmf/mr/beans/LogDetails.java b/src/main/java/com/att/dmf/mr/beans/LogDetails.java
deleted file mode 100644
index b7fb325..0000000
--- a/src/main/java/com/att/dmf/mr/beans/LogDetails.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.util.Date;
-
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.utils.Utils;
-
-/**
- * @author muzainulhaque.qazi
- *
- */
-
-public class LogDetails {
-
- private String publisherId;
- private String topicId;
- private String subscriberGroupId;
- private String subscriberId;
- private String publisherIp;
- private String messageBatchId;
- private String messageSequence;
- private String messageTimestamp;
- private String consumeTimestamp;
- private String transactionIdTs;
- private String serverIp;
-
- private long messageLengthInBytes;
- private long totalMessageCount;
-
- private boolean transactionEnabled;
- /**
- * Default constructor for transaction-enabled logging details.
- */
- public LogDetails() {
- super();
- }
-
- public String getTransactionId() {
- StringBuilder transactionId = new StringBuilder();
- transactionId.append(transactionIdTs);
- transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
- transactionId.append(publisherIp);
- transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
- transactionId.append(messageBatchId);
- transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
- transactionId.append(messageSequence);
-
- return transactionId.toString();
- }
-
- public String getPublisherId() {
- return publisherId;
- }
-
- public void setPublisherId(String publisherId) {
- this.publisherId = publisherId;
- }
-
- public String getTopicId() {
- return topicId;
- }
-
- public void setTopicId(String topicId) {
- this.topicId = topicId;
- }
-
- public String getSubscriberGroupId() {
- return subscriberGroupId;
- }
-
- public void setSubscriberGroupId(String subscriberGroupId) {
- this.subscriberGroupId = subscriberGroupId;
- }
-
- public String getSubscriberId() {
- return subscriberId;
- }
-
- public void setSubscriberId(String subscriberId) {
- this.subscriberId = subscriberId;
- }
-
- public String getPublisherIp() {
- return publisherIp;
- }
-
- public void setPublisherIp(String publisherIp) {
- this.publisherIp = publisherIp;
- }
-
- public String getMessageBatchId() {
- return messageBatchId;
- }
-
- public void setMessageBatchId(Long messageBatchId) {
- this.messageBatchId = Utils.getFromattedBatchSequenceId(messageBatchId);
- }
-
- public String getMessageSequence() {
- return messageSequence;
- }
-
- public void setMessageSequence(String messageSequence) {
- this.messageSequence = messageSequence;
- }
-
- public String getMessageTimestamp() {
- return messageTimestamp;
- }
-
- public void setMessageTimestamp(String messageTimestamp) {
- this.messageTimestamp = messageTimestamp;
- }
-
- public String getPublishTimestamp() {
- return Utils.getFormattedDate(new Date());
- }
-
- public String getConsumeTimestamp() {
- return consumeTimestamp;
- }
-
- public void setConsumeTimestamp(String consumeTimestamp) {
- this.consumeTimestamp = consumeTimestamp;
- }
-
- public long getMessageLengthInBytes() {
- return messageLengthInBytes;
- }
-
- public void setMessageLengthInBytes(long messageLengthInBytes) {
- this.messageLengthInBytes = messageLengthInBytes;
- }
-
- public long getTotalMessageCount() {
- return totalMessageCount;
- }
-
- public void setTotalMessageCount(long totalMessageCount) {
- this.totalMessageCount = totalMessageCount;
- }
-
- public boolean isTransactionEnabled() {
- return transactionEnabled;
- }
-
- public void setTransactionEnabled(boolean transactionEnabled) {
- this.transactionEnabled = transactionEnabled;
- }
-
- public String getTransactionIdTs() {
- return transactionIdTs;
- }
-
- public void setTransactionIdTs(String transactionIdTs) {
- this.transactionIdTs = transactionIdTs;
- }
-
- public String getPublisherLogDetails() {
- final StringBuilder buffer = new StringBuilder();
- buffer.append("[publisherId=").append(publisherId);
- buffer.append(", topicId=").append(topicId);
- buffer.append(", messageTimestamp=").append(messageTimestamp);
- buffer.append(", publisherIp=").append(publisherIp);
- buffer.append(", messageBatchId=").append(messageBatchId);
- buffer.append(", messageSequence=").append(messageSequence);
- buffer.append(", messageLengthInBytes=").append(messageLengthInBytes);
- buffer.append(", transactionEnabled=").append(transactionEnabled);
- buffer.append(", transactionId=").append(getTransactionId());
- buffer.append(", publishTimestamp=").append(getPublishTimestamp());
- buffer.append(", serverIp=").append(getServerIp()).append("]");
- return buffer.toString();
- }
-
- public String getServerIp() {
- return serverIp;
- }
-
- public void setServerIp(String serverIp) {
- this.serverIp = serverIp;
- }
-
- public void setMessageBatchId(String messageBatchId) {
- this.messageBatchId = messageBatchId;
- }
-
-}
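To make the transaction-id layout concrete, a small sketch with made-up values; the separator string comes from CambriaConstants.TRANSACTION_ID_SEPARATOR:

    // Hypothetical values, for illustration only.
    LogDetails details = new LogDetails();
    details.setTransactionIdTs("1507837985");  // timestamp token
    details.setPublisherIp("10.0.0.1");
    details.setMessageBatchId("100001");       // String overload shown above
    details.setMessageSequence("1");
    // getTransactionId() joins the four fields with the configured separator:
    // <transactionIdTs><sep><publisherIp><sep><messageBatchId><sep><messageSequence>
    String txId = details.getTransactionId();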
diff --git a/src/main/java/com/att/dmf/mr/beans/TopicBean.java b/src/main/java/com/att/dmf/mr/beans/TopicBean.java
deleted file mode 100644
index a397921..0000000
--- a/src/main/java/com/att/dmf/mr/beans/TopicBean.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.beans;
-
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlRootElement;
-
-/**
- * @author muzainulhaque.qazi
- *
- */
-@XmlRootElement
-public class TopicBean implements Serializable {
-
- private static final long serialVersionUID = -8620390377775457949L;
- private String topicName;
- private String topicDescription;
-
- private int partitionCount;
- private int replicationCount;
-
- private boolean transactionEnabled;
-
- /**
- * constructor
- */
- public TopicBean() {
- super();
- }
-
- /**
- * Constructor initializing the topic details: name, description,
- * partition count, replication count, and transaction flag.
- *
- * @param topicName
- * @param topicDescription
- * @param partitionCount
- * @param replicationCount
- * @param transactionEnabled
- */
- public TopicBean(String topicName, String topicDescription, int partitionCount, int replicationCount,
- boolean transactionEnabled) {
- super();
- this.topicName = topicName;
- this.topicDescription = topicDescription;
- this.partitionCount = partitionCount;
- this.replicationCount = replicationCount;
- this.transactionEnabled = transactionEnabled;
- }
-
- /**
- * @return
- * returns topic name which is of String type
- */
- public String getTopicName() {
- return topicName;
- }
-
- /**
- * @param topicName
- * set topic name
- */
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
-
- /**
- * @return
- * returns partition count which is of int type
- */
- public int getPartitionCount() {
- return partitionCount;
- }
-
- /**
- * @param partitionCount
- * set partition Count
- */
- public void setPartitionCount(int partitionCount) {
- this.partitionCount = partitionCount;
- }
-
- /**
- * @return
- * returns replication count which is of int type
- */
- public int getReplicationCount() {
- return replicationCount;
- }
-
- /**
- * @param replicationCount
- * set replication count which is of int type
- */
- public void setReplicationCount(int replicationCount) {
- this.replicationCount = replicationCount;
- }
-
- /**
- * @return
- * returns boolean value which indicates whether transaction is enabled
- */
- public boolean isTransactionEnabled() {
- return transactionEnabled;
- }
-
- /**
- * @param transactionEnabled
- * sets boolean value which indicates whether transaction is enabled
- */
- public void setTransactionEnabled(boolean transactionEnabled) {
- this.transactionEnabled = transactionEnabled;
- }
-
- /**
- *
- * @return returns description which is of String type
- */
- public String getTopicDescription() {
- return topicDescription;
- }
- /**
- *
- * @param topicDescription
- * set description which is of String type
- */
- public void setTopicDescription(String topicDescription) {
- this.topicDescription = topicDescription;
- }
-
-}
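Finally, a one-line sketch of constructing the bean as a topic-creation request body might; the topic name and counts are made up:

    // name, description, partitions, replicas, transaction flag
    TopicBean bean = new TopicBean("org.onap.example.topic", "example topic", 3, 2, false);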