summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa/cambria/beans
diff options
context:
space:
mode:
authorsunil unnava <su622b@att.com>2018-08-14 09:34:46 -0400
committersunil unnava <su622b@att.com>2018-08-14 09:39:23 -0400
commitb32effcaf5684d5e2f338a4537b71a2375c534e5 (patch)
treee1b80407f414509ffcc766b987ec6a95f7254b4e /src/main/java/com/att/nsa/cambria/beans
parent0823cb186012c8e6b7de3d979dfabb9f838da7c2 (diff)
update the testcases after the kafka 11 changes
Issue-ID: DMAAP-526 Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93 Signed-off-by: sunil unnava <su622b@att.com>
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/beans')
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java88
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java228
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java104
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java323
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java461
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java232
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java139
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java45
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java52
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/LogDetails.java214
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/TopicBean.java155
-rw-r--r--src/main/java/com/att/nsa/cambria/beans/ZkClientFactory.java36
12 files changed, 0 insertions, 2077 deletions
diff --git a/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java b/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java
deleted file mode 100644
index df4a2ed..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlRootElement;
-
-import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
-/**
- *
- * @author author
- *
- */
-@XmlRootElement
-public class ApiKeyBean implements Serializable {
-
- private static final long serialVersionUID = -8219849086890567740L;
-
- private static final String KEY_CHARS = "ABCDEFGHJIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
-
- private String email;
- private String description;
- /**
- * constructor
- */
- public ApiKeyBean() {
- super();
- }
-/**
- *
- * @param email
- * @param description
- */
- public ApiKeyBean(String email, String description) {
- super();
- this.email = email;
- this.description = description;
- }
-
- public String getEmail() {
- return email;
- }
-
- public void setEmail(String email) {
- this.email = email;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-
- public String getKey() {
- return generateKey(16);
- }
-
- public String getSharedSecret() {
- return generateKey(24);
- }
-
- private static String generateKey ( int length ) {
- return uniqueStringGenerator.createKeyUsingAlphabet ( KEY_CHARS, length );
- }
-
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java
deleted file mode 100644
index 4e9fc02..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
-
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.exception.DMaaPResponseCode;
-import com.att.nsa.cambria.exception.ErrorResponse;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.metrics.impl.CdmRateTicker;
-
-/**
- * class provide rate information
- *
- * @author author
- *
- */
-@Component
-public class DMaaPCambriaLimiter {
- /**
- * constructor initializes
- *
- * @param settings
- * @throws missingReqdSetting
- * @throws invalidSettingValue
- */
- @Autowired
- public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
- throws missingReqdSetting, invalidSettingValue {
- fRateInfo = new HashMap<String, RateInfo>();
- fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
- CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
- fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
- CambriaConstants.kDefault_RateLimitWindowLength);
- fSleepMs = settings.getLong(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
- CambriaConstants.kDefault_SleepMsOnRateLimit);
- }
-
- /**
- * static method provide the sleep time
- *
- * @param ratePerMinute
- * @return
- */
- public static long getSleepMsForRate(double ratePerMinute) {
- if (ratePerMinute <= 0.0)
- return 0;
- return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
- }
-
- /**
- * Construct a rate limiter.
- *
- * @param maxEmptyPollsPerMinute
- * Pass <= 0 to deactivate rate limiting.
- * @param windowLengthMins
- */
- public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) {
- this(maxEmptyPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute));
- }
-
- /**
- * Construct a rate limiter
- *
- * @param maxEmptyPollsPerMinute
- * Pass <= 0 to deactivate rate limiting.
- * @param sleepMs
- * @param windowLengthMins
- */
- public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) {
- fRateInfo = new HashMap<String, RateInfo>();
- fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
- fWindowLengthMins = windowLengthMins;
- fSleepMs = Math.max(0, sleepMs);
- }
-
- /**
- * Tell the rate limiter about a call to a topic/group/id. If the rate is
- * too high, this call delays its return and throws an exception.
- *
- * @param topic
- * @param consumerGroup
- * @param clientId
- * @throws CambriaApiException
- */
- public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException {
- // do nothing if rate is configured 0 or less
- if (fMaxEmptyPollsPerMinute <= 0) {
- return;
- }
-
- // setup rate info for this tuple
- final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
-
- final double rate = ri.onCall();
- log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
-
- if (rate > fMaxEmptyPollsPerMinute) {
- try {
- log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
- + ".");
- if (fSleepMs > 0) {
- log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
- + " ms sleep, then responding in error.");
- Thread.sleep(fSleepMs);
- } else {
- log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
- }
- } catch (InterruptedException e) {
- log.error(e.toString());
- Thread.currentThread().interrupt();
- }
- ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
- DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
- "This client is making too many requests. Please use a long poll "
- + "setting to decrease the number of requests that result in empty responses. ");
- log.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- }
-
- /**
- *
- * @param topic
- * @param consumerGroup
- * @param clientId
- * @param sentCount
- */
- public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
- // check for good replies
- if (sentCount > 0) {
- // that was a good send, reset the metric
- getRateInfo(topic, consumerGroup, clientId).reset();
- }
- }
-
- private static class RateInfo {
- /**
- * constructor initialzes
- *
- * @param label
- * @param windowLengthMinutes
- */
- public RateInfo(String label, int windowLengthMinutes) {
- fLabel = label;
- fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
- windowLengthMinutes, TimeUnit.MINUTES);
- }
-
- public String getLabel() {
- return fLabel;
- }
-
- /**
- * CdmRateTicker is reset
- */
- public void reset() {
- fCallRateSinceLastMsgSend.reset();
- }
-
- /**
- *
- * @return
- */
- public double onCall() {
- fCallRateSinceLastMsgSend.tick();
- return fCallRateSinceLastMsgSend.getRate();
- }
-
- private final String fLabel;
- private final CdmRateTicker fCallRateSinceLastMsgSend;
- }
-
- private final HashMap<String, RateInfo> fRateInfo;
- private final double fMaxEmptyPollsPerMinute;
- private final int fWindowLengthMins;
- private final long fSleepMs;
- //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
- private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
- final String key = makeKey(topic, consumerGroup, clientId);
- RateInfo ri = fRateInfo.get(key);
- if (ri == null) {
- ri = new RateInfo(key, fWindowLengthMins);
- fRateInfo.put(key, ri);
- }
- return ri;
- }
-
- private String makeKey(String topic, String group, String id) {
- return topic + "::" + group + "::" + id;
- }
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java
deleted file mode 100644
index 79a8e1f..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.servlet.http.HttpSession;
-
-import com.att.nsa.cambria.utils.ConfigurationReader;
-
-/**
- * DMaaPContext provide and maintain all the configurations , Http request/response
- * Session and consumer Request Time
- * @author author
- *
- */
-public class DMaaPContext {
-
- private ConfigurationReader configReader;
- private HttpServletRequest request;
- private HttpServletResponse response;
- private HttpSession session;
- private String consumerRequestTime;
- static int i=0;
-
- public synchronized static long getBatchID() {
- try{
- final long metricsSendTime = System.currentTimeMillis();
- final Date d = new Date(metricsSendTime);
- final String text = new SimpleDateFormat("ddMMyyyyHHmmss").format(d);
- long dt= Long.valueOf(text)+i;
- i++;
- return dt;
- }
- catch(NumberFormatException ex){
- return 0;
- }
- }
-
- public HttpServletRequest getRequest() {
- return request;
- }
-
- public void setRequest(HttpServletRequest request) {
- this.request = request;
- }
-
- public HttpServletResponse getResponse() {
- return response;
- }
-
- public void setResponse(HttpServletResponse response) {
- this.response = response;
- }
-
- public HttpSession getSession() {
- this.session = request.getSession();
- return session;
- }
-
- public void setSession(HttpSession session) {
- this.session = session;
- }
-
- public ConfigurationReader getConfigReader() {
- return configReader;
- }
-
- public void setConfigReader(ConfigurationReader configReader) {
- this.configReader = configReader;
- }
-
- public String getConsumerRequestTime() {
- return consumerRequestTime;
- }
-
- public void setConsumerRequestTime(String consumerRequestTime) {
- this.consumerRequestTime = consumerRequestTime;
- }
-
-
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java
deleted file mode 100644
index 63f1dd5..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.nsa.cambria.backends.Consumer;
-import com.att.nsa.cambria.backends.ConsumerFactory;
-import com.att.nsa.cambria.backends.MetricsSet;
-import com.att.nsa.cambria.backends.kafka.KafkaConsumer;
-import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache;
-import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.utils.ConfigurationReader;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import kafka.consumer.ConsumerConfig;
-import kafka.javaapi.consumer.ConsumerConnector;
-
-/**
- * @author author
- *
- */
-public class DMaaPKafkaConsumerFactory implements ConsumerFactory {
-
- //private static final Logger log = LoggerFactory .getLogger(DMaaPKafkaConsumerFactory.class);
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class);
- /**
- * constructor initialization
- *
- * @param settings
- * @param metrics
- * @param curator
- * @throws missingReqdSetting
- * @throws KafkaConsumerCacheException
- * @throws UnknownHostException
- */
- public DMaaPKafkaConsumerFactory(
- @Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPMetricsSet") MetricsSet metrics,
- @Qualifier("curator") CuratorFramework curator)
- throws missingReqdSetting, KafkaConsumerCacheException,
- UnknownHostException {
- /*final String apiNodeId = settings.getString(
- CambriaConstants.kSetting_ApiNodeIdentifier,
- InetAddress.getLocalHost().getCanonicalHostName()
- + ":"
- + settings.getInt(CambriaConstants.kSetting_Port,
- CambriaConstants.kDefault_Port));*/
- String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- CambriaConstants.kSetting_ApiNodeIdentifier);
- if (apiNodeId == null){
-
- apiNodeId=InetAddress.getLocalHost().getCanonicalHostName()
- + ":"
- + settings.getInt(CambriaConstants.kSetting_Port,
- CambriaConstants.kDefault_Port);
- }
-
- log.info("This Cambria API Node identifies itself as [" + apiNodeId
- + "].");
- final String mode = CambriaConstants.DMAAP;
- /*fSettings = settings;
- fZooKeeper = fSettings.getString(kSettings_KafkaZookeeper, settings
- .getString(CambriaConstants.kSetting_ZkConfigDbServers,
- CambriaConstants.kDefault_ZkConfigDbServers));*/
-
- String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
- if(null==strkSettings_KafkaZookeeper){
- strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
- if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers;
-
- }
- fZooKeeper= strkSettings_KafkaZookeeper;
-
- //final boolean isCacheEnabled = fSettings.getBoolean(
- // kSetting_EnableCache, kDefault_IsCacheEnabled);
- boolean kSetting_EnableCache= kDefault_IsCacheEnabled;
- String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_EnableCache+"");
- if(null!=strkSetting_EnableCache)
- {
- kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache);
- }
-
- final boolean isCacheEnabled = kSetting_EnableCache;
-
-
- fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId,
- metrics) : null;
- if (fCache != null) {
- fCache.startCache(mode, curator);
- }
- }
-
- @Override
- public Consumer getConsumerFor(String topic, String consumerGroupName,
- String consumerId, int timeoutMs) throws UnavailableException {
- KafkaConsumer kc;
-
- try {
- kc = (fCache != null) ? fCache.getConsumerFor(topic,
- consumerGroupName, consumerId) : null;
- } catch (KafkaConsumerCacheException e) {
- throw new UnavailableException(e);
- }
-
- if (kc == null) {
-
- final InterProcessMutex ipLock = new InterProcessMutex( ConfigurationReader.getCurator(), "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId);
-// final InterProcessMutex fLock = new InterProcessMutex(
-// ConfigurationReader.getCurator(), "/consumerFactory/"
-// + topic + "/" + consumerGroupName + "/"
-// + consumerId);
- boolean locked = false;
- try {
-
- locked = ipLock.acquire(30, TimeUnit.SECONDS);
- if (!locked) {
- // FIXME: this seems to cause trouble in some cases. This exception
- // gets thrown routinely. Possibly a consumer trying multiple servers
- // at once, producing a never-ending cycle of overlapping locks?
- // The problem is that it throws and winds up sending a 503 to the
- // client, which would be incorrect if the client is causing trouble
- // by switching back and forth.
-
- throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")");
- }
-
-// if (!fLock.acquire(30, TimeUnit.SECONDS)) {
-// throw new UnavailableException(
-// "Could not acquire lock in order to create (topic, group, consumer) = "
-// + "(" + topic + ", " + consumerGroupName
-// + ", " + consumerId + ")");
-// }
-
- fCache.signalOwnership(topic, consumerGroupName, consumerId);
-
- log.info("Creating Kafka consumer for group ["
- + consumerGroupName + "], consumer [" + consumerId
- + "], on topic [" + topic + "].");
-
- final String fakeGroupName = consumerGroupName + "--" + topic;
-
- final ConsumerConfig ccc = createConsumerConfig(fakeGroupName,
- consumerId);
- final ConsumerConnector cc = kafka.consumer.Consumer
- .createJavaConsumerConnector(ccc);
- kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc);
-
- if (fCache != null) {
- fCache.putConsumerFor(topic, consumerGroupName, consumerId,
- kc);
- }
- } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) {
- log.error("Exception find at getConsumerFor(String topic, String consumerGroupName,\r\n" +
- " String consumerId, int timeoutMs) : " + x);
- throw new UnavailableException("Couldn't connect to ZK.");
- } catch (KafkaConsumerCacheException e) {
- log.error("Failed to cache consumer (this may have performance implications): "
- + e.getMessage());
- } catch (Exception e) {
- throw new UnavailableException(
- "Error while acquiring consumer factory lock", e);
- } finally {
- if ( locked )
- {
- try {
- ipLock.release();
- } catch (Exception e) {
- throw new UnavailableException("Error while releasing consumer factory lock", e);
- }
- }
- }
- }
-
- return kc;
- }
-
- @Override
- public synchronized void destroyConsumer(String topic,
- String consumerGroup, String clientId) {
- if (fCache != null) {
- fCache.dropConsumer(topic, consumerGroup, clientId);
- }
- }
-
- @Override
- public synchronized Collection<? extends Consumer> getConsumers() {
- return fCache.getConsumers();
- }
-
- @Override
- public synchronized void dropCache() {
- fCache.dropAllConsumers();
- }
-
- private ConsumerConfig createConsumerConfig(String groupId,
- String consumerId) {
- final Properties props = new Properties();
- props.put("zookeeper.connect", fZooKeeper);
- props.put("group.id", groupId);
- props.put("consumer.id", consumerId);
- //props.put("auto.commit.enable", "false");
- // additional settings: start with our defaults, then pull in configured
- // overrides
- props.putAll(KafkaInternalDefaults);
- for (String key : KafkaConsumerKeys) {
- transferSettingIfProvided(props, key, "kafka");
- }
-
- return new ConsumerConfig(props);
- }
-
- //private final rrNvReadable fSettings;
- private final KafkaConsumerCache fCache;
-
- private String fZooKeeper;
-
- private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper";
-
- private static final HashMap<String, String> KafkaInternalDefaults = new HashMap<String, String>();
-
- /**
- * putting values in hashmap like consumer timeout, zookeeper time out, etc
- *
- * @param setting
- */
- public static void populateKafkaInternalDefaultsMap() {
- //@Qualifier("propertyReader") rrNvReadable setting) {
- try {
-
- HashMap<String, String> map1= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop);
-
- KafkaInternalDefaults.put("consumer.timeout.ms",
- // AJSCPropertiesMap.get(CambriaConstants.msgRtr_prop, "consumer.timeout.ms"));
- map1.get( "consumer.timeout.ms"));
-
- KafkaInternalDefaults.put("zookeeper.connection.timeout.ms",
- //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.connection.timeout.ms"));
- map1.get("zookeeper.connection.timeout.ms"));
- KafkaInternalDefaults.put("zookeeper.session.timeout.ms",
- //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.session.timeout.ms"));
- map1.get("zookeeper.session.timeout.ms"));
- KafkaInternalDefaults.put("zookeeper.sync.time.ms",
- // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.sync.time.ms"));
- map1.get( "zookeeper.sync.time.ms"));
- KafkaInternalDefaults.put("auto.commit.interval.ms",
- //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.interval.ms"));
- map1.get( "auto.commit.interval.ms"));
- KafkaInternalDefaults.put("fetch.message.max.bytes",
- //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "fetch.message.max.bytes"));
- map1.get("fetch.message.max.bytes"));
- KafkaInternalDefaults.put("auto.commit.enable",
- // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.enable"));
- map1.get("auto.commit.enable"));
- } catch (Exception e) {
- log.error("Failed to load Kafka Internal Properties.", e);
- }
- }
-
- private static final String KafkaConsumerKeys[] = { "socket.timeout.ms",
- "socket.receive.buffer.bytes", "fetch.message.max.bytes",
- "auto.commit.interval.ms", "queued.max.message.chunks",
- "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes",
- "rebalance.backoff.ms", "refresh.leader.backoff.ms",
- "auto.offset.reset", "consumer.timeout.ms",
- "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms",
- "zookeeper.sync.time.ms" };
-
- private static String makeLongKey(String key, String prefix) {
- return prefix + "." + key;
- }
-
- private void transferSettingIfProvided(Properties target, String key,
- String prefix) {
- String keyVal= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,makeLongKey(key, prefix));
-
- // if (fSettings.hasValueFor(makeLongKey(key, prefix))) {
- if (null!=keyVal) {
- // final String val = fSettings
- // .getString(makeLongKey(key, prefix), "");
- log.info("Setting [" + key + "] to " + keyVal + ".");
- target.put(key, keyVal);
- }
- }
-
- }
-
-
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java
deleted file mode 100644
index e7d777e..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-//import org.apache.log4-j.Logger;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.metabroker.Broker;
-import com.att.nsa.cambria.metabroker.Topic;
-import com.att.nsa.cambria.utils.ConfigurationReader;
-import com.att.nsa.configs.ConfigDb;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.configs.ConfigPath;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.security.NsaAcl;
-import com.att.nsa.security.NsaAclUtils;
-import com.att.nsa.security.NsaApiKey;
-
-import kafka.admin.AdminUtils;
-import kafka.utils.ZKStringSerializer$;
-
-/**
- * class performing all topic operations
- *
- * @author author
- *
- */
-
-public class DMaaPKafkaMetaBroker implements Broker {
-
- //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class);
- private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
-
-
- /**
- * DMaaPKafkaMetaBroker constructor initializing
- *
- * @param settings
- * @param zk
- * @param configDb
- */
- public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
- //fSettings = settings;
- fZk = zk;
- fCambriaConfig = configDb;
- fBaseTopicData = configDb.parse("/topics");
- }
-
- @Override
- public List<Topic> getAllTopics() throws ConfigDbException {
- log.info("Retrieving list of all the topics.");
- final LinkedList<Topic> result = new LinkedList<Topic>();
- try {
- log.info("Retrieving all topics from root: " + zkTopicsRoot);
- final List<String> topics = fZk.getChildren(zkTopicsRoot);
- for (String topic : topics) {
- result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
- }
-
- JSONObject dataObj = new JSONObject();
- dataObj.put("topics", new JSONObject());
-
- for (String topic : topics) {
- dataObj.getJSONObject("topics").put(topic, new JSONObject());
- }
- } catch (ZkNoNodeException excp) {
- // very fresh kafka doesn't have any topics or a topics node
- log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
- }
- return result;
- }
-
- @Override
- public Topic getTopic(String topic) throws ConfigDbException {
- if (fZk.exists(zkTopicsRoot + "/" + topic)) {
- return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
- }
- // else: no such topic in kafka
- return null;
- }
-
- /**
- * static method get KafkaTopic object
- *
- * @param db
- * @param base
- * @param topic
- * @return
- * @throws ConfigDbException
- */
- public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException {
- return new KafkaTopic(topic, db, base);
- }
-
- /**
- * creating topic
- */
- @Override
- public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException {
- log.info("Creating topic: " + topic);
- try {
- log.info("Check if topic [" + topic + "] exist.");
- // first check for existence "our way"
- final Topic t = getTopic(topic);
- if (t != null) {
- log.info("Could not create topic [" + topic + "]. Topic Already exists.");
- throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
- }
- } catch (ConfigDbException e1) {
- log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "Couldn't check topic data in config db.");
- }
-
- // we only allow 3 replicas. (If we don't test this, we get weird
- // results from the cluster,
- // so explicit test and fail.)
- if (replicas < 1 || replicas > 3) {
- log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
- throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
- "The replica count must be between 1 and 3.");
- }
- if (partitions < 1) {
- log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
- throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
- }
-
- // create via kafka
- try {
- ZkClient zkClient = null;
- try {
- log.info("Loading zookeeper client for creating topic.");
- // FIXME: use of this scala module$ thing is a goofy hack to
- // make Kafka aware of the
- // topic creation. (Otherwise, the topic is only partially
- // created in ZK.)
- zkClient = ZkClientFactory.createZkClient();
- log.info("Zookeeper client loaded successfully. Creating topic.");
- AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties());
- } catch (kafka.common.TopicExistsException e) {
- log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e);
- throw new TopicExistsException(topic);
- } catch (ZkNoNodeException e) {
- log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e);
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "The Kafka cluster is not setup.");
- } catch (kafka.admin.AdminOperationException e) {
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(),
- e);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "The Kafka cluster can't handle your request. Talk to the administrators.");
- } finally {
- log.info("Closing zookeeper connection.");
- if (zkClient != null)
- zkClient.close();
- }
-
- log.info("Creating topic entry for topic: " + topic);
- // underlying Kafka topic created. now setup our API info
- return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled);
- } catch (ConfigDbException excp) {
- log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "Failed to create topic data. Talk to the administrators.");
- }
- }
-
- @Override
- public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException {
- log.info("Deleting topic: " + topic);
- ZkClient zkClient = null;
- try {
- log.info("Loading zookeeper client for topic deletion.");
- // FIXME: use of this scala module$ thing is a goofy hack to make
- // Kafka aware of the
- // topic creation. (Otherwise, the topic is only partially created
- // in ZK.)
- zkClient = ZkClientFactory.createZkClient();
-
- log.info("Zookeeper client loaded successfully. Deleting topic.");
- AdminUtils.deleteTopic(zkClient, topic);
- } catch (kafka.common.TopicExistsException e) {
- log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
- throw new TopicExistsException(topic);
- } catch (ZkNoNodeException e) {
- log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup." + e.getMessage(), e);
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup.");
- } catch (kafka.admin.AdminOperationException e) {
- // Kafka throws this when the server isn't running (and perhaps
- // hasn't ever run)
- log.error("The Kafka cluster can't handle your request. Talk to the administrators." + e.getMessage(), e);
- throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "The Kafka cluster can't handle your request. Talk to the administrators.");
- } finally {
- log.info("Closing zookeeper connection.");
- if (zkClient != null)
- zkClient.close();
- }
-
- // throw new UnsupportedOperationException ( "We can't programmatically
- // delete Kafka topics yet." );
- }
-
-
-
- //private final rrNvReadable fSettings;
- private final ZkClient fZk;
- private final ConfigDb fCambriaConfig;
- private final ConfigPath fBaseTopicData;
-
- private static final String zkTopicsRoot = "/brokers/topics";
- private static final JSONObject kEmptyAcl = new JSONObject();
-
- /**
- * method Providing KafkaTopic Object associated with owner and
- * transactionenabled or not
- *
- * @param name
- * @param desc
- * @param owner
- * @param transactionEnabled
- * @return
- * @throws ConfigDbException
- */
- public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
- throws ConfigDbException {
- return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
- }
-
- /**
- * static method giving kafka topic object
- *
- * @param db
- * @param basePath
- * @param name
- * @param desc
- * @param owner
- * @param transactionEnabled
- * @return
- * @throws ConfigDbException
- */
- public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
- boolean transactionEnabled) throws ConfigDbException {
- final JSONObject o = new JSONObject();
- o.put("owner", owner);
- o.put("description", desc);
- o.put("txenabled", transactionEnabled);
- db.store(basePath.getChild(name), o.toString());
- return new KafkaTopic(name, db, basePath);
- }
-
- /**
- * class performing all user opearation like user is eligible to read,
- * write. permitting a user to write and read,
- *
- * @author author
- *
- */
- public static class KafkaTopic implements Topic {
- /**
- * constructor initializes
- *
- * @param name
- * @param configdb
- * @param baseTopic
- * @throws ConfigDbException
- */
- public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
- fName = name;
- fConfigDb = configdb;
- fBaseTopicData = baseTopic;
-
- String data = fConfigDb.load(fBaseTopicData.getChild(fName));
- if (data == null) {
- data = "{}";
- }
-
- final JSONObject o = new JSONObject(data);
- fOwner = o.optString("owner", "");
- fDesc = o.optString("description", "");
- fTransactionEnabled = o.optBoolean("txenabled", false);// default
- // value is
- // false
- // if this topic has an owner, it needs both read/write ACLs. If there's no
- // owner (or it's empty), null is okay -- this is for existing or implicitly
- // created topics.
- JSONObject readers = o.optJSONObject ( "readers" );
- if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl;
- fReaders = fromJson ( readers );
-
- JSONObject writers = o.optJSONObject ( "writers" );
- if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl;
- fWriters = fromJson ( writers );
- }
- private NsaAcl fromJson(JSONObject o) {
- NsaAcl acl = new NsaAcl();
- if (o != null) {
- JSONArray a = o.optJSONArray("allowed");
- if (a != null) {
- for (int i = 0; i < a.length(); ++i) {
- String user = a.getString(i);
- acl.add(user);
- }
- }
- }
- return acl;
- }
- @Override
- public String getName() {
- return fName;
- }
-
- @Override
- public String getOwner() {
- return fOwner;
- }
-
- @Override
- public String getDescription() {
- return fDesc;
- }
-
- @Override
- public NsaAcl getReaderAcl() {
- return fReaders;
- }
-
- @Override
- public NsaAcl getWriterAcl() {
- return fWriters;
- }
-
- @Override
- public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
- NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user );
- }
-
- @Override
- public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
- NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user );
- }
-
- @Override
- public void permitWritesFromUser(String pubId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
- updateAcl(asUser, false, true, pubId);
- }
-
- @Override
- public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException {
- updateAcl(asUser, false, false, pubId);
- }
-
- @Override
- public void permitReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
- updateAcl(asUser, true, true, consumerId);
- }
-
- @Override
- public void denyReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
- updateAcl(asUser, true, false, consumerId);
- }
-
- private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
- throws ConfigDbException, AccessDeniedException{
- try
- {
- final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
- // we have to assume we have current data, or load it again. for the expected use
- // case, assuming we can overwrite the data is fine.
- final JSONObject o = new JSONObject ();
- o.put ( "owner", fOwner );
- o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) );
- o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) );
- fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
-
- log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
- }
- catch ( ConfigDbException x )
- {
- throw x;
- }
- catch ( AccessDeniedException x )
- {
- throw x;
- }
-
- }
-
- private JSONObject safeSerialize(NsaAcl acl) {
- return acl == null ? null : acl.serialize();
- }
-
- private final String fName;
- private final ConfigDb fConfigDb;
- private final ConfigPath fBaseTopicData;
- private final String fOwner;
- private final String fDesc;
- private final NsaAcl fReaders;
- private final NsaAcl fWriters;
- private boolean fTransactionEnabled;
-
- public boolean isTransactionEnabled() {
- return fTransactionEnabled;
- }
-
- @Override
- public Set<String> getOwners() {
- final TreeSet<String> owners = new TreeSet<String> ();
- owners.add ( fOwner );
- return owners;
- }
- }
-
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java
deleted file mode 100644
index 3c3aa6d..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.att.nsa.apiServer.metrics.cambria.DMaaPMetricsSender;
-import com.att.nsa.cambria.CambriaApiVersionInfo;
-import com.att.nsa.cambria.backends.MetricsSet;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.metrics.impl.CdmConstant;
-import com.att.nsa.metrics.impl.CdmCounter;
-import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl;
-import com.att.nsa.metrics.impl.CdmMovingAverage;
-import com.att.nsa.metrics.impl.CdmRateTicker;
-import com.att.nsa.metrics.impl.CdmSimpleMetric;
-import com.att.nsa.metrics.impl.CdmStringConstant;
-import com.att.nsa.metrics.impl.CdmTimeSince;
-
-/*@Component("dMaaPMetricsSet")*/
-/**
- * Metrics related information
- *
- * @author author
- *
- */
-public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet {
-
- private final CdmStringConstant fVersion;
- private final CdmConstant fStartTime;
- private final CdmTimeSince fUpTime;
-
- private final CdmCounter fRecvTotal;
- private final CdmRateTicker fRecvEpsInstant;
- private final CdmRateTicker fRecvEpsShort;
- private final CdmRateTicker fRecvEpsLong;
-
- private final CdmCounter fSendTotal;
- private final CdmRateTicker fSendEpsInstant;
- private final CdmRateTicker fSendEpsShort;
- private final CdmRateTicker fSendEpsLong;
-
- private final CdmCounter fKafkaConsumerCacheMiss;
- private final CdmCounter fKafkaConsumerCacheHit;
-
- private final CdmCounter fKafkaConsumerClaimed;
- private final CdmCounter fKafkaConsumerTimeout;
-
- private final CdmSimpleMetric fFanOutRatio;
-
- private final HashMap<String, CdmRateTicker> fPathUseRates;
- private final HashMap<String, CdmMovingAverage> fPathAvgs;
-
- private rrNvReadable fSettings;
-
- private final ScheduledExecutorService fScheduler;
-
- /**
- * Constructor initialization
- *
- * @param cs
- */
- //public DMaaPMetricsSet() {
- public DMaaPMetricsSet(rrNvReadable cs) {
- //fSettings = cs;
-
- fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion());
- super.putItem("version", fVersion);
-
- final long startTime = System.currentTimeMillis();
- final Date d = new Date(startTime);
- final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d);
- fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text);
- super.putItem("startTime", fStartTime);
-
- fUpTime = new CdmTimeSince("seconds since start");
- super.putItem("upTime", fUpTime);
-
- fRecvTotal = new CdmCounter("Total events received since start");
- super.putItem("recvTotalEvents", fRecvTotal);
-
- fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
- super.putItem("recvEpsInstant", fRecvEpsInstant);
-
- fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
- super.putItem("recvEpsShort", fRecvEpsShort);
-
- fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
- super.putItem("recvEpsLong", fRecvEpsLong);
-
- fSendTotal = new CdmCounter("Total events sent since start");
- super.putItem("sendTotalEvents", fSendTotal);
-
- fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
- super.putItem("sendEpsInstant", fSendEpsInstant);
-
- fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES);
- super.putItem("sendEpsShort", fSendEpsShort);
-
- fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS);
- super.putItem("sendEpsLong", fSendEpsLong);
-
- fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses");
- super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss);
-
- fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits");
- super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit);
-
- fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed");
- super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed);
-
- fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout");
- super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout);
-
- // FIXME: CdmLevel is not exactly a great choice
- fFanOutRatio = new CdmSimpleMetric() {
- @Override
- public String getRawValueString() {
- return getRawValue().toString();
- }
-
- @Override
- public Number getRawValue() {
- final double s = fSendTotal.getValue();
- final double r = fRecvTotal.getValue();
- return r == 0.0 ? 0.0 : s / r;
- }
-
- @Override
- public String summarize() {
- return getRawValueString() + " sends per recv";
- }
-
- };
- super.putItem("fanOut", fFanOutRatio);
-
- // these are added to the metrics catalog as they're discovered
- fPathUseRates = new HashMap<String, CdmRateTicker>();
- fPathAvgs = new HashMap<String, CdmMovingAverage>();
-
- fScheduler = Executors.newScheduledThreadPool(1);
- }
-
- @Override
- public void setupCambriaSender() {
- DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap");
- }
-
- @Override
- public void onRouteComplete(String name, long durationMs) {
- CdmRateTicker ticker = fPathUseRates.get(name);
- if (ticker == null) {
- ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS);
- fPathUseRates.put(name, ticker);
- super.putItem("pathUse_" + name, ticker);
- }
- ticker.tick();
-
- CdmMovingAverage durs = fPathAvgs.get(name);
- if (durs == null) {
- durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES);
- fPathAvgs.put(name, durs);
- super.putItem("pathDurationMs_" + name, durs);
- }
- durs.tick(durationMs);
- }
-
- @Override
- public void publishTick(int amount) {
- if (amount > 0) {
- fRecvTotal.bumpBy(amount);
- fRecvEpsInstant.tick(amount);
- fRecvEpsShort.tick(amount);
- fRecvEpsLong.tick(amount);
- }
- }
-
- @Override
- public void consumeTick(int amount) {
- if (amount > 0) {
- fSendTotal.bumpBy(amount);
- fSendEpsInstant.tick(amount);
- fSendEpsShort.tick(amount);
- fSendEpsLong.tick(amount);
- }
- }
-
- @Override
- public void onKafkaConsumerCacheMiss() {
- fKafkaConsumerCacheMiss.bump();
- }
-
- @Override
- public void onKafkaConsumerCacheHit() {
- fKafkaConsumerCacheHit.bump();
- }
-
- @Override
- public void onKafkaConsumerClaimed() {
- fKafkaConsumerClaimed.bump();
- }
-
- @Override
- public void onKafkaConsumerTimeout() {
- fKafkaConsumerTimeout.bump();
- }
-
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java
deleted file mode 100644
index ce257d4..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import java.security.Key;
-
-//import org.apache.log4-j.Logger;
-import org.springframework.beans.factory.annotation.Autowired;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.configs.ConfigDb;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.configs.confimpl.EncryptingLayer;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.db.BaseNsaApiDbImpl;
-import com.att.nsa.security.db.EncryptingApiDbImpl;
-import com.att.nsa.security.db.NsaApiDb;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
-import com.att.nsa.util.rrConvertor;
-
-/**
- *
- * @author author
- *
- */
-public class DMaaPNsaApiDb {
-
- //private rrNvReadable settings;
- private DMaaPZkConfigDb cdb;
-
- //private static final Logger log = Logger
- // .getLogger(DMaaPNsaApiDb.class.toString());
- private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class);
-
-/**
- *
- * Constructor initialized
- * @param settings
- * @param cdb
- */
- @Autowired
- public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) {
- //this.setSettings(settings);
- this.setCdb(cdb);
- }
- /**
- *
- * @param settings
- * @param cdb
- * @return
- * @throws ConfigDbException
- * @throws missingReqdSetting
- */
- public static NsaApiDb<NsaSimpleApiKey> buildApiKeyDb(
- rrNvReadable settings, ConfigDb cdb) throws ConfigDbException,
- missingReqdSetting {
- // Cambria uses an encrypted api key db
-
- //final String keyBase64 = settings.getString("cambria.secureConfig.key", null);
- final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key");
-
-
- // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null);
- final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv");
- // if neither value was provided, don't encrypt api key db
- if (keyBase64 == null && initVectorBase64 == null) {
- log.info("This server is configured to use an unencrypted API key database. See the settings documentation.");
- return new BaseNsaApiDbImpl<NsaSimpleApiKey>(cdb,
- new NsaSimpleApiKeyFactory());
- } else if (keyBase64 == null) {
- // neither or both, otherwise something's goofed
- throw new missingReqdSetting("cambria.secureConfig.key");
- } else if (initVectorBase64 == null) {
- // neither or both, otherwise something's goofed
- throw new missingReqdSetting("cambria.secureConfig.iv");
- } else {
- log.info("This server is configured to use an encrypted API key database.");
- final Key key = EncryptingLayer.readSecretKey(keyBase64);
- final byte[] iv = rrConvertor.base64Decode(initVectorBase64);
- return new EncryptingApiDbImpl<NsaSimpleApiKey>(cdb,
- new NsaSimpleApiKeyFactory(), key, iv);
- }
- }
-
- /**
- * @return
- * returns settings
- */
-/* public rrNvReadable getSettings() {
- return settings;
- }*/
-
- /**
- * @param settings
- * set settings
- */
- /*public void setSettings(rrNvReadable settings) {
- this.settings = settings;
- }*/
-
- /**
- * @return
- * returns cbd
- */
- public DMaaPZkConfigDb getCdb() {
- return cdb;
- }
- /**
- * @param cdb
- * set cdb
- */
- public void setCdb(DMaaPZkConfigDb cdb) {
- this.cdb = cdb;
- }
-
-
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java
deleted file mode 100644
index 590ecd6..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import org.I0Itec.zkclient.ZkClient;
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.nsa.cambria.utils.ConfigurationReader;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-
-/**
- * Created for Zookeeper client which will read configuration and settings parameter
- * @author author
- *
- */
-public class DMaaPZkClient extends ZkClient {
-
- /**
- * This constructor will get the settings value from rrNvReadable
- * and ConfigurationReader's zookeeper connection
- * @param settings
- */
- public DMaaPZkClient(@Qualifier("propertyReader") rrNvReadable settings) {
- super(ConfigurationReader.getMainZookeeperConnectionString());
- }
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java
deleted file mode 100644
index 8fe96e9..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.nsa.cambria.beans;
-
-import org.springframework.beans.factory.annotation.Qualifier;
-
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.utils.ConfigurationReader;
-import com.att.nsa.configs.confimpl.ZkConfigDb;
-import com.att.nsa.drumlin.till.nv.rrNvReadable;
-//import com.att.nsa.configs.confimpl.ZkConfigDb;
-/**
- * Provide the zookeeper config db connection
- * @author author
- *
- */
-public class DMaaPZkConfigDb extends ZkConfigDb {
- /**
- * This Constructor will provide the configuration details from the property reader
- * and DMaaPZkClient
- * @param zk
- * @param settings
- */
- public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk,
- @Qualifier("propertyReader") rrNvReadable settings) {
-
- //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot));
- super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot());
-
- }
-
-
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/LogDetails.java b/src/main/java/com/att/nsa/cambria/beans/LogDetails.java
deleted file mode 100644
index 5a195e9..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/LogDetails.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-/**
- *
- */
-package com.att.nsa.cambria.beans;
-
-import java.util.Date;
-
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.utils.Utils;
-
-/**
- * @author author
- *
- */
-
-public class LogDetails {
-
- private String publisherId;
- private String topicId;
- private String subscriberGroupId;
- private String subscriberId;
- private String publisherIp;
- private String messageBatchId;
- private String messageSequence;
- private String messageTimestamp;
- private String consumeTimestamp;
- private String transactionIdTs;
- private String serverIp;
-
- private long messageLengthInBytes;
- private long totalMessageCount;
-
- private boolean transactionEnabled;
- /**
- * This is for transaction enabled logging details
- *
- */
- public LogDetails() {
- super();
- }
-
- public String getTransactionId() {
- StringBuilder transactionId = new StringBuilder();
- transactionId.append(transactionIdTs);
- transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
- transactionId.append(publisherIp);
- transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
- transactionId.append(messageBatchId);
- transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR);
- transactionId.append(messageSequence);
-
- return transactionId.toString();
- }
-
- public String getPublisherId() {
- return publisherId;
- }
-
- public void setPublisherId(String publisherId) {
- this.publisherId = publisherId;
- }
-
- public String getTopicId() {
- return topicId;
- }
-
- public void setTopicId(String topicId) {
- this.topicId = topicId;
- }
-
- public String getSubscriberGroupId() {
- return subscriberGroupId;
- }
-
- public void setSubscriberGroupId(String subscriberGroupId) {
- this.subscriberGroupId = subscriberGroupId;
- }
-
- public String getSubscriberId() {
- return subscriberId;
- }
-
- public void setSubscriberId(String subscriberId) {
- this.subscriberId = subscriberId;
- }
-
- public String getPublisherIp() {
- return publisherIp;
- }
-
- public void setPublisherIp(String publisherIp) {
- this.publisherIp = publisherIp;
- }
-
- public String getMessageBatchId() {
- return messageBatchId;
- }
-
- public void setMessageBatchId(Long messageBatchId) {
- this.messageBatchId = Utils.getFromattedBatchSequenceId(messageBatchId);
- }
-
- public String getMessageSequence() {
- return messageSequence;
- }
-
- public void setMessageSequence(String messageSequence) {
- this.messageSequence = messageSequence;
- }
-
- public String getMessageTimestamp() {
- return messageTimestamp;
- }
-
- public void setMessageTimestamp(String messageTimestamp) {
- this.messageTimestamp = messageTimestamp;
- }
-
- public String getPublishTimestamp() {
- return Utils.getFormattedDate(new Date());
- }
-
- public String getConsumeTimestamp() {
- return consumeTimestamp;
- }
-
- public void setConsumeTimestamp(String consumeTimestamp) {
- this.consumeTimestamp = consumeTimestamp;
- }
-
- public long getMessageLengthInBytes() {
- return messageLengthInBytes;
- }
-
- public void setMessageLengthInBytes(long messageLengthInBytes) {
- this.messageLengthInBytes = messageLengthInBytes;
- }
-
- public long getTotalMessageCount() {
- return totalMessageCount;
- }
-
- public void setTotalMessageCount(long totalMessageCount) {
- this.totalMessageCount = totalMessageCount;
- }
-
- public boolean isTransactionEnabled() {
- return transactionEnabled;
- }
-
- public void setTransactionEnabled(boolean transactionEnabled) {
- this.transactionEnabled = transactionEnabled;
- }
-
- public String getTransactionIdTs() {
- return transactionIdTs;
- }
-
- public void setTransactionIdTs(String transactionIdTs) {
- this.transactionIdTs = transactionIdTs;
- }
-
- public String getPublisherLogDetails() {
-
- StringBuilder buffer = new StringBuilder();
- buffer.append("[publisherId=" + publisherId);
- buffer.append(", topicId=" + topicId);
- buffer.append(", messageTimestamp=" + messageTimestamp);
- buffer.append(", publisherIp=" + publisherIp);
- buffer.append(", messageBatchId=" + messageBatchId);
- buffer.append(", messageSequence=" + messageSequence );
- buffer.append(", messageLengthInBytes=" + messageLengthInBytes);
- buffer.append(", transactionEnabled=" + transactionEnabled);
- buffer.append(", transactionId=" + getTransactionId());
- buffer.append(", publishTimestamp=" + getPublishTimestamp());
- buffer.append(", serverIp=" + getServerIp()+"]");
- return buffer.toString();
-
- }
-
- public String getServerIp() {
- return serverIp;
- }
-
- public void setServerIp(String serverIp) {
- this.serverIp = serverIp;
- }
-
- public void setMessageBatchId(String messageBatchId) {
- this.messageBatchId = messageBatchId;
- }
-
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/TopicBean.java b/src/main/java/com/att/nsa/cambria/beans/TopicBean.java
deleted file mode 100644
index 3303c07..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/TopicBean.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-/**
- *
- */
-package com.att.nsa.cambria.beans;
-
-import java.io.Serializable;
-
-import javax.xml.bind.annotation.XmlRootElement;
-
-/**
- * @author author
- *
- */
-@XmlRootElement
-public class TopicBean implements Serializable {
-
- private static final long serialVersionUID = -8620390377775457949L;
- private String topicName;
- private String topicDescription;
-
- private int partitionCount = 1; //default values
- private int replicationCount = 1; //default value
-
- private boolean transactionEnabled;
-
- /**
- * constructor
- */
- public TopicBean() {
- super();
- }
-
- /**
- * constructor initialization with topic details name, description,
- * partition, replication, transaction
- *
- * @param topicName
- * @param description
- * @param partitionCount
- * @param replicationCount
- * @param transactionEnabled
- */
- public TopicBean(String topicName, String topicDescription, int partitionCount, int replicationCount,
- boolean transactionEnabled) {
- super();
- this.topicName = topicName;
- this.topicDescription = topicDescription;
- this.partitionCount = partitionCount;
- this.replicationCount = replicationCount;
- this.transactionEnabled = transactionEnabled;
- }
-
- /**
- * @return
- * returns topic name which is of String type
- */
- public String getTopicName() {
- return topicName;
- }
-
- /**
- * @param topicName
- * set topic name
- */
- public void setTopicName(String topicName) {
- this.topicName = topicName;
- }
-
-
- /**
- * @return
- * returns partition count which is of int type
- */
- public int getPartitionCount() {
- return partitionCount;
- }
-
- /**
- * @param partitionCount
- * set partition Count
- */
- public void setPartitionCount(int partitionCount) {
- this.partitionCount = partitionCount;
- }
-
- /**
- * @return
- * returns replication count which is of int type
- */
- public int getReplicationCount() {
- return replicationCount;
- }
-
- /**
- * @param
- * set replication count which is of int type
- */
- public void setReplicationCount(int replicationCount) {
- this.replicationCount = replicationCount;
- }
-
- /**
- * @return
- * returns boolean value which indicates whether transaction is Enabled
- */
- public boolean isTransactionEnabled() {
- return transactionEnabled;
- }
-
- /**
- * @param
- * sets boolean value which indicates whether transaction is Enabled
- */
- public void setTransactionEnabled(boolean transactionEnabled) {
- this.transactionEnabled = transactionEnabled;
- }
-
- /**
- *
- * @return returns description which is of String type
- */
- public String getTopicDescription() {
- return topicDescription;
- }
- /**
- *
- * @param topicDescription
- * set description which is of String type
- */
- public void setTopicDescription(String topicDescription) {
- this.topicDescription = topicDescription;
- }
-
-}
diff --git a/src/main/java/com/att/nsa/cambria/beans/ZkClientFactory.java b/src/main/java/com/att/nsa/cambria/beans/ZkClientFactory.java
deleted file mode 100644
index 2aedb95..0000000
--- a/src/main/java/com/att/nsa/cambria/beans/ZkClientFactory.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package com.att.nsa.cambria.beans;
-
-import org.I0Itec.zkclient.ZkClient;
-
-import com.att.nsa.cambria.utils.ConfigurationReader;
-
-import kafka.utils.ZKStringSerializer$;
-
-public class ZkClientFactory {
-
- public static ZkClient createZkClient(){
- return new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000,
- ZKStringSerializer$.MODULE$);
-
- }
-
-}