From b32effcaf5684d5e2f338a4537b71a2375c534e5 Mon Sep 17 00:00:00 2001 From: sunil unnava Date: Tue, 14 Aug 2018 09:34:46 -0400 Subject: update the testcases after the kafka 11 changes Issue-ID: DMAAP-526 Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93 Signed-off-by: sunil unnava --- .../java/com/att/nsa/cambria/beans/ApiKeyBean.java | 88 ---- .../att/nsa/cambria/beans/DMaaPCambriaLimiter.java | 228 ---------- .../com/att/nsa/cambria/beans/DMaaPContext.java | 104 ----- .../cambria/beans/DMaaPKafkaConsumerFactory.java | 323 --------------- .../nsa/cambria/beans/DMaaPKafkaMetaBroker.java | 461 --------------------- .../com/att/nsa/cambria/beans/DMaaPMetricsSet.java | 232 ----------- .../com/att/nsa/cambria/beans/DMaaPNsaApiDb.java | 139 ------- .../com/att/nsa/cambria/beans/DMaaPZkClient.java | 45 -- .../com/att/nsa/cambria/beans/DMaaPZkConfigDb.java | 52 --- .../java/com/att/nsa/cambria/beans/LogDetails.java | 214 ---------- .../java/com/att/nsa/cambria/beans/TopicBean.java | 155 ------- .../com/att/nsa/cambria/beans/ZkClientFactory.java | 36 -- 12 files changed, 2077 deletions(-) delete mode 100644 src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/LogDetails.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/TopicBean.java delete mode 100644 src/main/java/com/att/nsa/cambria/beans/ZkClientFactory.java (limited to 'src/main/java/com/att/nsa/cambria/beans') diff --git a/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java b/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java deleted file mode 100644 index df4a2ed..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/ApiKeyBean.java +++ /dev/null @@ -1,88 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import java.io.Serializable; - -import javax.xml.bind.annotation.XmlRootElement; - -import com.att.nsa.drumlin.till.data.uniqueStringGenerator; -/** - * - * @author author - * - */ -@XmlRootElement -public class ApiKeyBean implements Serializable { - - private static final long serialVersionUID = -8219849086890567740L; - - private static final String KEY_CHARS = "ABCDEFGHJIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; - - private String email; - private String description; - /** - * constructor - */ - public ApiKeyBean() { - super(); - } -/** - * - * @param email - * @param description - */ - public ApiKeyBean(String email, String description) { - super(); - this.email = email; - this.description = description; - } - - public String getEmail() { - return email; - } - - public void setEmail(String email) { - this.email = email; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - public String getKey() { - return generateKey(16); - } - - public String getSharedSecret() { - return generateKey(24); - } - - private static String generateKey ( int length ) { - return uniqueStringGenerator.createKeyUsingAlphabet ( KEY_CHARS, length ); - } - -} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java deleted file mode 100644 index 4e9fc02..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java +++ /dev/null @@ -1,228 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import java.util.HashMap; -import java.util.concurrent.TimeUnit; - -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Component; - -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.exception.DMaaPResponseCode; -import com.att.nsa.cambria.exception.ErrorResponse; -import com.att.nsa.drumlin.service.standards.HttpStatusCodes; -import com.att.nsa.drumlin.till.nv.rrNvReadable; -import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue; -import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import com.att.nsa.metrics.impl.CdmRateTicker; - -/** - * class provide rate information - * - * @author author - * - */ -@Component -public class DMaaPCambriaLimiter { - /** - * constructor initializes - * - * @param settings - * @throws missingReqdSetting - * @throws invalidSettingValue - */ - @Autowired - public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) - throws missingReqdSetting, invalidSettingValue { - fRateInfo = new HashMap(); - fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, - CambriaConstants.kDefault_MaxEmptyPollsPerMinute); - fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength, - CambriaConstants.kDefault_RateLimitWindowLength); - fSleepMs = settings.getLong(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, - CambriaConstants.kDefault_SleepMsOnRateLimit); - } - - /** - * static method provide the sleep time - * - * @param ratePerMinute - * @return - */ - public static long getSleepMsForRate(double ratePerMinute) { - if (ratePerMinute <= 0.0) - return 0; - return Math.max(1000, Math.round(60 * 1000 / ratePerMinute)); - } - - /** - * Construct a rate limiter. - * - * @param maxEmptyPollsPerMinute - * Pass <= 0 to deactivate rate limiting. - * @param windowLengthMins - */ - public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) { - this(maxEmptyPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute)); - } - - /** - * Construct a rate limiter - * - * @param maxEmptyPollsPerMinute - * Pass <= 0 to deactivate rate limiting. - * @param sleepMs - * @param windowLengthMins - */ - public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) { - fRateInfo = new HashMap(); - fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); - fWindowLengthMins = windowLengthMins; - fSleepMs = Math.max(0, sleepMs); - } - - /** - * Tell the rate limiter about a call to a topic/group/id. If the rate is - * too high, this call delays its return and throws an exception. - * - * @param topic - * @param consumerGroup - * @param clientId - * @throws CambriaApiException - */ - public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException { - // do nothing if rate is configured 0 or less - if (fMaxEmptyPollsPerMinute <= 0) { - return; - } - - // setup rate info for this tuple - final RateInfo ri = getRateInfo(topic, consumerGroup, clientId); - - final double rate = ri.onCall(); - log.info(ri.getLabel() + ": " + rate + " empty replies/minute."); - - if (rate > fMaxEmptyPollsPerMinute) { - try { - log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute - + "."); - if (fSleepMs > 0) { - log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs - + " ms sleep, then responding in error."); - Thread.sleep(fSleepMs); - } else { - log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); - } - } catch (InterruptedException e) { - log.error(e.toString()); - Thread.currentThread().interrupt(); - } - ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, - DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), - "This client is making too many requests. Please use a long poll " - + "setting to decrease the number of requests that result in empty responses. "); - log.info(errRes.toString()); - throw new CambriaApiException(errRes); - } - } - - /** - * - * @param topic - * @param consumerGroup - * @param clientId - * @param sentCount - */ - public void onSend(String topic, String consumerGroup, String clientId, long sentCount) { - // check for good replies - if (sentCount > 0) { - // that was a good send, reset the metric - getRateInfo(topic, consumerGroup, clientId).reset(); - } - } - - private static class RateInfo { - /** - * constructor initialzes - * - * @param label - * @param windowLengthMinutes - */ - public RateInfo(String label, int windowLengthMinutes) { - fLabel = label; - fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES, - windowLengthMinutes, TimeUnit.MINUTES); - } - - public String getLabel() { - return fLabel; - } - - /** - * CdmRateTicker is reset - */ - public void reset() { - fCallRateSinceLastMsgSend.reset(); - } - - /** - * - * @return - */ - public double onCall() { - fCallRateSinceLastMsgSend.tick(); - return fCallRateSinceLastMsgSend.getRate(); - } - - private final String fLabel; - private final CdmRateTicker fCallRateSinceLastMsgSend; - } - - private final HashMap fRateInfo; - private final double fMaxEmptyPollsPerMinute; - private final int fWindowLengthMins; - private final long fSleepMs; - //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); - private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) { - final String key = makeKey(topic, consumerGroup, clientId); - RateInfo ri = fRateInfo.get(key); - if (ri == null) { - ri = new RateInfo(key, fWindowLengthMins); - fRateInfo.put(key, ri); - } - return ri; - } - - private String makeKey(String topic, String group, String id) { - return topic + "::" + group + "::" + id; - } -} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java deleted file mode 100644 index 79a8e1f..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPContext.java +++ /dev/null @@ -1,104 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import java.text.SimpleDateFormat; -import java.util.Date; - -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import javax.servlet.http.HttpSession; - -import com.att.nsa.cambria.utils.ConfigurationReader; - -/** - * DMaaPContext provide and maintain all the configurations , Http request/response - * Session and consumer Request Time - * @author author - * - */ -public class DMaaPContext { - - private ConfigurationReader configReader; - private HttpServletRequest request; - private HttpServletResponse response; - private HttpSession session; - private String consumerRequestTime; - static int i=0; - - public synchronized static long getBatchID() { - try{ - final long metricsSendTime = System.currentTimeMillis(); - final Date d = new Date(metricsSendTime); - final String text = new SimpleDateFormat("ddMMyyyyHHmmss").format(d); - long dt= Long.valueOf(text)+i; - i++; - return dt; - } - catch(NumberFormatException ex){ - return 0; - } - } - - public HttpServletRequest getRequest() { - return request; - } - - public void setRequest(HttpServletRequest request) { - this.request = request; - } - - public HttpServletResponse getResponse() { - return response; - } - - public void setResponse(HttpServletResponse response) { - this.response = response; - } - - public HttpSession getSession() { - this.session = request.getSession(); - return session; - } - - public void setSession(HttpSession session) { - this.session = session; - } - - public ConfigurationReader getConfigReader() { - return configReader; - } - - public void setConfigReader(ConfigurationReader configReader) { - this.configReader = configReader; - } - - public String getConsumerRequestTime() { - return consumerRequestTime; - } - - public void setConsumerRequestTime(String consumerRequestTime) { - this.consumerRequestTime = consumerRequestTime; - } - - -} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java deleted file mode 100644 index 63f1dd5..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaConsumerFactory.java +++ /dev/null @@ -1,323 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import org.springframework.beans.factory.annotation.Qualifier; - -import com.att.nsa.cambria.backends.Consumer; -import com.att.nsa.cambria.backends.ConsumerFactory; -import com.att.nsa.cambria.backends.MetricsSet; -import com.att.nsa.cambria.backends.kafka.KafkaConsumer; -import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache; -import com.att.nsa.cambria.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.utils.ConfigurationReader; -import com.att.nsa.drumlin.till.nv.rrNvReadable; -import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import kafka.consumer.ConsumerConfig; -import kafka.javaapi.consumer.ConsumerConnector; - -/** - * @author author - * - */ -public class DMaaPKafkaConsumerFactory implements ConsumerFactory { - - //private static final Logger log = LoggerFactory .getLogger(DMaaPKafkaConsumerFactory.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPKafkaConsumerFactory.class); - /** - * constructor initialization - * - * @param settings - * @param metrics - * @param curator - * @throws missingReqdSetting - * @throws KafkaConsumerCacheException - * @throws UnknownHostException - */ - public DMaaPKafkaConsumerFactory( - @Qualifier("propertyReader") rrNvReadable settings, - @Qualifier("dMaaPMetricsSet") MetricsSet metrics, - @Qualifier("curator") CuratorFramework curator) - throws missingReqdSetting, KafkaConsumerCacheException, - UnknownHostException { - /*final String apiNodeId = settings.getString( - CambriaConstants.kSetting_ApiNodeIdentifier, - InetAddress.getLocalHost().getCanonicalHostName() - + ":" - + settings.getInt(CambriaConstants.kSetting_Port, - CambriaConstants.kDefault_Port));*/ - String apiNodeId = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - CambriaConstants.kSetting_ApiNodeIdentifier); - if (apiNodeId == null){ - - apiNodeId=InetAddress.getLocalHost().getCanonicalHostName() - + ":" - + settings.getInt(CambriaConstants.kSetting_Port, - CambriaConstants.kDefault_Port); - } - - log.info("This Cambria API Node identifies itself as [" + apiNodeId - + "]."); - final String mode = CambriaConstants.DMAAP; - /*fSettings = settings; - fZooKeeper = fSettings.getString(kSettings_KafkaZookeeper, settings - .getString(CambriaConstants.kSetting_ZkConfigDbServers, - CambriaConstants.kDefault_ZkConfigDbServers));*/ - - String strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); - if(null==strkSettings_KafkaZookeeper){ - strkSettings_KafkaZookeeper = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers); - if (null==strkSettings_KafkaZookeeper) strkSettings_KafkaZookeeper = CambriaConstants.kDefault_ZkConfigDbServers; - - } - fZooKeeper= strkSettings_KafkaZookeeper; - - //final boolean isCacheEnabled = fSettings.getBoolean( - // kSetting_EnableCache, kDefault_IsCacheEnabled); - boolean kSetting_EnableCache= kDefault_IsCacheEnabled; - String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSetting_EnableCache+""); - if(null!=strkSetting_EnableCache) - { - kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache); - } - - final boolean isCacheEnabled = kSetting_EnableCache; - - - fCache = (isCacheEnabled) ? new KafkaConsumerCache(apiNodeId, - metrics) : null; - if (fCache != null) { - fCache.startCache(mode, curator); - } - } - - @Override - public Consumer getConsumerFor(String topic, String consumerGroupName, - String consumerId, int timeoutMs) throws UnavailableException { - KafkaConsumer kc; - - try { - kc = (fCache != null) ? fCache.getConsumerFor(topic, - consumerGroupName, consumerId) : null; - } catch (KafkaConsumerCacheException e) { - throw new UnavailableException(e); - } - - if (kc == null) { - - final InterProcessMutex ipLock = new InterProcessMutex( ConfigurationReader.getCurator(), "/consumerFactory/" + topic + "/" + consumerGroupName + "/" + consumerId); -// final InterProcessMutex fLock = new InterProcessMutex( -// ConfigurationReader.getCurator(), "/consumerFactory/" -// + topic + "/" + consumerGroupName + "/" -// + consumerId); - boolean locked = false; - try { - - locked = ipLock.acquire(30, TimeUnit.SECONDS); - if (!locked) { - // FIXME: this seems to cause trouble in some cases. This exception - // gets thrown routinely. Possibly a consumer trying multiple servers - // at once, producing a never-ending cycle of overlapping locks? - // The problem is that it throws and winds up sending a 503 to the - // client, which would be incorrect if the client is causing trouble - // by switching back and forth. - - throw new UnavailableException("Could not acquire lock in order to create (topic, group, consumer) = " + "(" + topic + ", " + consumerGroupName + ", " + consumerId + ")"); - } - -// if (!fLock.acquire(30, TimeUnit.SECONDS)) { -// throw new UnavailableException( -// "Could not acquire lock in order to create (topic, group, consumer) = " -// + "(" + topic + ", " + consumerGroupName -// + ", " + consumerId + ")"); -// } - - fCache.signalOwnership(topic, consumerGroupName, consumerId); - - log.info("Creating Kafka consumer for group [" - + consumerGroupName + "], consumer [" + consumerId - + "], on topic [" + topic + "]."); - - final String fakeGroupName = consumerGroupName + "--" + topic; - - final ConsumerConfig ccc = createConsumerConfig(fakeGroupName, - consumerId); - final ConsumerConnector cc = kafka.consumer.Consumer - .createJavaConsumerConnector(ccc); - kc = new KafkaConsumer(topic, consumerGroupName, consumerId, cc); - - if (fCache != null) { - fCache.putConsumerFor(topic, consumerGroupName, consumerId, - kc); - } - } catch (org.I0Itec.zkclient.exception.ZkTimeoutException x) { - log.error("Exception find at getConsumerFor(String topic, String consumerGroupName,\r\n" + - " String consumerId, int timeoutMs) : " + x); - throw new UnavailableException("Couldn't connect to ZK."); - } catch (KafkaConsumerCacheException e) { - log.error("Failed to cache consumer (this may have performance implications): " - + e.getMessage()); - } catch (Exception e) { - throw new UnavailableException( - "Error while acquiring consumer factory lock", e); - } finally { - if ( locked ) - { - try { - ipLock.release(); - } catch (Exception e) { - throw new UnavailableException("Error while releasing consumer factory lock", e); - } - } - } - } - - return kc; - } - - @Override - public synchronized void destroyConsumer(String topic, - String consumerGroup, String clientId) { - if (fCache != null) { - fCache.dropConsumer(topic, consumerGroup, clientId); - } - } - - @Override - public synchronized Collection getConsumers() { - return fCache.getConsumers(); - } - - @Override - public synchronized void dropCache() { - fCache.dropAllConsumers(); - } - - private ConsumerConfig createConsumerConfig(String groupId, - String consumerId) { - final Properties props = new Properties(); - props.put("zookeeper.connect", fZooKeeper); - props.put("group.id", groupId); - props.put("consumer.id", consumerId); - //props.put("auto.commit.enable", "false"); - // additional settings: start with our defaults, then pull in configured - // overrides - props.putAll(KafkaInternalDefaults); - for (String key : KafkaConsumerKeys) { - transferSettingIfProvided(props, key, "kafka"); - } - - return new ConsumerConfig(props); - } - - //private final rrNvReadable fSettings; - private final KafkaConsumerCache fCache; - - private String fZooKeeper; - - private static final String kSettings_KafkaZookeeper = "kafka.client.zookeeper"; - - private static final HashMap KafkaInternalDefaults = new HashMap(); - - /** - * putting values in hashmap like consumer timeout, zookeeper time out, etc - * - * @param setting - */ - public static void populateKafkaInternalDefaultsMap() { - //@Qualifier("propertyReader") rrNvReadable setting) { - try { - - HashMap map1= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperties(CambriaConstants.msgRtr_prop); - - KafkaInternalDefaults.put("consumer.timeout.ms", - // AJSCPropertiesMap.get(CambriaConstants.msgRtr_prop, "consumer.timeout.ms")); - map1.get( "consumer.timeout.ms")); - - KafkaInternalDefaults.put("zookeeper.connection.timeout.ms", - //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.connection.timeout.ms")); - map1.get("zookeeper.connection.timeout.ms")); - KafkaInternalDefaults.put("zookeeper.session.timeout.ms", - //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.session.timeout.ms")); - map1.get("zookeeper.session.timeout.ms")); - KafkaInternalDefaults.put("zookeeper.sync.time.ms", - // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "zookeeper.sync.time.ms")); - map1.get( "zookeeper.sync.time.ms")); - KafkaInternalDefaults.put("auto.commit.interval.ms", - //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.interval.ms")); - map1.get( "auto.commit.interval.ms")); - KafkaInternalDefaults.put("fetch.message.max.bytes", - //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "fetch.message.max.bytes")); - map1.get("fetch.message.max.bytes")); - KafkaInternalDefaults.put("auto.commit.enable", - // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "auto.commit.enable")); - map1.get("auto.commit.enable")); - } catch (Exception e) { - log.error("Failed to load Kafka Internal Properties.", e); - } - } - - private static final String KafkaConsumerKeys[] = { "socket.timeout.ms", - "socket.receive.buffer.bytes", "fetch.message.max.bytes", - "auto.commit.interval.ms", "queued.max.message.chunks", - "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.bytes", - "rebalance.backoff.ms", "refresh.leader.backoff.ms", - "auto.offset.reset", "consumer.timeout.ms", - "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms", - "zookeeper.sync.time.ms" }; - - private static String makeLongKey(String key, String prefix) { - return prefix + "." + key; - } - - private void transferSettingIfProvided(Properties target, String key, - String prefix) { - String keyVal= AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,makeLongKey(key, prefix)); - - // if (fSettings.hasValueFor(makeLongKey(key, prefix))) { - if (null!=keyVal) { - // final String val = fSettings - // .getString(makeLongKey(key, prefix), ""); - log.info("Setting [" + key + "] to " + keyVal + "."); - target.put(key, keyVal); - } - } - - } - - diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java deleted file mode 100644 index e7d777e..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPKafkaMetaBroker.java +++ /dev/null @@ -1,461 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.TreeSet; - -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.exception.ZkNoNodeException; -//import org.apache.log4-j.Logger; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - -import org.json.JSONArray; -import org.json.JSONObject; -import org.springframework.beans.factory.annotation.Qualifier; - -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.metabroker.Broker; -import com.att.nsa.cambria.metabroker.Topic; -import com.att.nsa.cambria.utils.ConfigurationReader; -import com.att.nsa.configs.ConfigDb; -import com.att.nsa.configs.ConfigDbException; -import com.att.nsa.configs.ConfigPath; -import com.att.nsa.drumlin.service.standards.HttpStatusCodes; -import com.att.nsa.drumlin.till.nv.rrNvReadable; -import com.att.nsa.security.NsaAcl; -import com.att.nsa.security.NsaAclUtils; -import com.att.nsa.security.NsaApiKey; - -import kafka.admin.AdminUtils; -import kafka.utils.ZKStringSerializer$; - -/** - * class performing all topic operations - * - * @author author - * - */ - -public class DMaaPKafkaMetaBroker implements Broker { - - //private static final Logger log = Logger.getLogger(DMaaPKafkaMetaBroker.class); - private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); - - - /** - * DMaaPKafkaMetaBroker constructor initializing - * - * @param settings - * @param zk - * @param configDb - */ - public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings, - @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) { - //fSettings = settings; - fZk = zk; - fCambriaConfig = configDb; - fBaseTopicData = configDb.parse("/topics"); - } - - @Override - public List getAllTopics() throws ConfigDbException { - log.info("Retrieving list of all the topics."); - final LinkedList result = new LinkedList(); - try { - log.info("Retrieving all topics from root: " + zkTopicsRoot); - final List topics = fZk.getChildren(zkTopicsRoot); - for (String topic : topics) { - result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData)); - } - - JSONObject dataObj = new JSONObject(); - dataObj.put("topics", new JSONObject()); - - for (String topic : topics) { - dataObj.getJSONObject("topics").put(topic, new JSONObject()); - } - } catch (ZkNoNodeException excp) { - // very fresh kafka doesn't have any topics or a topics node - log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp); - } - return result; - } - - @Override - public Topic getTopic(String topic) throws ConfigDbException { - if (fZk.exists(zkTopicsRoot + "/" + topic)) { - return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic); - } - // else: no such topic in kafka - return null; - } - - /** - * static method get KafkaTopic object - * - * @param db - * @param base - * @param topic - * @return - * @throws ConfigDbException - */ - public static KafkaTopic getKafkaTopicConfig(ConfigDb db, ConfigPath base, String topic) throws ConfigDbException { - return new KafkaTopic(topic, db, base); - } - - /** - * creating topic - */ - @Override - public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas, - boolean transactionEnabled) throws TopicExistsException, CambriaApiException { - log.info("Creating topic: " + topic); - try { - log.info("Check if topic [" + topic + "] exist."); - // first check for existence "our way" - final Topic t = getTopic(topic); - if (t != null) { - log.info("Could not create topic [" + topic + "]. Topic Already exists."); - throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists."); - } - } catch (ConfigDbException e1) { - log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1); - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "Couldn't check topic data in config db."); - } - - // we only allow 3 replicas. (If we don't test this, we get weird - // results from the cluster, - // so explicit test and fail.) - if (replicas < 1 || replicas > 3) { - log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3."); - throw new CambriaApiException(HttpStatusCodes.k400_badRequest, - "The replica count must be between 1 and 3."); - } - if (partitions < 1) { - log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1."); - throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1."); - } - - // create via kafka - try { - ZkClient zkClient = null; - try { - log.info("Loading zookeeper client for creating topic."); - // FIXME: use of this scala module$ thing is a goofy hack to - // make Kafka aware of the - // topic creation. (Otherwise, the topic is only partially - // created in ZK.) - zkClient = ZkClientFactory.createZkClient(); - log.info("Zookeeper client loaded successfully. Creating topic."); - AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties()); - } catch (kafka.common.TopicExistsException e) { - log.error("Topic [" + topic + "] could not be created. " + e.getMessage(), e); - throw new TopicExistsException(topic); - } catch (ZkNoNodeException e) { - log.error("Topic [" + topic + "] could not be created. The Kafka cluster is not setup.", e); - // Kafka throws this when the server isn't running (and perhaps - // hasn't ever run) - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "The Kafka cluster is not setup."); - } catch (kafka.admin.AdminOperationException e) { - // Kafka throws this when the server isn't running (and perhaps - // hasn't ever run) - log.error("The Kafka cluster can't handle your request. Talk to the administrators: " + e.getMessage(), - e); - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "The Kafka cluster can't handle your request. Talk to the administrators."); - } finally { - log.info("Closing zookeeper connection."); - if (zkClient != null) - zkClient.close(); - } - - log.info("Creating topic entry for topic: " + topic); - // underlying Kafka topic created. now setup our API info - return createTopicEntry(topic, desc, ownerApiKey, transactionEnabled); - } catch (ConfigDbException excp) { - log.error("Failed to create topic data. Talk to the administrators: " + excp.getMessage(), excp); - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "Failed to create topic data. Talk to the administrators."); - } - } - - @Override - public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException { - log.info("Deleting topic: " + topic); - ZkClient zkClient = null; - try { - log.info("Loading zookeeper client for topic deletion."); - // FIXME: use of this scala module$ thing is a goofy hack to make - // Kafka aware of the - // topic creation. (Otherwise, the topic is only partially created - // in ZK.) - zkClient = ZkClientFactory.createZkClient(); - - log.info("Zookeeper client loaded successfully. Deleting topic."); - AdminUtils.deleteTopic(zkClient, topic); - } catch (kafka.common.TopicExistsException e) { - log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e); - throw new TopicExistsException(topic); - } catch (ZkNoNodeException e) { - log.error("Failed to delete topic [" + topic + "]. The Kafka cluster is not setup." + e.getMessage(), e); - // Kafka throws this when the server isn't running (and perhaps - // hasn't ever run) - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, "The Kafka cluster is not setup."); - } catch (kafka.admin.AdminOperationException e) { - // Kafka throws this when the server isn't running (and perhaps - // hasn't ever run) - log.error("The Kafka cluster can't handle your request. Talk to the administrators." + e.getMessage(), e); - throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable, - "The Kafka cluster can't handle your request. Talk to the administrators."); - } finally { - log.info("Closing zookeeper connection."); - if (zkClient != null) - zkClient.close(); - } - - // throw new UnsupportedOperationException ( "We can't programmatically - // delete Kafka topics yet." ); - } - - - - //private final rrNvReadable fSettings; - private final ZkClient fZk; - private final ConfigDb fCambriaConfig; - private final ConfigPath fBaseTopicData; - - private static final String zkTopicsRoot = "/brokers/topics"; - private static final JSONObject kEmptyAcl = new JSONObject(); - - /** - * method Providing KafkaTopic Object associated with owner and - * transactionenabled or not - * - * @param name - * @param desc - * @param owner - * @param transactionEnabled - * @return - * @throws ConfigDbException - */ - public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled) - throws ConfigDbException { - return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled); - } - - /** - * static method giving kafka topic object - * - * @param db - * @param basePath - * @param name - * @param desc - * @param owner - * @param transactionEnabled - * @return - * @throws ConfigDbException - */ - public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner, - boolean transactionEnabled) throws ConfigDbException { - final JSONObject o = new JSONObject(); - o.put("owner", owner); - o.put("description", desc); - o.put("txenabled", transactionEnabled); - db.store(basePath.getChild(name), o.toString()); - return new KafkaTopic(name, db, basePath); - } - - /** - * class performing all user opearation like user is eligible to read, - * write. permitting a user to write and read, - * - * @author author - * - */ - public static class KafkaTopic implements Topic { - /** - * constructor initializes - * - * @param name - * @param configdb - * @param baseTopic - * @throws ConfigDbException - */ - public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException { - fName = name; - fConfigDb = configdb; - fBaseTopicData = baseTopic; - - String data = fConfigDb.load(fBaseTopicData.getChild(fName)); - if (data == null) { - data = "{}"; - } - - final JSONObject o = new JSONObject(data); - fOwner = o.optString("owner", ""); - fDesc = o.optString("description", ""); - fTransactionEnabled = o.optBoolean("txenabled", false);// default - // value is - // false - // if this topic has an owner, it needs both read/write ACLs. If there's no - // owner (or it's empty), null is okay -- this is for existing or implicitly - // created topics. - JSONObject readers = o.optJSONObject ( "readers" ); - if ( readers == null && fOwner.length () > 0 ) readers = kEmptyAcl; - fReaders = fromJson ( readers ); - - JSONObject writers = o.optJSONObject ( "writers" ); - if ( writers == null && fOwner.length () > 0 ) writers = kEmptyAcl; - fWriters = fromJson ( writers ); - } - private NsaAcl fromJson(JSONObject o) { - NsaAcl acl = new NsaAcl(); - if (o != null) { - JSONArray a = o.optJSONArray("allowed"); - if (a != null) { - for (int i = 0; i < a.length(); ++i) { - String user = a.getString(i); - acl.add(user); - } - } - } - return acl; - } - @Override - public String getName() { - return fName; - } - - @Override - public String getOwner() { - return fOwner; - } - - @Override - public String getDescription() { - return fDesc; - } - - @Override - public NsaAcl getReaderAcl() { - return fReaders; - } - - @Override - public NsaAcl getWriterAcl() { - return fWriters; - } - - @Override - public void checkUserRead(NsaApiKey user) throws AccessDeniedException { - NsaAclUtils.checkUserAccess ( fOwner, getReaderAcl(), user ); - } - - @Override - public void checkUserWrite(NsaApiKey user) throws AccessDeniedException { - NsaAclUtils.checkUserAccess ( fOwner, getWriterAcl(), user ); - } - - @Override - public void permitWritesFromUser(String pubId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { - updateAcl(asUser, false, true, pubId); - } - - @Override - public void denyWritesFromUser(String pubId, NsaApiKey asUser) throws ConfigDbException, AccessDeniedException { - updateAcl(asUser, false, false, pubId); - } - - @Override - public void permitReadsByUser(String consumerId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { - updateAcl(asUser, true, true, consumerId); - } - - @Override - public void denyReadsByUser(String consumerId, NsaApiKey asUser) - throws ConfigDbException, AccessDeniedException { - updateAcl(asUser, true, false, consumerId); - } - - private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key) - throws ConfigDbException, AccessDeniedException{ - try - { - final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add ); - - // we have to assume we have current data, or load it again. for the expected use - // case, assuming we can overwrite the data is fine. - final JSONObject o = new JSONObject (); - o.put ( "owner", fOwner ); - o.put ( "readers", safeSerialize ( reader ? acl : fReaders ) ); - o.put ( "writers", safeSerialize ( reader ? fWriters : acl ) ); - fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () ); - - log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName ); - - } - catch ( ConfigDbException x ) - { - throw x; - } - catch ( AccessDeniedException x ) - { - throw x; - } - - } - - private JSONObject safeSerialize(NsaAcl acl) { - return acl == null ? null : acl.serialize(); - } - - private final String fName; - private final ConfigDb fConfigDb; - private final ConfigPath fBaseTopicData; - private final String fOwner; - private final String fDesc; - private final NsaAcl fReaders; - private final NsaAcl fWriters; - private boolean fTransactionEnabled; - - public boolean isTransactionEnabled() { - return fTransactionEnabled; - } - - @Override - public Set getOwners() { - final TreeSet owners = new TreeSet (); - owners.add ( fOwner ); - return owners; - } - } - -} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java deleted file mode 100644 index 3c3aa6d..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPMetricsSet.java +++ /dev/null @@ -1,232 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import com.att.nsa.apiServer.metrics.cambria.DMaaPMetricsSender; -import com.att.nsa.cambria.CambriaApiVersionInfo; -import com.att.nsa.cambria.backends.MetricsSet; -import com.att.nsa.drumlin.till.nv.rrNvReadable; -import com.att.nsa.metrics.impl.CdmConstant; -import com.att.nsa.metrics.impl.CdmCounter; -import com.att.nsa.metrics.impl.CdmMetricsRegistryImpl; -import com.att.nsa.metrics.impl.CdmMovingAverage; -import com.att.nsa.metrics.impl.CdmRateTicker; -import com.att.nsa.metrics.impl.CdmSimpleMetric; -import com.att.nsa.metrics.impl.CdmStringConstant; -import com.att.nsa.metrics.impl.CdmTimeSince; - -/*@Component("dMaaPMetricsSet")*/ -/** - * Metrics related information - * - * @author author - * - */ -public class DMaaPMetricsSet extends CdmMetricsRegistryImpl implements MetricsSet { - - private final CdmStringConstant fVersion; - private final CdmConstant fStartTime; - private final CdmTimeSince fUpTime; - - private final CdmCounter fRecvTotal; - private final CdmRateTicker fRecvEpsInstant; - private final CdmRateTicker fRecvEpsShort; - private final CdmRateTicker fRecvEpsLong; - - private final CdmCounter fSendTotal; - private final CdmRateTicker fSendEpsInstant; - private final CdmRateTicker fSendEpsShort; - private final CdmRateTicker fSendEpsLong; - - private final CdmCounter fKafkaConsumerCacheMiss; - private final CdmCounter fKafkaConsumerCacheHit; - - private final CdmCounter fKafkaConsumerClaimed; - private final CdmCounter fKafkaConsumerTimeout; - - private final CdmSimpleMetric fFanOutRatio; - - private final HashMap fPathUseRates; - private final HashMap fPathAvgs; - - private rrNvReadable fSettings; - - private final ScheduledExecutorService fScheduler; - - /** - * Constructor initialization - * - * @param cs - */ - //public DMaaPMetricsSet() { - public DMaaPMetricsSet(rrNvReadable cs) { - //fSettings = cs; - - fVersion = new CdmStringConstant("Version " + CambriaApiVersionInfo.getVersion()); - super.putItem("version", fVersion); - - final long startTime = System.currentTimeMillis(); - final Date d = new Date(startTime); - final String text = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssz").format(d); - fStartTime = new CdmConstant(startTime / 1000, "Start Time (epoch); " + text); - super.putItem("startTime", fStartTime); - - fUpTime = new CdmTimeSince("seconds since start"); - super.putItem("upTime", fUpTime); - - fRecvTotal = new CdmCounter("Total events received since start"); - super.putItem("recvTotalEvents", fRecvTotal); - - fRecvEpsInstant = new CdmRateTicker("recv eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); - super.putItem("recvEpsInstant", fRecvEpsInstant); - - fRecvEpsShort = new CdmRateTicker("recv eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES); - super.putItem("recvEpsShort", fRecvEpsShort); - - fRecvEpsLong = new CdmRateTicker("recv eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS); - super.putItem("recvEpsLong", fRecvEpsLong); - - fSendTotal = new CdmCounter("Total events sent since start"); - super.putItem("sendTotalEvents", fSendTotal); - - fSendEpsInstant = new CdmRateTicker("send eps (1 min)", 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); - super.putItem("sendEpsInstant", fSendEpsInstant); - - fSendEpsShort = new CdmRateTicker("send eps (10 mins)", 1, TimeUnit.SECONDS, 10, TimeUnit.MINUTES); - super.putItem("sendEpsShort", fSendEpsShort); - - fSendEpsLong = new CdmRateTicker("send eps (1 hr)", 1, TimeUnit.SECONDS, 1, TimeUnit.HOURS); - super.putItem("sendEpsLong", fSendEpsLong); - - fKafkaConsumerCacheMiss = new CdmCounter("Kafka Consumer Cache Misses"); - super.putItem("kafkaConsumerCacheMiss", fKafkaConsumerCacheMiss); - - fKafkaConsumerCacheHit = new CdmCounter("Kafka Consumer Cache Hits"); - super.putItem("kafkaConsumerCacheHit", fKafkaConsumerCacheHit); - - fKafkaConsumerClaimed = new CdmCounter("Kafka Consumers Claimed"); - super.putItem("kafkaConsumerClaims", fKafkaConsumerClaimed); - - fKafkaConsumerTimeout = new CdmCounter("Kafka Consumers Timedout"); - super.putItem("kafkaConsumerTimeouts", fKafkaConsumerTimeout); - - // FIXME: CdmLevel is not exactly a great choice - fFanOutRatio = new CdmSimpleMetric() { - @Override - public String getRawValueString() { - return getRawValue().toString(); - } - - @Override - public Number getRawValue() { - final double s = fSendTotal.getValue(); - final double r = fRecvTotal.getValue(); - return r == 0.0 ? 0.0 : s / r; - } - - @Override - public String summarize() { - return getRawValueString() + " sends per recv"; - } - - }; - super.putItem("fanOut", fFanOutRatio); - - // these are added to the metrics catalog as they're discovered - fPathUseRates = new HashMap(); - fPathAvgs = new HashMap(); - - fScheduler = Executors.newScheduledThreadPool(1); - } - - @Override - public void setupCambriaSender() { - DMaaPMetricsSender.sendPeriodically(fScheduler, this, "cambria.apinode.metrics.dmaap"); - } - - @Override - public void onRouteComplete(String name, long durationMs) { - CdmRateTicker ticker = fPathUseRates.get(name); - if (ticker == null) { - ticker = new CdmRateTicker("calls/min on path " + name + "", 1, TimeUnit.MINUTES, 1, TimeUnit.HOURS); - fPathUseRates.put(name, ticker); - super.putItem("pathUse_" + name, ticker); - } - ticker.tick(); - - CdmMovingAverage durs = fPathAvgs.get(name); - if (durs == null) { - durs = new CdmMovingAverage("ms avg duration on path " + name + ", last 10 minutes", 10, TimeUnit.MINUTES); - fPathAvgs.put(name, durs); - super.putItem("pathDurationMs_" + name, durs); - } - durs.tick(durationMs); - } - - @Override - public void publishTick(int amount) { - if (amount > 0) { - fRecvTotal.bumpBy(amount); - fRecvEpsInstant.tick(amount); - fRecvEpsShort.tick(amount); - fRecvEpsLong.tick(amount); - } - } - - @Override - public void consumeTick(int amount) { - if (amount > 0) { - fSendTotal.bumpBy(amount); - fSendEpsInstant.tick(amount); - fSendEpsShort.tick(amount); - fSendEpsLong.tick(amount); - } - } - - @Override - public void onKafkaConsumerCacheMiss() { - fKafkaConsumerCacheMiss.bump(); - } - - @Override - public void onKafkaConsumerCacheHit() { - fKafkaConsumerCacheHit.bump(); - } - - @Override - public void onKafkaConsumerClaimed() { - fKafkaConsumerClaimed.bump(); - } - - @Override - public void onKafkaConsumerTimeout() { - fKafkaConsumerTimeout.bump(); - } - -} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java deleted file mode 100644 index ce257d4..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPNsaApiDb.java +++ /dev/null @@ -1,139 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import java.security.Key; - -//import org.apache.log4-j.Logger; -import org.springframework.beans.factory.annotation.Autowired; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.configs.ConfigDb; -import com.att.nsa.configs.ConfigDbException; -import com.att.nsa.configs.confimpl.EncryptingLayer; -import com.att.nsa.drumlin.till.nv.rrNvReadable; -import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import com.att.nsa.security.db.BaseNsaApiDbImpl; -import com.att.nsa.security.db.EncryptingApiDbImpl; -import com.att.nsa.security.db.NsaApiDb; -import com.att.nsa.security.db.simple.NsaSimpleApiKey; -import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory; -import com.att.nsa.util.rrConvertor; - -/** - * - * @author author - * - */ -public class DMaaPNsaApiDb { - - //private rrNvReadable settings; - private DMaaPZkConfigDb cdb; - - //private static final Logger log = Logger - // .getLogger(DMaaPNsaApiDb.class.toString()); - private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPNsaApiDb.class); - -/** - * - * Constructor initialized - * @param settings - * @param cdb - */ - @Autowired - public DMaaPNsaApiDb(rrNvReadable settings, DMaaPZkConfigDb cdb) { - //this.setSettings(settings); - this.setCdb(cdb); - } - /** - * - * @param settings - * @param cdb - * @return - * @throws ConfigDbException - * @throws missingReqdSetting - */ - public static NsaApiDb buildApiKeyDb( - rrNvReadable settings, ConfigDb cdb) throws ConfigDbException, - missingReqdSetting { - // Cambria uses an encrypted api key db - - //final String keyBase64 = settings.getString("cambria.secureConfig.key", null); - final String keyBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.key"); - - - // final String initVectorBase64 = settings.getString( "cambria.secureConfig.iv", null); - final String initVectorBase64 =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"cambria.secureConfig.iv"); - // if neither value was provided, don't encrypt api key db - if (keyBase64 == null && initVectorBase64 == null) { - log.info("This server is configured to use an unencrypted API key database. See the settings documentation."); - return new BaseNsaApiDbImpl(cdb, - new NsaSimpleApiKeyFactory()); - } else if (keyBase64 == null) { - // neither or both, otherwise something's goofed - throw new missingReqdSetting("cambria.secureConfig.key"); - } else if (initVectorBase64 == null) { - // neither or both, otherwise something's goofed - throw new missingReqdSetting("cambria.secureConfig.iv"); - } else { - log.info("This server is configured to use an encrypted API key database."); - final Key key = EncryptingLayer.readSecretKey(keyBase64); - final byte[] iv = rrConvertor.base64Decode(initVectorBase64); - return new EncryptingApiDbImpl(cdb, - new NsaSimpleApiKeyFactory(), key, iv); - } - } - - /** - * @return - * returns settings - */ -/* public rrNvReadable getSettings() { - return settings; - }*/ - - /** - * @param settings - * set settings - */ - /*public void setSettings(rrNvReadable settings) { - this.settings = settings; - }*/ - - /** - * @return - * returns cbd - */ - public DMaaPZkConfigDb getCdb() { - return cdb; - } - /** - * @param cdb - * set cdb - */ - public void setCdb(DMaaPZkConfigDb cdb) { - this.cdb = cdb; - } - - -} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java deleted file mode 100644 index 590ecd6..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkClient.java +++ /dev/null @@ -1,45 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import org.I0Itec.zkclient.ZkClient; -import org.springframework.beans.factory.annotation.Qualifier; - -import com.att.nsa.cambria.utils.ConfigurationReader; -import com.att.nsa.drumlin.till.nv.rrNvReadable; - -/** - * Created for Zookeeper client which will read configuration and settings parameter - * @author author - * - */ -public class DMaaPZkClient extends ZkClient { - - /** - * This constructor will get the settings value from rrNvReadable - * and ConfigurationReader's zookeeper connection - * @param settings - */ - public DMaaPZkClient(@Qualifier("propertyReader") rrNvReadable settings) { - super(ConfigurationReader.getMainZookeeperConnectionString()); - } -} diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java b/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java deleted file mode 100644 index 8fe96e9..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPZkConfigDb.java +++ /dev/null @@ -1,52 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -package com.att.nsa.cambria.beans; - -import org.springframework.beans.factory.annotation.Qualifier; - -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.utils.ConfigurationReader; -import com.att.nsa.configs.confimpl.ZkConfigDb; -import com.att.nsa.drumlin.till.nv.rrNvReadable; -//import com.att.nsa.configs.confimpl.ZkConfigDb; -/** - * Provide the zookeeper config db connection - * @author author - * - */ -public class DMaaPZkConfigDb extends ZkConfigDb { - /** - * This Constructor will provide the configuration details from the property reader - * and DMaaPZkClient - * @param zk - * @param settings - */ - public DMaaPZkConfigDb(@Qualifier("dMaaPZkClient") DMaaPZkClient zk, - @Qualifier("propertyReader") rrNvReadable settings) { - - //super(com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)==null?CambriaConstants.kDefault_ZkConfigDbRoot:com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot)); - super(ConfigurationReader.getMainZookeeperConnectionString(),ConfigurationReader.getMainZookeeperConnectionSRoot()); - - } - - -} diff --git a/src/main/java/com/att/nsa/cambria/beans/LogDetails.java b/src/main/java/com/att/nsa/cambria/beans/LogDetails.java deleted file mode 100644 index 5a195e9..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/LogDetails.java +++ /dev/null @@ -1,214 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -/** - * - */ -package com.att.nsa.cambria.beans; - -import java.util.Date; - -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.utils.Utils; - -/** - * @author author - * - */ - -public class LogDetails { - - private String publisherId; - private String topicId; - private String subscriberGroupId; - private String subscriberId; - private String publisherIp; - private String messageBatchId; - private String messageSequence; - private String messageTimestamp; - private String consumeTimestamp; - private String transactionIdTs; - private String serverIp; - - private long messageLengthInBytes; - private long totalMessageCount; - - private boolean transactionEnabled; - /** - * This is for transaction enabled logging details - * - */ - public LogDetails() { - super(); - } - - public String getTransactionId() { - StringBuilder transactionId = new StringBuilder(); - transactionId.append(transactionIdTs); - transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); - transactionId.append(publisherIp); - transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); - transactionId.append(messageBatchId); - transactionId.append(CambriaConstants.TRANSACTION_ID_SEPARATOR); - transactionId.append(messageSequence); - - return transactionId.toString(); - } - - public String getPublisherId() { - return publisherId; - } - - public void setPublisherId(String publisherId) { - this.publisherId = publisherId; - } - - public String getTopicId() { - return topicId; - } - - public void setTopicId(String topicId) { - this.topicId = topicId; - } - - public String getSubscriberGroupId() { - return subscriberGroupId; - } - - public void setSubscriberGroupId(String subscriberGroupId) { - this.subscriberGroupId = subscriberGroupId; - } - - public String getSubscriberId() { - return subscriberId; - } - - public void setSubscriberId(String subscriberId) { - this.subscriberId = subscriberId; - } - - public String getPublisherIp() { - return publisherIp; - } - - public void setPublisherIp(String publisherIp) { - this.publisherIp = publisherIp; - } - - public String getMessageBatchId() { - return messageBatchId; - } - - public void setMessageBatchId(Long messageBatchId) { - this.messageBatchId = Utils.getFromattedBatchSequenceId(messageBatchId); - } - - public String getMessageSequence() { - return messageSequence; - } - - public void setMessageSequence(String messageSequence) { - this.messageSequence = messageSequence; - } - - public String getMessageTimestamp() { - return messageTimestamp; - } - - public void setMessageTimestamp(String messageTimestamp) { - this.messageTimestamp = messageTimestamp; - } - - public String getPublishTimestamp() { - return Utils.getFormattedDate(new Date()); - } - - public String getConsumeTimestamp() { - return consumeTimestamp; - } - - public void setConsumeTimestamp(String consumeTimestamp) { - this.consumeTimestamp = consumeTimestamp; - } - - public long getMessageLengthInBytes() { - return messageLengthInBytes; - } - - public void setMessageLengthInBytes(long messageLengthInBytes) { - this.messageLengthInBytes = messageLengthInBytes; - } - - public long getTotalMessageCount() { - return totalMessageCount; - } - - public void setTotalMessageCount(long totalMessageCount) { - this.totalMessageCount = totalMessageCount; - } - - public boolean isTransactionEnabled() { - return transactionEnabled; - } - - public void setTransactionEnabled(boolean transactionEnabled) { - this.transactionEnabled = transactionEnabled; - } - - public String getTransactionIdTs() { - return transactionIdTs; - } - - public void setTransactionIdTs(String transactionIdTs) { - this.transactionIdTs = transactionIdTs; - } - - public String getPublisherLogDetails() { - - StringBuilder buffer = new StringBuilder(); - buffer.append("[publisherId=" + publisherId); - buffer.append(", topicId=" + topicId); - buffer.append(", messageTimestamp=" + messageTimestamp); - buffer.append(", publisherIp=" + publisherIp); - buffer.append(", messageBatchId=" + messageBatchId); - buffer.append(", messageSequence=" + messageSequence ); - buffer.append(", messageLengthInBytes=" + messageLengthInBytes); - buffer.append(", transactionEnabled=" + transactionEnabled); - buffer.append(", transactionId=" + getTransactionId()); - buffer.append(", publishTimestamp=" + getPublishTimestamp()); - buffer.append(", serverIp=" + getServerIp()+"]"); - return buffer.toString(); - - } - - public String getServerIp() { - return serverIp; - } - - public void setServerIp(String serverIp) { - this.serverIp = serverIp; - } - - public void setMessageBatchId(String messageBatchId) { - this.messageBatchId = messageBatchId; - } - -} diff --git a/src/main/java/com/att/nsa/cambria/beans/TopicBean.java b/src/main/java/com/att/nsa/cambria/beans/TopicBean.java deleted file mode 100644 index 3303c07..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/TopicBean.java +++ /dev/null @@ -1,155 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ -/** - * - */ -package com.att.nsa.cambria.beans; - -import java.io.Serializable; - -import javax.xml.bind.annotation.XmlRootElement; - -/** - * @author author - * - */ -@XmlRootElement -public class TopicBean implements Serializable { - - private static final long serialVersionUID = -8620390377775457949L; - private String topicName; - private String topicDescription; - - private int partitionCount = 1; //default values - private int replicationCount = 1; //default value - - private boolean transactionEnabled; - - /** - * constructor - */ - public TopicBean() { - super(); - } - - /** - * constructor initialization with topic details name, description, - * partition, replication, transaction - * - * @param topicName - * @param description - * @param partitionCount - * @param replicationCount - * @param transactionEnabled - */ - public TopicBean(String topicName, String topicDescription, int partitionCount, int replicationCount, - boolean transactionEnabled) { - super(); - this.topicName = topicName; - this.topicDescription = topicDescription; - this.partitionCount = partitionCount; - this.replicationCount = replicationCount; - this.transactionEnabled = transactionEnabled; - } - - /** - * @return - * returns topic name which is of String type - */ - public String getTopicName() { - return topicName; - } - - /** - * @param topicName - * set topic name - */ - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - - /** - * @return - * returns partition count which is of int type - */ - public int getPartitionCount() { - return partitionCount; - } - - /** - * @param partitionCount - * set partition Count - */ - public void setPartitionCount(int partitionCount) { - this.partitionCount = partitionCount; - } - - /** - * @return - * returns replication count which is of int type - */ - public int getReplicationCount() { - return replicationCount; - } - - /** - * @param - * set replication count which is of int type - */ - public void setReplicationCount(int replicationCount) { - this.replicationCount = replicationCount; - } - - /** - * @return - * returns boolean value which indicates whether transaction is Enabled - */ - public boolean isTransactionEnabled() { - return transactionEnabled; - } - - /** - * @param - * sets boolean value which indicates whether transaction is Enabled - */ - public void setTransactionEnabled(boolean transactionEnabled) { - this.transactionEnabled = transactionEnabled; - } - - /** - * - * @return returns description which is of String type - */ - public String getTopicDescription() { - return topicDescription; - } - /** - * - * @param topicDescription - * set description which is of String type - */ - public void setTopicDescription(String topicDescription) { - this.topicDescription = topicDescription; - } - -} diff --git a/src/main/java/com/att/nsa/cambria/beans/ZkClientFactory.java b/src/main/java/com/att/nsa/cambria/beans/ZkClientFactory.java deleted file mode 100644 index 2aedb95..0000000 --- a/src/main/java/com/att/nsa/cambria/beans/ZkClientFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP Policy Engine - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package com.att.nsa.cambria.beans; - -import org.I0Itec.zkclient.ZkClient; - -import com.att.nsa.cambria.utils.ConfigurationReader; - -import kafka.utils.ZKStringSerializer$; - -public class ZkClientFactory { - - public static ZkClient createZkClient(){ - return new ZkClient(ConfigurationReader.getMainZookeeperConnectionString(), 10000, 10000, - ZKStringSerializer$.MODULE$); - - } - -} -- cgit 1.2.3-korg