summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/utils
diff options
context:
space:
mode:
authorsunil unnava <su622b@att.com>2018-08-14 09:34:46 -0400
committersunil unnava <su622b@att.com>2018-08-14 09:39:23 -0400
commitb32effcaf5684d5e2f338a4537b71a2375c534e5 (patch)
treee1b80407f414509ffcc766b987ec6a95f7254b4e /src/main/java/com/att/dmf/mr/utils
parent0823cb186012c8e6b7de3d979dfabb9f838da7c2 (diff)
update the testcases after the kafka 11 changes
Issue-ID: DMAAP-526 Change-Id: I477a8ee05fb3cdd76af726b6ca0d1a69aa9eef93 Signed-off-by: sunil unnava <su622b@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/utils')
-rw-r--r--src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java498
-rw-r--r--src/main/java/com/att/dmf/mr/utils/DMaaPCuratorFactory.java69
-rw-r--r--src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java377
-rw-r--r--src/main/java/com/att/dmf/mr/utils/Emailer.java213
-rw-r--r--src/main/java/com/att/dmf/mr/utils/PropertyReader.java125
-rw-r--r--src/main/java/com/att/dmf/mr/utils/Utils.java145
6 files changed, 1427 insertions, 0 deletions
diff --git a/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
new file mode 100644
index 0000000..dd1e4eb
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/utils/ConfigurationReader.java
@@ -0,0 +1,498 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.utils;
+
+import javax.servlet.ServletException;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.curator.framework.CuratorFramework;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.dmf.mr.backends.ConsumerFactory;
+import com.att.dmf.mr.backends.MetricsSet;
+import com.att.dmf.mr.backends.Publisher;
+import com.att.dmf.mr.backends.kafka.KafkaConsumerCache.KafkaConsumerCacheException;
+import com.att.dmf.mr.backends.memory.MemoryConsumerFactory;
+import com.att.dmf.mr.backends.memory.MemoryMetaBroker;
+import com.att.dmf.mr.backends.memory.MemoryQueue;
+import com.att.dmf.mr.backends.memory.MemoryQueuePublisher;
+import com.att.dmf.mr.beans.DMaaPCambriaLimiter;
+import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
+import com.att.dmf.mr.beans.DMaaPZkConfigDb;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.metabroker.Broker;
+
+import com.att.dmf.mr.metabroker.Broker1;
+import com.att.dmf.mr.security.DMaaPAuthenticator;
+import com.att.dmf.mr.security.impl.DMaaPOriginalUebAuthenticator;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.configs.confimpl.MemConfigDb;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.limits.Blacklist;
+import com.att.nsa.security.NsaAuthenticatorService;
+//import com.att.nsa.security.authenticators.OriginalUebAuthenticator;
+import com.att.nsa.security.db.BaseNsaApiDbImpl;
+import com.att.nsa.security.db.NsaApiDb;
+import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
+
+/**
+ * Class is created for all the configuration for rest and service layer
+ * integration.
+ *
+ */
+@Component
+public class ConfigurationReader {
+
+// private rrNvReadable settings;
+ private Broker1 fMetaBroker;
+ private ConsumerFactory fConsumerFactory;
+ private Publisher fPublisher;
+ private MetricsSet fMetrics;
+ @Autowired
+ private DMaaPCambriaLimiter fRateLimiter;
+ private NsaApiDb<NsaSimpleApiKey> fApiKeyDb;
+ /* private DMaaPTransactionObjDB<DMaaPTransactionObj> fTranDb; */
+ private DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager;
+ private NsaAuthenticatorService<NsaSimpleApiKey> nsaSecurityManager;
+ private static CuratorFramework curator;
+ private ZkClient zk;
+ private DMaaPZkConfigDb fConfigDb;
+ private MemoryQueue q;
+ private MemoryMetaBroker mmb;
+ private Blacklist fIpBlackList;
+ private Emailer fEmailer;
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
+ //private static final Logger log = Logger.getLogger(ConfigurationReader.class.toString());
+
+ /**
+ * constructor to initialize all the values
+ *
+ * @param settings
+ * @param fMetrics
+ * @param zk
+ * @param fConfigDb
+ * @param fPublisher
+ * @param curator
+ * @param fConsumerFactory
+ * @param fMetaBroker
+ * @param q
+ * @param mmb
+ * @param fApiKeyDb
+ * @param fSecurityManager
+ * @throws missingReqdSetting
+ * @throws invalidSettingValue
+ * @throws ServletException
+ * @throws KafkaConsumerCacheException
+ * @throws ConfigDbException
+ */
+ @Autowired
+ public ConfigurationReader(@Qualifier("propertyReader") rrNvReadable settings,
+ @Qualifier("dMaaPMetricsSet") MetricsSet fMetrics, @Qualifier("dMaaPZkClient") ZkClient zk,
+ @Qualifier("dMaaPZkConfigDb") DMaaPZkConfigDb fConfigDb, @Qualifier("kafkaPublisher") Publisher fPublisher,
+ @Qualifier("curator") CuratorFramework curator,
+ @Qualifier("dMaaPKafkaConsumerFactory") ConsumerFactory fConsumerFactory,
+ @Qualifier("dMaaPKafkaMetaBroker") Broker1 fMetaBroker,
+ @Qualifier("q") MemoryQueue q,
+ @Qualifier("mmb") MemoryMetaBroker mmb, @Qualifier("dMaaPNsaApiDb") NsaApiDb<NsaSimpleApiKey> fApiKeyDb,
+ /*
+ * @Qualifier("dMaaPTranDb")
+ * DMaaPTransactionObjDB<DMaaPTransactionObj> fTranDb,
+ */
+ @Qualifier("dMaaPAuthenticatorImpl") DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager
+ )
+ throws missingReqdSetting, invalidSettingValue, ServletException, KafkaConsumerCacheException, ConfigDbException {
+ //this.settings = settings;
+ this.fMetrics = fMetrics;
+ this.zk = zk;
+ this.fConfigDb = fConfigDb;
+ this.fPublisher = fPublisher;
+ ConfigurationReader.curator = curator;
+ this.fConsumerFactory = fConsumerFactory;
+ this.fMetaBroker = fMetaBroker;
+ //System.out.println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSs " + fMetaBroker);
+ this.q = q;
+ this.mmb = mmb;
+ this.fApiKeyDb = fApiKeyDb;
+ /* this.fTranDb = fTranDb; */
+ this.fSecurityManager = fSecurityManager;
+
+ long allowedtimeSkewMs=600000L;
+ String strallowedTimeSkewM= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"authentication.allowedTimeSkewMs");
+ if(null!=strallowedTimeSkewM)allowedtimeSkewMs= Long.parseLong(strallowedTimeSkewM);
+
+ // boolean requireSecureChannel = true;
+ //String strrequireSecureChannel= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"aauthentication.requireSecureChannel");
+ //if(strrequireSecureChannel!=null)requireSecureChannel=Boolean.parseBoolean(strrequireSecureChannel);
+ //this.nsaSecurityManager = new NsaAuthenticatorService<NsaSimpleApiKey>(this.fApiKeyDb, settings.getLong("authentication.allowedTimeSkewMs", 600000L), settings.getBoolean("authentication.requireSecureChannel", true));
+ //this.nsaSecurityManager = new NsaAuthenticatorService<NsaSimpleApiKey>(this.fApiKeyDb, allowedtimeSkewMs, requireSecureChannel);
+
+ servletSetup();
+ }
+
+ protected void servletSetup()
+ throws rrNvReadable.missingReqdSetting, rrNvReadable.invalidSettingValue, ServletException, ConfigDbException {
+ try {
+
+ fMetrics.toJson();
+ fMetrics.setupCambriaSender();
+ // add the admin authenticator
+ // final String adminSecret = settings.getString ( CambriaConstants.kSetting_AdminSecret, null );
+ final String adminSecret = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_AdminSecret);
+ //adminSecret = "fe3cCompound";
+ if ( adminSecret != null && adminSecret.length () > 0 )
+ {
+ try
+ {
+
+ final NsaApiDb<NsaSimpleApiKey> adminDb = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( new MemConfigDb(), new NsaSimpleApiKeyFactory() );
+ adminDb.createApiKey ( "admin", adminSecret );
+ //nsaSecurityManager.addAuthenticator ( new OriginalUebAuthenticator<NsaSimpleApiKey> ( adminDb, 10*60*1000 ) );
+ fSecurityManager.addAuthenticator ( new DMaaPOriginalUebAuthenticator<NsaSimpleApiKey> ( adminDb, 10*60*1000 ) );
+ }
+
+ catch ( KeyExistsException e )
+ {
+ throw new RuntimeException ( "This key can't exist in a fresh in-memory DB!", e );
+ }
+ }
+
+ // setup a backend
+ //final String type = settings.getString(CambriaConstants.kBrokerType, CambriaConstants.kBrokerType_Kafka);
+ String type = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kBrokerType);
+ if (type==null) type = CambriaConstants.kBrokerType_Kafka;
+ if (CambriaConstants.kBrokerType_Kafka.equalsIgnoreCase(type)) {
+ log.info("Broker Type is:" + CambriaConstants.kBrokerType_Kafka);
+ } else if (CambriaConstants.kBrokerType_Memory.equalsIgnoreCase(type)) {
+ log.info("Broker Type is:" + CambriaConstants.kBrokerType_Memory);
+ fPublisher = new MemoryQueuePublisher(q, mmb);
+ //Ramkumar remove below
+ // fMetaBroker = mmb;
+ fConsumerFactory = new MemoryConsumerFactory(q);
+ } else {
+ throw new IllegalArgumentException(
+ "Unrecognized type for " + CambriaConstants.kBrokerType + ": " + type + ".");
+ }
+ fIpBlackList = new Blacklist ( getfConfigDb(), getfConfigDb().parse ( "/ipBlacklist" ) );
+ this.fEmailer = new Emailer();
+ log.info("Broker Type is:" + type);
+
+ } catch (SecurityException e) {
+ throw new ServletException(e);
+ }
+ }
+
+ /**
+ * method returns metaBroker
+ *
+ * @return
+ */
+ public Broker1 getfMetaBroker() {
+ return fMetaBroker;
+ }
+
+ /**
+ * method to set the metaBroker
+ *
+ * @param fMetaBroker
+ */
+ public void setfMetaBroker(Broker1 fMetaBroker) {
+ this.fMetaBroker = fMetaBroker;
+ }
+
+ /**
+ * method to get ConsumerFactory Object
+ *
+ * @return
+ */
+ public ConsumerFactory getfConsumerFactory() {
+ return fConsumerFactory;
+ }
+
+ /**
+ * method to set the consumerfactory object
+ *
+ * @param fConsumerFactory
+ */
+ public void setfConsumerFactory(ConsumerFactory fConsumerFactory) {
+ this.fConsumerFactory = fConsumerFactory;
+ }
+
+ /**
+ * method to get Publisher object
+ *
+ * @return
+ */
+ public Publisher getfPublisher() {
+ return fPublisher;
+ }
+
+ /**
+ * method to set Publisher object
+ *
+ * @param fPublisher
+ */
+ public void setfPublisher(Publisher fPublisher) {
+ this.fPublisher = fPublisher;
+ }
+
+ /**
+ * method to get MetricsSet Object
+ *
+ * @return
+ */
+ public MetricsSet getfMetrics() {
+ return fMetrics;
+ }
+
+ /**
+ * method to set MetricsSet Object
+ *
+ * @param fMetrics
+ */
+ public void setfMetrics(MetricsSet fMetrics) {
+ this.fMetrics = fMetrics;
+ }
+
+ /**
+ * method to get DMaaPCambriaLimiter object
+ *
+ * @return
+ */
+ public DMaaPCambriaLimiter getfRateLimiter() {
+ return fRateLimiter;
+ }
+
+ /**
+ * method to set DMaaPCambriaLimiter object
+ *
+ * @param fRateLimiter
+ */
+ public void setfRateLimiter(DMaaPCambriaLimiter fRateLimiter) {
+ this.fRateLimiter = fRateLimiter;
+ }
+
+ /**
+ * Method to get DMaaPAuthenticator object
+ *
+ * @return
+ */
+ public DMaaPAuthenticator<NsaSimpleApiKey> getfSecurityManager() {
+ return fSecurityManager;
+ }
+
+ /**
+ * method to set DMaaPAuthenticator object
+ *
+ * @param fSecurityManager
+ */
+ public void setfSecurityManager(DMaaPAuthenticator<NsaSimpleApiKey> fSecurityManager) {
+ this.fSecurityManager = fSecurityManager;
+ }
+
+ /**
+ * method to get rrNvReadable object
+ *
+ * @return
+ */
+ /*public rrNvReadable getSettings() {
+ return settings;
+ }*/
+
+ /**
+ * method to set rrNvReadable object
+ *
+ * @param settings
+ */
+ /*public void setSettings(rrNvReadable settings) {
+ this.settings = settings;
+ }*/
+
+ /**
+ * method to get CuratorFramework object
+ *
+ * @return
+ */
+ public static CuratorFramework getCurator() {
+ return curator;
+ }
+
+ /**
+ * method to set CuratorFramework object
+ *
+ * @param curator
+ */
+ public static void setCurator(CuratorFramework curator) {
+ ConfigurationReader.curator = curator;
+ }
+
+ /**
+ * method to get ZkClient object
+ *
+ * @return
+ */
+ public ZkClient getZk() {
+ return zk;
+ }
+
+ /**
+ * method to set ZkClient object
+ *
+ * @param zk
+ */
+ public void setZk(ZkClient zk) {
+ this.zk = zk;
+ }
+
+ /**
+ * method to get DMaaPZkConfigDb object
+ *
+ * @return
+ */
+ public DMaaPZkConfigDb getfConfigDb() {
+ return fConfigDb;
+ }
+
+ /**
+ * method to set DMaaPZkConfigDb object
+ *
+ * @param fConfigDb
+ */
+ public void setfConfigDb(DMaaPZkConfigDb fConfigDb) {
+ this.fConfigDb = fConfigDb;
+ }
+
+ /**
+ * method to get MemoryQueue object
+ *
+ * @return
+ */
+ public MemoryQueue getQ() {
+ return q;
+ }
+
+ /**
+ * method to set MemoryQueue object
+ *
+ * @param q
+ */
+ public void setQ(MemoryQueue q) {
+ this.q = q;
+ }
+
+ /**
+ * method to get MemoryMetaBroker object
+ *
+ * @return
+ */
+ public MemoryMetaBroker getMmb() {
+ return mmb;
+ }
+
+ /**
+ * method to set MemoryMetaBroker object
+ *
+ * @param mmb
+ */
+ public void setMmb(MemoryMetaBroker mmb) {
+ this.mmb = mmb;
+ }
+
+ /**
+ * method to get NsaApiDb object
+ *
+ * @return
+ */
+ public NsaApiDb<NsaSimpleApiKey> getfApiKeyDb() {
+ return fApiKeyDb;
+ }
+
+ /**
+ * method to set NsaApiDb object
+ *
+ * @param fApiKeyDb
+ */
+ public void setfApiKeyDb(NsaApiDb<NsaSimpleApiKey> fApiKeyDb) {
+ this.fApiKeyDb = fApiKeyDb;
+ }
+
+ /*
+ * public DMaaPTransactionObjDB<DMaaPTransactionObj> getfTranDb() { return
+ * fTranDb; }
+ *
+ * public void setfTranDb(DMaaPTransactionObjDB<DMaaPTransactionObj>
+ * fTranDb) { this.fTranDb = fTranDb; }
+ */
+ /**
+ * method to get the zookeeper connection String
+ *
+ * @param settings
+ * @return
+ */
+ public static String getMainZookeeperConnectionString() {
+ //return settings.getString(CambriaConstants.kSetting_ZkConfigDbServers, CambriaConstants.kDefault_ZkConfigDbServers);
+
+ String zkServername = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbServers);
+ if (zkServername==null) zkServername=CambriaConstants.kDefault_ZkConfigDbServers;
+ return zkServername;
+ }
+
+ public static String getMainZookeeperConnectionSRoot(){
+ String strVal=com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,CambriaConstants.kSetting_ZkConfigDbRoot);
+
+ if (null==strVal)
+ strVal=CambriaConstants.kDefault_ZkConfigDbRoot;
+
+ return strVal;
+ }
+
+ public Blacklist getfIpBlackList() {
+ return fIpBlackList;
+ }
+
+ public void setfIpBlackList(Blacklist fIpBlackList) {
+ this.fIpBlackList = fIpBlackList;
+ }
+
+ public NsaAuthenticatorService<NsaSimpleApiKey> getNsaSecurityManager() {
+ return nsaSecurityManager;
+ }
+
+ public void setNsaSecurityManager(NsaAuthenticatorService<NsaSimpleApiKey> nsaSecurityManager) {
+ this.nsaSecurityManager = nsaSecurityManager;
+ }
+
+ public Emailer getSystemEmailer()
+ {
+ return this.fEmailer;
+ }
+
+
+}
diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPCuratorFactory.java b/src/main/java/com/att/dmf/mr/utils/DMaaPCuratorFactory.java
new file mode 100644
index 0000000..5a9968d
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/utils/DMaaPCuratorFactory.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.utils;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+
+/**
+ *
+ *
+ * @author anowarul.islam
+ *
+ *
+ */
+public class DMaaPCuratorFactory {
+ /**
+ *
+ * method provide CuratorFramework object
+ *
+ * @param settings
+ * @return
+ *
+ *
+ *
+ */
+ public static CuratorFramework getCurator(rrNvReadable settings) {
+ String Setting_ZkConfigDbServers =com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, CambriaConstants.kSetting_ZkConfigDbServers);
+
+ if(null==Setting_ZkConfigDbServers)
+ Setting_ZkConfigDbServers =CambriaConstants.kDefault_ZkConfigDbServers;
+
+ String strSetting_ZkSessionTimeoutMs = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, CambriaConstants.kSetting_ZkSessionTimeoutMs);
+ if (strSetting_ZkSessionTimeoutMs==null) strSetting_ZkSessionTimeoutMs = CambriaConstants.kDefault_ZkSessionTimeoutMs+"";
+ int Setting_ZkSessionTimeoutMs = Integer.parseInt(strSetting_ZkSessionTimeoutMs);
+
+ String str_ZkConnectionTimeoutMs = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, CambriaConstants.kSetting_ZkSessionTimeoutMs);
+ if (str_ZkConnectionTimeoutMs==null) str_ZkConnectionTimeoutMs = CambriaConstants.kDefault_ZkConnectionTimeoutMs+"";
+ int setting_ZkConnectionTimeoutMs = Integer.parseInt(str_ZkConnectionTimeoutMs);
+
+
+ CuratorFramework curator = CuratorFrameworkFactory.newClient(
+ Setting_ZkConfigDbServers,Setting_ZkSessionTimeoutMs,setting_ZkConnectionTimeoutMs
+ ,new ExponentialBackoffRetry(1000, 5));
+ return curator;
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
new file mode 100644
index 0000000..4c38d57
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/utils/DMaaPResponseBuilder.java
@@ -0,0 +1,377 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.Writer;
+
+import javax.servlet.http.HttpServletResponse;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.att.dmf.mr.beans.DMaaPContext;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * class is used to create response object which is given to user
+ *
+ * @author nilanjana.maity
+ *
+ */
+
+public class DMaaPResponseBuilder {
+
+ //private static Logger log = Logger.getLogger(DMaaPResponseBuilder.class);
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPResponseBuilder.class);
+ protected static final int kBufferLength = 4096;
+
+ public static void setNoCacheHeadings(DMaaPContext ctx) {
+ HttpServletResponse response = ctx.getResponse();
+ response.addHeader("Cache-Control", "no-store, no-cache, must-revalidate");
+ response.addHeader("Pragma", "no-cache");
+ response.addHeader("Expires", "0");
+ }
+
+ /**
+ * static method is used to create response object associated with
+ * JSONObject
+ *
+ * @param ctx
+ * @param result
+ * @throws JSONException
+ * @throws IOException
+ */
+ public static void respondOk(DMaaPContext ctx, JSONObject result) throws JSONException, IOException {
+
+ respondOkWithStream(ctx, "application/json", new ByteArrayInputStream(result.toString(4).getBytes()));
+
+ }
+
+ /**
+ * method used to set staus to 204
+ *
+ * @param ctx
+ */
+ public static void respondOkNoContent(DMaaPContext ctx) {
+ try {
+ ctx.getResponse().setStatus(204);
+ } catch (Exception excp) {
+ log.error(excp.getMessage(), excp);
+ }
+ }
+
+ /**
+ * static method is used to create response object associated with html
+ *
+ * @param ctx
+ * @param html
+ */
+ public static void respondOkWithHtml(DMaaPContext ctx, String html) {
+ try {
+ respondOkWithStream(ctx, "text/html", new ByteArrayInputStream(html.toString().getBytes()));
+ } catch (Exception excp) {
+ log.error(excp.getMessage(), excp);
+ }
+ }
+
+ /**
+ * method used to create response object associated with InputStream
+ *
+ * @param ctx
+ * @param mediaType
+ * @param is
+ * @throws IOException
+ */
+ public static void respondOkWithStream(DMaaPContext ctx, String mediaType, final InputStream is)
+ throws IOException {
+ /*
+ * creates response object associated with streamwriter
+ */
+ respondOkWithStream(ctx, mediaType, new StreamWriter() {
+
+ public void write(OutputStream os) throws IOException {
+ copyStream(is, os);
+ }
+ });
+
+ }
+
+ /**
+ *
+ * @param ctx
+ * @param mediaType
+ * @param writer
+ * @throws IOException
+ */
+ public static void respondOkWithStream(DMaaPContext ctx, String mediaType, StreamWriter writer) throws IOException {
+ ctx.getResponse().setStatus(200);
+ OutputStream os = getStreamForBinaryResponse(ctx, mediaType);
+ writer.write(os);
+ os.close();
+
+
+ }
+
+ /**
+ * static method to create error objects
+ *
+ * @param ctx
+ * @param errCode
+ * @param msg
+ */
+ public static void respondWithError(DMaaPContext ctx, int errCode, String msg) {
+ try {
+ ctx.getResponse().sendError(errCode, msg);
+ } catch (IOException excp) {
+ log.error(excp.getMessage(), excp);
+ }
+ }
+
+ /**
+ * method to create error objects
+ *
+ * @param ctx
+ * @param errCode
+ * @param body
+ */
+ public static void respondWithError(DMaaPContext ctx, int errCode, JSONObject body) {
+ try {
+ sendErrorAndBody(ctx, errCode, body.toString(4), "application/json");
+ } catch (Exception excp) {
+ log.error(excp.getMessage(), excp);
+ }
+ }
+
+ /**
+ * static method creates error object in JSON
+ *
+ * @param ctx
+ * @param errCode
+ * @param msg
+ */
+ public static void respondWithErrorInJson(DMaaPContext ctx, int errCode, String msg) {
+ try {
+ JSONObject o = new JSONObject();
+ o.put("status", errCode);
+ o.put("message", msg);
+ respondWithError(ctx, errCode, o);
+
+ } catch (Exception excp) {
+ log.error(excp.getMessage(), excp);
+ }
+ }
+
+ /**
+ * static method used to copy the stream with the help of another method
+ * copystream
+ *
+ * @param in
+ * @param out
+ * @throws IOException
+ */
+ public static void copyStream(InputStream in, OutputStream out) throws IOException {
+ copyStream(in, out, 4096);
+ }
+
+ /**
+ * static method to copy the streams
+ *
+ * @param in
+ * @param out
+ * @param bufferSize
+ * @throws IOException
+ */
+ public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
+ byte[] buffer = new byte[bufferSize];
+ int len;
+ while ((len = in.read(buffer)) != -1) {
+ out.write(buffer, 0, len);
+ }
+ out.close();
+ }
+
+ /**
+ * interface used to define write method for outputStream
+ */
+ public static abstract interface StreamWriter {
+ /**
+ * abstract method used to write the response
+ *
+ * @param paramOutputStream
+ * @throws IOException
+ */
+ public abstract void write(OutputStream paramOutputStream) throws IOException;
+ }
+
+ /**
+ * static method returns stream for binary response
+ *
+ * @param ctx
+ * @return
+ * @throws IOException
+ */
+ public static OutputStream getStreamForBinaryResponse(DMaaPContext ctx) throws IOException {
+ return getStreamForBinaryResponse(ctx, "application/octet-stream");
+ }
+
+ /**
+ * static method returns stream for binaryResponses
+ *
+ * @param ctx
+ * @param contentType
+ * @return
+ * @throws IOException
+ */
+ public static OutputStream getStreamForBinaryResponse(DMaaPContext ctx, String contentType) throws IOException {
+ ctx.getResponse().setContentType(contentType);
+
+
+ boolean fResponseEntityAllowed = (!(ctx.getRequest().getMethod().equalsIgnoreCase("HEAD")));
+
+ OutputStream os = null;
+ try{
+ if (fResponseEntityAllowed) {
+ os = ctx.getResponse().getOutputStream();
+ return os;
+ } else {
+ os = new NullStream();
+ return os;
+ }
+ }catch (Exception e){
+ throw new IOException();
+
+ }
+ finally{
+ if(null != os){
+ try{
+ os.close();
+ }catch(Exception e) {
+
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @author anowarul.islam
+ *
+ */
+ private static class NullStream extends OutputStream {
+ /**
+ * @param b
+ * integer
+ */
+ public void write(int b) {
+ }
+ }
+
+ private static class NullWriter extends Writer {
+ /**
+ * write method
+ * @param cbuf
+ * @param off
+ * @param len
+ */
+ public void write(char[] cbuf, int off, int len) {
+ }
+
+ /**
+ * flush method
+ */
+ public void flush() {
+ }
+
+ /**
+ * close method
+ */
+ public void close() {
+ }
+ }
+
+ /**
+ * sttaic method fetch stream for text
+ *
+ * @param ctx
+ * @param err
+ * @param content
+ * @param mimeType
+ */
+ public static void sendErrorAndBody(DMaaPContext ctx, int err, String content, String mimeType) {
+ try {
+ setStatus(ctx, err);
+ getStreamForTextResponse(ctx, mimeType).println(content);
+ } catch (IOException e) {
+ log.error(new StringBuilder().append("Error sending error response: ").append(e.getMessage()).toString(),
+ e);
+ }
+ }
+
+ /**
+ * method to set the code
+ *
+ * @param ctx
+ * @param code
+ */
+ public static void setStatus(DMaaPContext ctx, int code) {
+ ctx.getResponse().setStatus(code);
+ }
+
+ /**
+ * static method returns stream for text response
+ *
+ * @param ctx
+ * @return
+ * @throws IOException
+ */
+ public static PrintWriter getStreamForTextResponse(DMaaPContext ctx) throws IOException {
+ return getStreamForTextResponse(ctx, "text/html");
+ }
+
+ /**
+ * static method returns stream for text response
+ *
+ * @param ctx
+ * @param contentType
+ * @return
+ * @throws IOException
+ */
+ public static PrintWriter getStreamForTextResponse(DMaaPContext ctx, String contentType) throws IOException {
+ ctx.getResponse().setContentType(contentType);
+
+ PrintWriter pw = null;
+ boolean fResponseEntityAllowed = (!(ctx.getRequest().getMethod().equalsIgnoreCase("HEAD")));
+
+ if (fResponseEntityAllowed) {
+ pw = ctx.getResponse().getWriter();
+ } else {
+ pw = new PrintWriter(new NullWriter());
+ }
+ return pw;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/utils/Emailer.java b/src/main/java/com/att/dmf/mr/utils/Emailer.java
new file mode 100644
index 0000000..4229d94
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/utils/Emailer.java
@@ -0,0 +1,213 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.mail.BodyPart;
+import javax.mail.Message;
+import javax.mail.Multipart;
+import javax.mail.PasswordAuthentication;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.InternetAddress;
+import javax.mail.internet.MimeBodyPart;
+import javax.mail.internet.MimeMessage;
+import javax.mail.internet.MimeMultipart;
+
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+/**
+ * Send an email from a message.
+ *
+ * @author peter
+ */
+public class Emailer
+{
+ public static final String kField_To = "to";
+ public static final String kField_Subject = "subject";
+ public static final String kField_Message = "message";
+
+ public Emailer()
+ {
+ fExec = Executors.newCachedThreadPool ();
+ // fSettings = settings;
+ }
+
+ public void send ( String to, String subj, String body ) throws IOException
+ {
+ final String[] addrs = to.split ( "," );
+
+ if ( to.length () > 0 )
+ {
+ final MailTask mt = new MailTask ( addrs, subj, body );
+ fExec.submit ( mt );
+ }
+ else
+ {
+ log.warn ( "At least one address is required." );
+ }
+ }
+
+ public void close ()
+ {
+ fExec.shutdown ();
+ }
+
+ private final ExecutorService fExec;
+ //private final rrNvReadable fSettings;
+
+ //private static final Logger log = LoggerFactory.getLogger ( Emailer.class );
+
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(Emailer.class);
+
+ public static final String kSetting_MailAuthUser = "mailLogin";
+ public static final String kSetting_MailAuthPwd = "mailPassword";
+ public static final String kSetting_MailFromEmail = "mailFromEmail";
+ public static final String kSetting_MailFromName = "mailFromName";
+ public static final String kSetting_SmtpServer = "mailSmtpServer";
+ public static final String kSetting_SmtpServerPort = "mailSmtpServerPort";
+ public static final String kSetting_SmtpServerSsl = "mailSmtpServerSsl";
+ public static final String kSetting_SmtpServerUseAuth = "mailSmtpServerUseAuth";
+
+ private class MailTask implements Runnable
+ {
+ public MailTask ( String[] to, String subject, String msgBody )
+ {
+ fToAddrs = to;
+ fSubject = subject;
+ fBody = msgBody;
+ }
+
+ private String getSetting ( String settingKey, String defval )
+ {
+ //return fSettings.getString ( settingKey, defval );
+ String strSet = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,settingKey);
+ if(strSet==null)strSet=defval;
+ return strSet;
+ }
+
+ // we need to get setting values from the evaluator but also the channel config
+ private void makeSetting ( Properties props, String propKey, String settingKey, String defval )
+ {
+ props.put ( propKey, getSetting ( settingKey, defval ) );
+ }
+
+ private void makeSetting ( Properties props, String propKey, String settingKey, int defval )
+ {
+ makeSetting ( props, propKey, settingKey, "" + defval );
+ }
+
+ private void makeSetting ( Properties props, String propKey, String settingKey, boolean defval )
+ {
+ makeSetting ( props, propKey, settingKey, "" + defval );
+ }
+
+ @Override
+ public void run ()
+ {
+ final StringBuffer tag = new StringBuffer ();
+ final StringBuffer addrList = new StringBuffer ();
+ tag.append ( "(" );
+ for ( String to : fToAddrs )
+ {
+ if ( addrList.length () > 0 )
+ {
+ addrList.append ( ", " );
+ }
+ addrList.append ( to );
+ }
+ tag.append ( addrList.toString () );
+ tag.append ( ") \"" );
+ tag.append ( fSubject );
+ tag.append ( "\"" );
+
+ log.info ( "sending mail to " + tag );
+
+ try
+ {
+ final Properties prop = new Properties ();
+ makeSetting ( prop, "mail.smtp.port", kSetting_SmtpServerPort, 587 );
+ prop.put ( "mail.smtp.socketFactory.fallback", "false" );
+ prop.put ( "mail.smtp.quitwait", "false" );
+ makeSetting ( prop, "mail.smtp.host", kSetting_SmtpServer, "smtp.it.att.com" );
+ makeSetting ( prop, "mail.smtp.auth", kSetting_SmtpServerUseAuth, true );
+ makeSetting ( prop, "mail.smtp.starttls.enable", kSetting_SmtpServerSsl, true );
+
+ final String un = getSetting ( kSetting_MailAuthUser, "" );
+ final String pw = getSetting ( kSetting_MailAuthPwd, "" );
+ final Session session = Session.getInstance ( prop,
+ new javax.mail.Authenticator()
+ {
+ @Override
+ protected PasswordAuthentication getPasswordAuthentication()
+ {
+ return new PasswordAuthentication ( un, pw );
+ }
+ }
+ );
+
+ final Message msg = new MimeMessage ( session );
+
+ final InternetAddress from = new InternetAddress (
+ getSetting ( kSetting_MailFromEmail, "team@sa2020.it.att.com" ),
+ getSetting ( kSetting_MailFromName, "The GFP/SA2020 Team" ) );
+ msg.setFrom ( from );
+ msg.setReplyTo ( new InternetAddress[] { from } );
+ msg.setSubject ( fSubject );
+
+ for ( String toAddr : fToAddrs )
+ {
+ final InternetAddress to = new InternetAddress ( toAddr );
+ msg.addRecipient ( Message.RecipientType.TO, to );
+ }
+
+ final Multipart multipart = new MimeMultipart ( "related" );
+ final BodyPart htmlPart = new MimeBodyPart ();
+ htmlPart.setContent ( fBody, "text/plain" );
+ multipart.addBodyPart ( htmlPart );
+ msg.setContent ( multipart );
+
+ Transport.send ( msg );
+
+ log.info ( "mailing " + tag + " off without error" );
+ }
+ catch ( Exception e )
+ {
+ log.warn ( "Exception caught for " + tag, e );
+ }
+ }
+
+ private final String[] fToAddrs;
+ private final String fSubject;
+ private final String fBody;
+ }
+}
diff --git a/src/main/java/com/att/dmf/mr/utils/PropertyReader.java b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java
new file mode 100644
index 0000000..58c9fc9
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/utils/PropertyReader.java
@@ -0,0 +1,125 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.utils;
+
+import java.util.Map;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
+
+/**
+ *
+ * @author nilesh.labde
+ *
+ *
+ */
+public class PropertyReader extends nvReadableStack {
+ /**
+ *
+ * initializing logger
+ *
+ */
+ //private static final Logger LOGGER = Logger.getLogger(PropertyReader.class);
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(PropertyReader.class);
+// private static final String MSGRTR_PROPERTIES_FILE = "msgRtrApi.properties";
+
+ /**
+ * constructor initialization
+ *
+ * @throws loadException
+ *
+ */
+ public PropertyReader() throws loadException {
+ /* Map<String, String> argMap = new HashMap<String, String>();
+ final String config = getSetting(argMap, CambriaConstants.kConfig, MSGRTR_PROPERTIES_FILE);
+ final URL settingStream = findStream(config, ConfigurationReader.class);
+ push(new nvPropertiesFile(settingStream));
+ push(new nvReadableTable(argMap));*/
+ }
+
+ /**
+ *
+ *
+ * @param argMap
+ * @param key
+ * @param defaultValue
+ * @return
+ *
+ */
+ @SuppressWarnings("unused")
+ private static String getSetting(Map<String, String> argMap, final String key, final String defaultValue) {
+ String val = (String) argMap.get(key);
+ if (null == val) {
+ return defaultValue;
+ }
+ return val;
+ }
+
+ /**
+ *
+ * @param resourceName
+ * @param clazz
+ * @return
+ * @exception MalformedURLException
+ *
+ */
+ /*public static URL findStream(final String resourceName, Class<?> clazz) {
+ try {
+ File file = new File(resourceName);
+
+ if (file.isAbsolute()) {
+ return file.toURI().toURL();
+ }
+
+ String filesRoot = System.getProperty("RRWT_FILES", null);
+
+ if (null != filesRoot) {
+
+ String fullPath = filesRoot + "/" + resourceName;
+
+ LOGGER.debug("Looking for [" + fullPath + "].");
+
+ file = new File(fullPath);
+ if (file.exists()) {
+ return file.toURI().toURL();
+ }
+ }
+
+ URL res = clazz.getClassLoader().getResource(resourceName);
+
+ if (null != res) {
+ return res;
+ }
+
+ res = ClassLoader.getSystemResource(resourceName);
+
+ if (null != res) {
+ return res;
+ }
+ } catch (MalformedURLException e) {
+ LOGGER.error("Unexpected failure to convert a local filename into a URL: " + e.getMessage(), e);
+ }
+ return null;
+ }
+*/
+}
diff --git a/src/main/java/com/att/dmf/mr/utils/Utils.java b/src/main/java/com/att/dmf/mr/utils/Utils.java
new file mode 100644
index 0000000..70691cf
--- /dev/null
+++ b/src/main/java/com/att/dmf/mr/utils/Utils.java
@@ -0,0 +1,145 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.dmf.mr.utils;
+
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.servlet.http.HttpServletRequest;
+
+import com.att.dmf.mr.beans.DMaaPContext;
+/**
+ * This is an utility class for various operations for formatting
+ * @author nilanjana.maity
+ *
+ */
+public class Utils {
+
+ private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS";
+ public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth";
+ private static final String BATCH_ID_FORMAT = "000000";
+
+ private Utils() {
+ super();
+ }
+
+ /**
+ * Formatting the date
+ * @param date
+ * @return date or null
+ */
+ public static String getFormattedDate(Date date) {
+ SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+ if (null != date){
+ return sdf.format(date);
+ }
+ return null;
+ }
+ /**
+ * to get the details of User Api Key
+ * @param request
+ * @return authkey or null
+ */
+ public static String getUserApiKey(HttpServletRequest request) {
+ final String auth = request.getHeader(CAMBRIA_AUTH_HEADER);
+ if (null != auth) {
+ final String[] splittedAuthKey = auth.split(":");
+ return splittedAuthKey[0];
+ }else if (null!=request.getHeader("Authorization")){
+ /**
+ * AAF implementation enhancement
+ */
+ String user= request.getUserPrincipal().getName().toString();
+ return user.substring(0, user.lastIndexOf("@"));
+ }
+ return null;
+ }
+ /**
+ * to format the batch sequence id
+ * @param batchId
+ * @return batchId
+ */
+ public static String getFromattedBatchSequenceId(Long batchId) {
+ DecimalFormat format = new DecimalFormat(BATCH_ID_FORMAT);
+ return format.format(batchId);
+ }
+
+ /**
+ * to get the message length in bytes
+ * @param message
+ * @return bytes or 0
+ */
+ public static long messageLengthInBytes(String message) {
+ if (null != message) {
+ return message.getBytes().length;
+ }
+ return 0;
+ }
+ /**
+ * To get transaction id details
+ * @param transactionId
+ * @return transactionId or null
+ */
+ public static String getResponseTransactionId(String transactionId) {
+ if (null != transactionId && !transactionId.isEmpty()) {
+ return transactionId.substring(0, transactionId.lastIndexOf("::"));
+ }
+ return null;
+ }
+
+ /**
+ * get the thread sleep time
+ * @param ratePerMinute
+ * @return ratePerMinute or 0
+ */
+ public static long getSleepMsForRate ( double ratePerMinute )
+ {
+ if ( ratePerMinute <= 0.0 ) return 0;
+ return Math.max ( 1000, Math.round ( 60 * 1000 / ratePerMinute ) );
+ }
+
+ public static String getRemoteAddress(DMaaPContext ctx)
+ {
+ String reqAddr = ctx.getRequest().getRemoteAddr();
+ String fwdHeader = getFirstHeader("X-Forwarded-For",ctx);
+ return ((fwdHeader != null) ? fwdHeader : reqAddr);
+ }
+ public static String getFirstHeader(String h,DMaaPContext ctx)
+ {
+ List l = getHeader(h,ctx);
+ return ((l.size() > 0) ? (String)l.iterator().next() : null);
+ }
+ public static List<String> getHeader(String h,DMaaPContext ctx)
+ {
+ LinkedList list = new LinkedList();
+ Enumeration e = ctx.getRequest().getHeaders(h);
+ while (e.hasMoreElements())
+ {
+ list.add(e.nextElement().toString());
+ }
+ return list;
+ }
+}