summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/dmf/mr/service/impl
diff options
context:
space:
mode:
authorsunil unnava <sunil.unnava@att.com>2018-10-23 12:18:59 -0400
committersunil unnava <sunil.unnava@att.com>2018-10-23 12:22:02 -0400
commit3504265229c589ecc166e3ad4c33bb198b11e4ce (patch)
tree022235018aa3ad863eaf24862543bbd509f35a21 /src/main/java/com/att/dmf/mr/service/impl
parent8a3dfd3fe521f18ce07c2d24202a51b28d424fa2 (diff)
update the package name1.1.11
Issue-ID: DMAAP-858 Change-Id: I49ae6eb9c51a261b64b911e607fcbbca46c5423c Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main/java/com/att/dmf/mr/service/impl')
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java190
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java320
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/BaseTransactionDbImpl.java153
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java867
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java600
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java115
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java694
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java100
-rw-r--r--src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java210
9 files changed, 0 insertions, 3249 deletions
diff --git a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
deleted file mode 100644
index f7c48de..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/AdminServiceImpl.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Set;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.springframework.stereotype.Component;
-
-import com.att.dmf.mr.backends.Consumer;
-import com.att.dmf.mr.backends.ConsumerFactory;
-import com.att.dmf.mr.beans.DMaaPContext;
-import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
-import com.att.dmf.mr.service.AdminService;
-import com.att.dmf.mr.utils.DMaaPResponseBuilder;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.limits.Blacklist;
-import com.att.nsa.security.NsaApiKey;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-
-
-/**
- * @author muzainulhaque.qazi
- *
- */
-@Component
-public class AdminServiceImpl implements AdminService {
-
- //private Logger log = Logger.getLogger(AdminServiceImpl.class.toString());
- private static final EELFLogger log = EELFManager.getInstance().getLogger(AdminServiceImpl.class);
- /**
- * getConsumerCache returns consumer cache
- * @param dMaaPContext context
- * @throws IOException ex
- * @throws AccessDeniedException
- */
- @Override
- public void showConsumerCache(DMaaPContext dMaaPContext) throws IOException, AccessDeniedException {
- adminAuthenticate(dMaaPContext);
-
- JSONObject consumers = new JSONObject();
- JSONArray jsonConsumersList = new JSONArray();
-
- for (Consumer consumer : getConsumerFactory(dMaaPContext).getConsumers()) {
- JSONObject consumerObject = new JSONObject();
- consumerObject.put("name", consumer.getName());
- consumerObject.put("created", consumer.getCreateTimeMs());
- consumerObject.put("accessed", consumer.getLastAccessMs());
- jsonConsumersList.put(consumerObject);
- }
-
- consumers.put("consumers", jsonConsumersList);
- log.info("========== AdminServiceImpl: getConsumerCache: " + jsonConsumersList.toString() + "===========");
- DMaaPResponseBuilder.respondOk(dMaaPContext, consumers);
- }
-
- /**
- *
- * dropConsumerCache() method clears consumer cache
- * @param dMaaPContext context
- * @throws JSONException ex
- * @throws IOException ex
- * @throws AccessDeniedException
- *
- */
- @Override
- public void dropConsumerCache(DMaaPContext dMaaPContext) throws JSONException, IOException, AccessDeniedException {
- adminAuthenticate(dMaaPContext);
- getConsumerFactory(dMaaPContext).dropCache();
- DMaaPResponseBuilder.respondOkWithHtml(dMaaPContext, "Consumer cache cleared successfully");
- // log.info("========== AdminServiceImpl: dropConsumerCache: Consumer
- // Cache successfully dropped.===========");
- }
-
- /**
- * getfConsumerFactory returns CosnumerFactory details
- * @param dMaaPContext contxt
- * @return ConsumerFactory obj
- *
- */
- private ConsumerFactory getConsumerFactory(DMaaPContext dMaaPContext) {
- return dMaaPContext.getConfigReader().getfConsumerFactory();
- }
-
- /**
- * return ipblacklist
- * @param dMaaPContext context
- * @return blacklist obj
- */
- private static Blacklist getIpBlacklist(DMaaPContext dMaaPContext) {
- return dMaaPContext.getConfigReader().getfIpBlackList();
- }
-
-
- /**
- * Get list of blacklisted ips
- */
- @Override
- public void getBlacklist ( DMaaPContext dMaaPContext ) throws IOException, AccessDeniedException
- {
- adminAuthenticate ( dMaaPContext );
-
- DMaaPResponseBuilder.respondOk ( dMaaPContext,
- new JSONObject().put ( "blacklist",
- setToJsonArray ( getIpBlacklist (dMaaPContext).asSet() ) ) );
- }
-
- public static JSONArray setToJsonArray ( Set<?> fields )
- {
- return collectionToJsonArray ( fields );
- }
-
- public static JSONArray collectionToJsonArray ( Collection<?> fields )
- {
- final JSONArray a = new JSONArray ();
- if ( fields != null )
- {
- for ( Object o : fields )
- {
- a.put ( o );
- }
- }
- return a;
- }
-
- /**
- * Add ip to blacklist
- */
- @Override
- public void addToBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException
- {
- adminAuthenticate ( dMaaPContext );
-
- getIpBlacklist (dMaaPContext).add ( ip );
- DMaaPResponseBuilder.respondOkNoContent ( dMaaPContext );
- }
-
- /**
- * Remove ip from blacklist
- */
- @Override
- public void removeFromBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException
- {
- adminAuthenticate ( dMaaPContext );
-
- getIpBlacklist (dMaaPContext).remove ( ip );
- DMaaPResponseBuilder.respondOkNoContent ( dMaaPContext );
- }
-
- /**
- * Authenticate if user is admin
- * @param dMaaPContext context
- * @throws AccessDeniedException ex
- */
- private static void adminAuthenticate ( DMaaPContext dMaaPContext ) throws AccessDeniedException
- {
-
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dMaaPContext);
- if ( user == null || !user.getKey ().equals ( "admin" ) )
- {
- throw new AccessDeniedException ();
- }
- }
-
-}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
deleted file mode 100644
index b0e8a86..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/ApiKeysServiceImpl.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.io.IOException;
-
-import org.json.JSONArray;
-import org.json.JSONObject;
-import org.springframework.stereotype.Service;
-
-import com.att.dmf.mr.beans.ApiKeyBean;
-import com.att.dmf.mr.beans.DMaaPContext;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
-import com.att.dmf.mr.service.ApiKeysService;
-import com.att.dmf.mr.utils.ConfigurationReader;
-import com.att.dmf.mr.utils.DMaaPResponseBuilder;
-import com.att.dmf.mr.utils.Emailer;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.security.NsaApiKey;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.security.db.NsaApiDb;
-import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-
-/**
- * Implementation of the ApiKeysService, this will provide the below operations,
- * getAllApiKeys, getApiKey, createApiKey, updateApiKey, deleteApiKey
- *
- * @author nilanjana.maity
- */
-@Service
-public class ApiKeysServiceImpl implements ApiKeysService {
-
-
- private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString());
- /**
- * This method will provide all the ApiKeys present in kafka server.
- *
- * @param dmaapContext
- * @throws ConfigDbException
- * @throws IOException
- */
- public void getAllApiKeys(DMaaPContext dmaapContext)
- throws ConfigDbException, IOException {
-
- ConfigurationReader configReader = dmaapContext.getConfigReader();
-
- log.info("configReader : " + configReader.toString());
-
- final JSONObject result = new JSONObject();
- final JSONArray keys = new JSONArray();
- result.put("apiKeys", keys);
-
- NsaApiDb<NsaSimpleApiKey> apiDb = configReader.getfApiKeyDb();
-
- for (String key : apiDb.loadAllKeys()) {
- keys.put(key);
- }
- log.info("========== ApiKeysServiceImpl: getAllApiKeys: Api Keys are : "
- + keys.toString() + "===========");
- DMaaPResponseBuilder.respondOk(dmaapContext, result);
- }
-
- /**
- * @param dmaapContext
- * @param apikey
- * @throws ConfigDbException
- * @throws IOException
- */
- @Override
- public void getApiKey(DMaaPContext dmaapContext, String apikey)
- throws ConfigDbException, IOException {
-
- String errorMsg = "Api key name is not mentioned.";
- int errorCode = HttpStatusCodes.k400_badRequest;
-
- if (null != apikey) {
- NsaSimpleApiKey simpleApiKey = getApiKeyDb(dmaapContext)
- .loadApiKey(apikey);
-
-
- if (null != simpleApiKey) {
- JSONObject result = simpleApiKey.asJsonObject();
- DMaaPResponseBuilder.respondOk(dmaapContext, result);
- log.info("========== ApiKeysServiceImpl: getApiKey : "
- + result.toString() + "===========");
- return;
- } else {
- errorMsg = "Api key [" + apikey + "] does not exist.";
- errorCode = HttpStatusCodes.k404_notFound;
- log.info("========== ApiKeysServiceImpl: getApiKey: Error : API Key does not exist. "
- + "===========");
- DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode,
- errorMsg);
- throw new IOException();
- }
- }
-
- }
-
- /**
- * @param dmaapContext
- * @param nsaApiKey
- * @throws KeyExistsException
- * @throws ConfigDbException
- * @throws IOException
- */
- @Override
- public void createApiKey(DMaaPContext dmaapContext, ApiKeyBean nsaApiKey)
- throws KeyExistsException, ConfigDbException, IOException {
-
- log.debug("TopicService: : createApiKey....");
-
- String contactEmail = nsaApiKey.getEmail();
- final boolean emailProvided = contactEmail != null && contactEmail.length() > 0 && contactEmail.indexOf("@") > 1 ;
- String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous");
- if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false";
-
-
- if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided )
- {
- DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address.");
- return;
- }
-
-
- final NsaApiDb<NsaSimpleApiKey> apiKeyDb = getApiKeyDb(dmaapContext);
- String apiKey = nsaApiKey.getKey();
- String sharedSecret = nsaApiKey.getSharedSecret();
- final NsaSimpleApiKey key = apiKeyDb.createApiKey(apiKey,
- sharedSecret);
- if (null != key) {
-
- if (null != nsaApiKey.getEmail()) {
- key.setContactEmail(nsaApiKey.getEmail());
- }
-
- if (null != nsaApiKey.getDescription()) {
- key.setDescription(nsaApiKey.getDescription());
- }
-
- log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : "
- + key.toString() + "=====");
- apiKeyDb.saveApiKey(key);
-
- // email out the secret to validate the email address
- if ( emailProvided )
- {
- String body = "\n" + "Your email address was provided as the creator of new API key \""
- + apiKey + "\".\n" + "\n" + "If you did not make this request, please let us know."
- + " See http://sa2020.it.att.com:8888 for contact information, " + "but don't worry -"
- + " the API key is useless without the information below, which has been provided "
- + "only to you.\n" + "\n\n" + "For API key \"" + apiKey + "\", use API key secret:\n\n\t"
- + sharedSecret + "\n\n" + "Note that it's normal to share the API key"
- + " (" + apiKey + "). "
- + "This is how you are granted access to resources " + "like a UEB topic or Flatiron scope. "
- + "However, you should NOT share the API key's secret. " + "The API key is associated with your"
- + " email alone. ALL access to data made with this " + "key will be your responsibility. If you "
- + "share the secret, someone else can use the API key " + "to access proprietary data with your "
- + "identity.\n" + "\n" + "Enjoy!\n" + "\n" + "The GFP/SA-2020 Team";
-
- Emailer em = dmaapContext.getConfigReader().getSystemEmailer();
- em.send(contactEmail, "New API Key", body);
- }
- log.debug("TopicService: : sending response.");
-
- JSONObject o = key.asJsonObject();
-
- o.put ( NsaSimpleApiKey.kApiSecretField,
- emailProvided ?
- "Emailed to " + contactEmail + "." :
- key.getSecret ()
- );
- DMaaPResponseBuilder.respondOk(dmaapContext,
- o);
-
- return;
- } else {
- log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.=====");
- DMaaPResponseBuilder.respondWithError(dmaapContext,
- HttpStatusCodes.k500_internalServerError,
- "Failed to create api key.");
- throw new KeyExistsException(apiKey);
- }
- }
-
- /**
- * @param dmaapContext
- * @param apikey
- * @param nsaApiKey
- * @throws ConfigDbException
- * @throws IOException
- * @throws AccessDeniedException
- */
- @Override
- public void updateApiKey(DMaaPContext dmaapContext, String apikey,
- ApiKeyBean nsaApiKey) throws ConfigDbException, IOException, AccessDeniedException {
-
- String errorMsg = "Api key name is not mentioned.";
- int errorCode = HttpStatusCodes.k400_badRequest;
-
- if (null != apikey) {
- final NsaApiDb<NsaSimpleApiKey> apiKeyDb = getApiKeyDb(dmaapContext);
- final NsaSimpleApiKey key = apiKeyDb.loadApiKey(apikey);
- boolean shouldUpdate = false;
-
- if (null != key) {
- final NsaApiKey user = DMaaPAuthenticatorImpl
- .getAuthenticatedUser(dmaapContext);
-
- if (user == null || !user.getKey().equals(key.getKey())) {
- throw new AccessDeniedException("You must authenticate with the key you'd like to update.");
- }
-
- if (null != nsaApiKey.getEmail()) {
- key.setContactEmail(nsaApiKey.getEmail());
- shouldUpdate = true;
- }
-
- if (null != nsaApiKey.getDescription()) {
- key.setDescription(nsaApiKey.getDescription());
- shouldUpdate = true;
- }
-
- if (shouldUpdate) {
- apiKeyDb.saveApiKey(key);
- }
-
- log.info("======ApiKeysServiceImpl : updateApiKey : Key Updated Successfully :"
- + key.toString() + "=========");
- DMaaPResponseBuilder.respondOk(dmaapContext,
- key.asJsonObject());
- return;
- }
- } else {
- errorMsg = "Api key [" + apikey + "] does not exist.";
- errorCode = HttpStatusCodes.k404_notFound;
- DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode,
- errorMsg);
- log.info("======ApiKeysServiceImpl : updateApiKey : Error in Updating Key.============");
- throw new IOException();
- }
- }
-
- /**
- * @param dmaapContext
- * @param apikey
- * @throws ConfigDbException
- * @throws IOException
- * @throws AccessDeniedException
- */
- @Override
- public void deleteApiKey(DMaaPContext dmaapContext, String apikey)
- throws ConfigDbException, IOException, AccessDeniedException {
-
- String errorMsg = "Api key name is not mentioned.";
- int errorCode = HttpStatusCodes.k400_badRequest;
-
- if (null != apikey) {
- final NsaApiDb<NsaSimpleApiKey> apiKeyDb = getApiKeyDb(dmaapContext);
- final NsaSimpleApiKey key = apiKeyDb.loadApiKey(apikey);
-
- if (null != key) {
-
- final NsaApiKey user = DMaaPAuthenticatorImpl
- .getAuthenticatedUser(dmaapContext);
- if (user == null || !user.getKey().equals(key.getKey())) {
- throw new AccessDeniedException("You don't own the API key.");
- }
-
- apiKeyDb.deleteApiKey(key);
- log.info("======ApiKeysServiceImpl : deleteApiKey : Deleted Key successfully.============");
- DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
- "Api key [" + apikey + "] deleted successfully.");
- return;
- }
- } else {
- errorMsg = "Api key [" + apikey + "] does not exist.";
- errorCode = HttpStatusCodes.k404_notFound;
- DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode,
- errorMsg);
- log.info("======ApiKeysServiceImpl : deleteApiKey : Error while deleting key.============");
- throw new IOException();
- }
- }
-
- /**
- *
- * @param dmaapContext
- * @return
- */
- private NsaApiDb<NsaSimpleApiKey> getApiKeyDb(DMaaPContext dmaapContext) {
- ConfigurationReader configReader = dmaapContext.getConfigReader();
- return configReader.getfApiKeyDb();
- }
-
-}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/BaseTransactionDbImpl.java b/src/main/java/com/att/dmf/mr/service/impl/BaseTransactionDbImpl.java
deleted file mode 100644
index 104d7de..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/BaseTransactionDbImpl.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.util.Set;
-import java.util.TreeSet;
-
-import com.att.dmf.mr.transaction.DMaaPTransactionFactory;
-import com.att.dmf.mr.transaction.DMaaPTransactionObj;
-import com.att.dmf.mr.transaction.DMaaPTransactionObjDB;
-import com.att.dmf.mr.transaction.TransactionObj;
-import com.att.nsa.configs.ConfigDb;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.configs.ConfigPath;
-
-/**
- * Persistent storage for Transaction objects built over an abstract config db.
- *
- * @author anowarul.islam
- *
- * @param <K>
- */
-public class BaseTransactionDbImpl<K extends DMaaPTransactionObj> implements DMaaPTransactionObjDB<K> {
-
- private final ConfigDb fDb;
- private final ConfigPath fBasePath;
- private final DMaaPTransactionFactory<K> fKeyFactory;
-
- private static final String kStdRootPath = "/transaction";
-
- private ConfigPath makePath(String transactionId) {
- return fBasePath.getChild(transactionId);
- }
-
- /**
- * Construct an Transaction db over the given config db at the standard
- * location
- *
- * @param db
- * @param keyFactory
- * @throws ConfigDbException
- */
- public BaseTransactionDbImpl(ConfigDb db, DMaaPTransactionFactory<K> keyFactory) throws ConfigDbException {
- this(db, kStdRootPath, keyFactory);
- }
-
- /**
- * Construct an Transaction db over the given config db using the given root
- * location
- *
- * @param db
- * @param rootPath
- * @param keyFactory
- * @throws ConfigDbException
- */
- public BaseTransactionDbImpl(ConfigDb db, String rootPath, DMaaPTransactionFactory<K> keyFactory)
- throws ConfigDbException {
- fDb = db;
- fBasePath = db.parse(rootPath);
- fKeyFactory = keyFactory;
-
- if (!db.exists(fBasePath)) {
- db.store(fBasePath, "");
- }
- }
-
- /**
- * Create a new Transaction Obj. If one exists,
- *
- * @param id
- * @return the new Transaction record
- * @throws ConfigDbException
- */
- public synchronized K createTransactionObj(String id) throws KeyExistsException, ConfigDbException {
- final ConfigPath path = makePath(id);
- if (fDb.exists(path)) {
- throw new KeyExistsException(id);
- }
-
- // make one, store it, return it
- final K newKey = fKeyFactory.makeNewTransactionId(id);
- fDb.store(path, newKey.serialize());
- return newKey;
- }
-
- /**
- * Save an Transaction record. This must be used after changing auxiliary
- * data on the record. Note that the transaction object must exist (via
- * createTransactionObj).
- *
- * @param transaction
- * object
- * @throws ConfigDbException
- */
- @Override
- public synchronized void saveTransactionObj(K trnObj) throws ConfigDbException {
- final ConfigPath path = makePath(trnObj.getId());
- if (!fDb.exists(path) || !(trnObj instanceof TransactionObj)) {
- throw new IllegalStateException(trnObj.getId() + " is not known to this database");
- }
- fDb.store(path, ((TransactionObj) trnObj).serialize());
- }
-
- /**
- * Load an Transaction record based on the Transaction Id value
- *
- * @param transactionId
- * @return an Transaction Object record or null
- * @throws ConfigDbException
- */
- @Override
- public synchronized K loadTransactionObj(String transactionId) throws ConfigDbException {
- final String data = fDb.load(makePath(transactionId));
- if (data != null) {
- return fKeyFactory.makeNewTransactionObj(data);
- }
- return null;
- }
-
- /**
- * Load all transactions known to this database. (This could be expensive.)
- *
- * @return a set of all Transaction objects
- * @throws ConfigDbException
- */
- public synchronized Set<String> loadAllTransactionObjs() throws ConfigDbException {
- final TreeSet<String> result = new TreeSet<>();
- for (ConfigPath cp : fDb.loadChildrenNames(fBasePath)) {
- result.add(cp.getName());
- }
- return result;
- }
-
-}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
deleted file mode 100644
index 73a373e..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/EventsServiceImpl.java
+++ /dev/null
@@ -1,867 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Properties;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.core.MediaType;
-
-import org.apache.http.HttpStatus;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Service;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.backends.Consumer;
-import com.att.dmf.mr.backends.ConsumerFactory;
-import com.att.dmf.mr.backends.ConsumerFactory.UnavailableException;
-import com.att.dmf.mr.backends.MetricsSet;
-import com.att.dmf.mr.backends.Publisher;
-import com.att.dmf.mr.backends.Publisher.message;
-import com.att.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
-import com.att.dmf.mr.beans.DMaaPCambriaLimiter;
-import com.att.dmf.mr.beans.DMaaPContext;
-import com.att.dmf.mr.beans.LogDetails;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
-import com.att.dmf.mr.exception.DMaaPErrorMessages;
-import com.att.dmf.mr.exception.DMaaPResponseCode;
-import com.att.dmf.mr.exception.ErrorResponse;
-
-import com.att.dmf.mr.metabroker.Topic;
-import com.att.dmf.mr.resources.CambriaEventSet;
-import com.att.dmf.mr.resources.CambriaOutboundEventStream;
-import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
-import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
-import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
-import com.att.dmf.mr.service.EventsService;
-import com.att.dmf.mr.utils.DMaaPResponseBuilder;
-import com.att.dmf.mr.utils.Utils;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.NsaApiKey;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.util.rrConvertor;
-
-/**
- * This class provides the functinality to publish and subscribe message to
- * kafka
- *
- * @author Ramkumar Sembaiyam
- *
- */
-@Service
-public class EventsServiceImpl implements EventsService {
- // private static final Logger LOG =
-
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
-
- private static final String BATCH_LENGTH = "event.batch.length";
- private static final String TRANSFER_ENCODING = "Transfer-Encoding";
- @Autowired
- private DMaaPErrorMessages errorMessages;
-
- //@Autowired
-
-
- // @Value("${metrics.send.cambria.topic}")
-
-
- public DMaaPErrorMessages getErrorMessages() {
- return errorMessages;
- }
-
- public void setErrorMessages(DMaaPErrorMessages errorMessages) {
- this.errorMessages = errorMessages;
- }
-
- /**
- * @param ctx
- * @param topic
- * @param consumerGroup
- * @param clientId
- * @throws ConfigDbException,
- * TopicExistsException, AccessDeniedException,
- * UnavailableException, CambriaApiException, IOException
- *
- *
- */
- @Override
- public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
- throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
- CambriaApiException, IOException, DMaaPAccessDeniedException {
- final long startTime = System.currentTimeMillis();
- final HttpServletRequest req = ctx.getRequest();
-
- boolean isAAFTopic = false;
- // was this host blacklisted?
- final String remoteAddr = Utils.getRemoteAddress(ctx);
- if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
-
- int limit = CambriaConstants.kNoLimit;
- if (req.getParameter("limit") != null) {
- limit = Integer.parseInt(req.getParameter("limit"));
- }
-
- int timeoutMs = CambriaConstants.kNoTimeout;
- String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
- if (strtimeoutMS != null)
- timeoutMs = Integer.parseInt(strtimeoutMS);
- // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
-
- if (req.getParameter("timeout") != null) {
- timeoutMs = Integer.parseInt(req.getParameter("timeout"));
- }
-
- // By default no filter is applied if filter is not passed as a
- // parameter in the request URI
- String topicFilter = CambriaConstants.kNoFilter;
- if (null != req.getParameter("filter")) {
- topicFilter = req.getParameter("filter");
- }
- // pretty to print the messaages in new line
- String prettyval = "0";
- String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
- if (null != strPretty)
- prettyval = strPretty;
-
- String metaval = "0";
- String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
- if (null != strmeta)
- metaval = strmeta;
-
- final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
- // withMeta to print offset along with message
- final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
-
- final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
- logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
-
- // is this user allowed to read this topic?
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
- final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
-
- if (metatopic == null) {
- // no such topic.
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
- errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
- topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost());
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "metrics.send.cambria.topic");
- if (null == metricTopicname)
- metricTopicname = "msgrtr.apinode.metrics.dmaap";
-
- if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
- if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) {
- // check permissions
- metatopic.checkUserRead(user);
- }
- }
- // if headers are not provided then user will be null
- if (user == null && null != ctx.getRequest().getHeader("Authorization")) {
- // the topic name will be sent by the client
-
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- String permission = aaf.aafPermissionString(topic, "sub");
- if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on "
- + permission,
- null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId,
- ctx.getRequest().getRemoteHost());
- LOG.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
-
- }
- isAAFTopic = true;
- }
- final long elapsedMs1 = System.currentTimeMillis() - startTime;
- logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
- + " " + clientId);
- Consumer c = null;
-
- String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "clusterhostid");
- if (null == lhostId) {
- try {
- lhostId = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e) {
- LOG.info("Unknown Host Exception error occured while getting getting hostid");
- }
-
- }
- CambriaOutboundEventStream coes = null;
- try {
- final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
- final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
- rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost());
- c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
- ctx.getRequest().getRemoteHost());
- coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
- .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
- coes.setDmaapContext(ctx);
- coes.setTopic(metatopic);
- if (isTransEnabled() || isAAFTopic) {
- coes.setTransEnabled(true);
- } else {
- coes.setTransEnabled(false);
- }
- coes.setTopicStyle(isAAFTopic);
- final long elapsedMs2 = System.currentTimeMillis() - startTime;
- logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
- + consumerGroup + " " + clientId);
-
- DMaaPResponseBuilder.setNoCacheHeadings(ctx);
-
- DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
- // No IOException thrown during respondOkWithStream, so commit the
- // new offsets to all the brokers
- c.commitOffsets();
- final int sent = coes.getSentCount();
-
- metricsSet.consumeTick(sent);
- rl.onSend(topic, consumerGroup, clientId, sent);
- final long elapsedMs = System.currentTimeMillis() - startTime;
- logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for "
- + topic + " " + consumerGroup + " " + clientId + " on to the server "
- + ctx.getRequest().getRemoteHost());
-
- } catch (UnavailableException excp) {
- logger.warn(excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
- DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
- errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost());
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
- } catch (java.util.ConcurrentModificationException excp1) {
- LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost());
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
- "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null,
- Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
- logger.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
- } catch (CambriaApiException excp) {
- LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId);
-
- throw excp;
- }
- catch (Exception excp) {
- // System.out.println(excp + "------------------ " + topic+"
- // "+consumerGroup+" "+clientId);
-
- logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
- + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
-
- ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
-
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
- DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
- "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
- Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
- logger.info(errRes.toString());
- throw new CambriaApiException(errRes);
- } finally {
- coes = null;
- // If no cache, close the consumer now that we're done with it.
- boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
- String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- ConsumerFactory.kSetting_EnableCache);
- if (null != strkSetting_EnableCache)
- kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
- // if
- // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,
- // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
- if (!kSetting_EnableCache && (c != null)) {
- try {
- c.close();
- } catch (Exception e) {
- logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " "
- + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
- + " " + e);
- }
- }
- }
- }
-
- /**
- * @throws missingReqdSetting
- *
- */
- @Override
- public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
- final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
- CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException {
-
- // is this user allowed to write to this topic?
- final long startMs = System.currentTimeMillis();
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
- final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
- boolean isAAFTopic = false;
-
- // was this host blacklisted?
- final String remoteAddr = Utils.getRemoteAddress(ctx);
-
- if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
-
- String topicNameStd = null;
-
- // topicNameStd=
-
- topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
- "enforced.topic.name.AAF");
- String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "metrics.send.cambria.topic");
- if (null == metricTopicname)
- metricTopicname = "msgrtr.apinode.metrics.dmaap";
- boolean topicNameEnforced = false;
- if (null != topicNameStd && topic.startsWith(topicNameStd)) {
- topicNameEnforced = true;
- }
-
- // Here check if the user has rights to publish on the topic
- // ( This will be called when no auth is added or when UEB API Key
- // Authentication is used)
- // checkUserWrite(user) method will throw an error when there is no Auth
- // header added or when the
- // user has no publish rights
-
- if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))
- && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
- metatopic.checkUserWrite(user);
- }
-
- // if headers are not provided then user will be null
- if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
- && !topic.equalsIgnoreCase(metricTopicname))) {
- // the topic name will be sent by the client
-
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- String permission = aaf.aafPermissionString(topic, "pub");
- if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic
- + " on " + permission,
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
- }
- isAAFTopic = true;
- }
-
- final HttpServletRequest req = ctx.getRequest();
-
- // check for chunked input
- boolean chunked = false;
- if (null != req.getHeader(TRANSFER_ENCODING)) {
- chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
- }
- // get the media type, or set it to a generic value if it wasn't
- // provided
- String mediaType = req.getContentType();
- if (mediaType == null || mediaType.length() == 0) {
- mediaType = MimeTypes.kAppGenericBinary;
- }
-
- if (mediaType.contains("charset=UTF-8")) {
- mediaType = mediaType.replace("; charset=UTF-8", "").trim();
- }
-
- String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "transidUEBtopicreqd");
- boolean istransidreqd = false;
- if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) {
- istransidreqd = true;
- }
-
- if (isAAFTopic || istransidreqd) {
- pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
- } else {
- pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
- }
- final long endMs = System.currentTimeMillis();
- final long totalMs = endMs - startMs;
-
- LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
-
- }
-
- /**
- *
- * @param ctx
- * @param topic
- * @param msg
- * @param defaultPartition
- * @param chunked
- * @param mediaType
- * @throws ConfigDbException
- * @throws AccessDeniedException
- * @throws TopicExistsException
- * @throws CambriaApiException
- * @throws IOException
- */
- private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
- String mediaType)
- throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException {
- final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
- // setup the event set
- final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
-
- // start processing, building a batch to push to the backend
- final long startMs = System.currentTimeMillis();
- long count = 0;
- long maxEventBatch = 1024L* 16;
- String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
- if (null != batchlen)
- maxEventBatch = Long.parseLong(batchlen);
- // long maxEventBatch =
-
- final LinkedList<Publisher.message> batch = new LinkedList<>();
- // final ArrayList<KeyedMessage<String, String>> kms = new
-
- final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>();
- try {
- // for each message...
- Publisher.message m = null;
- while ((m = events.next()) != null) {
- // add the message to the batch
- batch.add(m);
- // final KeyedMessage<String, String> data = new
- // KeyedMessage<String, String>(topic, m.getKey(),
-
- // kms.add(data);
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
- m.getMessage());
-
- pms.add(data);
- // check if the batch is full
- final int sizeNow = batch.size();
- if (sizeNow > maxEventBatch) {
- // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
-
- // kms.clear();
- ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- pms.clear();
- batch.clear();
- metricsSet.publishTick(sizeNow);
- count += sizeNow;
- }
- }
-
- // send the pending batch
- final int sizeNow = batch.size();
- if (sizeNow > 0) {
- // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
-
- // kms.clear();
- ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- pms.clear();
- batch.clear();
- metricsSet.publishTick(sizeNow);
- count += sizeNow;
- }
-
- final long endMs = System.currentTimeMillis();
- final long totalMs = endMs - startMs;
-
- LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server "
- + ctx.getRequest().getRemoteHost());
-
- // build a responseP
- final JSONObject response = new JSONObject();
- response.put("count", count);
- response.put("serverTimeMs", totalMs);
- DMaaPResponseBuilder.respondOk(ctx, response);
-
- } catch (Exception excp) {
- int status = HttpStatus.SC_NOT_FOUND;
- String errorMsg = null;
- if (excp instanceof CambriaApiException) {
- status = ((CambriaApiException) excp).getStatus();
- JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
- JSONObject errObject = new JSONObject(jsonTokener);
- errorMsg = (String) errObject.get("message");
-
- }
- ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
- errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
- + "." + errorMsg,
- null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
- null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
- }
- }
-
- /**
- *
- * @param ctx
- * @param inputStream
- * @param topic
- * @param partitionKey
- * @param requestTime
- * @param chunked
- * @param mediaType
- * @throws ConfigDbException
- * @throws AccessDeniedException
- * @throws TopicExistsException
- * @throws IOException
- * @throws CambriaApiException
- */
- private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
- final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
- throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException {
-
- final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
-
- // setup the event set
- final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
-
- // start processing, building a batch to push to the backend
- final long startMs = System.currentTimeMillis();
- long count = 0;
- long maxEventBatch = 1024L * 16;
- String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
- if (null != evenlen)
- maxEventBatch = Long.parseLong(evenlen);
- // final long maxEventBatch =
-
- final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
- // final ArrayList<KeyedMessage<String, String>> kms = new
-
- final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
- Publisher.message m = null;
- int messageSequence = 1;
- Long batchId = 1L;
- final boolean transactionEnabled = true;
- int publishBatchCount = 0;
- SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
-
- // LOG.warn("Batch Start Id: " +
-
- try {
- // for each message...
- batchId = DMaaPContext.getBatchID();
-
- String responseTransactionId = null;
-
- while ((m = events.next()) != null) {
-
- // LOG.warn("Batch Start Id: " +
-
-
- addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
- transactionEnabled);
- messageSequence++;
-
-
- batch.add(m);
-
- responseTransactionId = m.getLogDetails().getTransactionId();
-
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("msgWrapMR", m.getMessage());
- jsonObject.put("transactionId", responseTransactionId);
- // final KeyedMessage<String, String> data = new
- // KeyedMessage<String, String>(topic, m.getKey(),
-
- // kms.add(data);
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
- m.getMessage());
-
- pms.add(data);
- // check if the batch is full
- final int sizeNow = batch.size();
- if (sizeNow >= maxEventBatch) {
- String startTime = sdf.format(new Date());
- LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
- + batchId + "]");
- try {
- // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
- // kms);
- ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- // transactionLogs(batch);
- for (message msg : batch) {
- LogDetails logDetails = msg.getLogDetails();
- LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
- }
- } catch (Exception excp) {
-
- int status = HttpStatus.SC_NOT_FOUND;
- String errorMsg = null;
- if (excp instanceof CambriaApiException) {
- status = ((CambriaApiException) excp).getStatus();
- JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
- JSONObject errObject = new JSONObject(jsonTokener);
- errorMsg = (String) errObject.get("message");
- }
- ErrorResponse errRes = new ErrorResponse(status,
- DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
- "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
- + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- pms.clear();
- batch.clear();
- metricsSet.publishTick(sizeNow);
- publishBatchCount = sizeNow;
- count += sizeNow;
- // batchId++;
- String endTime = sdf.format(new Date());
- LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
- + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
- + ",Batch End Time=" + endTime + "]");
- batchId = DMaaPContext.getBatchID();
- }
- }
-
- // send the pending batch
- final int sizeNow = batch.size();
- if (sizeNow > 0) {
- String startTime = sdf.format(new Date());
- LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
- + batchId + "]");
- try {
- // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic,
- // kms);
- ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- // transactionLogs(batch);
- for (message msg : batch) {
- LogDetails logDetails = msg.getLogDetails();
- LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
- }
- } catch (Exception excp) {
- int status = HttpStatus.SC_NOT_FOUND;
- String errorMsg = null;
- if (excp instanceof CambriaApiException) {
- status = ((CambriaApiException) excp).getStatus();
- JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
- JSONObject errObject = new JSONObject(jsonTokener);
- errorMsg = (String) errObject.get("message");
- }
-
- ErrorResponse errRes = new ErrorResponse(status,
- DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
- "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
- + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- pms.clear();
- metricsSet.publishTick(sizeNow);
- count += sizeNow;
- // batchId++;
- String endTime = sdf.format(new Date());
- publishBatchCount = sizeNow;
- LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
- + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
- + endTime + "]");
- }
-
- final long endMs = System.currentTimeMillis();
- final long totalMs = endMs - startMs;
-
- LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic);
-
- if (null != responseTransactionId) {
- ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
- }
-
- // build a response
- final JSONObject response = new JSONObject();
- response.put("count", count);
- response.put("serverTimeMs", totalMs);
- DMaaPResponseBuilder.respondOk(ctx, response);
-
- } catch (Exception excp) {
- int status = HttpStatus.SC_NOT_FOUND;
- String errorMsg = null;
- if (excp instanceof CambriaApiException) {
- status = ((CambriaApiException) excp).getStatus();
- JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
- JSONObject errObject = new JSONObject(jsonTokener);
- errorMsg = (String) errObject.get("message");
- }
-
- ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
- "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
- + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- }
-
- /**
- *
- * @param msg
- * @param topic
- * @param request
- * @param messageCreationTime
- * @param messageSequence
- * @param batchId
- * @param transactionEnabled
- */
- private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
- final String messageCreationTime, final int messageSequence, final Long batchId,
- final boolean transactionEnabled) {
- LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
- transactionEnabled);
- logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
- msg.setTransactionEnabled(transactionEnabled);
- msg.setLogDetails(logDetails);
- }
-
- /**
- *
- * @author anowarul.islam
- *
- */
- private static class LogWrap {
- private final String fId;
-
- /**
- * constructor initialization
- *
- * @param topic
- * @param cgroup
- * @param cid
- */
- public LogWrap(String topic, String cgroup, String cid) {
- fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
- }
-
- /**
- *
- * @param msg
- */
- public void info(String msg) {
- LOG.info(fId + msg);
- }
-
- /**
- *
- * @param msg
- * @param t
- */
- public void warn(String msg, Exception t) {
- LOG.warn(fId + msg, t);
- }
-
- }
-
- public boolean isTransEnabled() {
- String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "transidUEBtopicreqd");
- boolean istransidreqd = false;
- if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
- istransidreqd = true;
- }
-
- return istransidreqd;
-
- }
-
- private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
- final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
- LogDetails logDetails = new LogDetails();
- logDetails.setTopicId(topicName);
- logDetails.setMessageTimestamp(messageTimestamp);
- logDetails.setPublisherId(Utils.getUserApiKey(request));
- logDetails.setPublisherIp(request.getRemoteHost());
- logDetails.setMessageBatchId(batchId);
- logDetails.setMessageSequence(String.valueOf(messageSequence));
- logDetails.setTransactionEnabled(transactionEnabled);
- logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
- logDetails.setServerIp(request.getLocalAddr());
- return logDetails;
- }
-
- /*
- * public String getMetricsTopic() { return metricsTopic; }
- *
- * public void setMetricsTopic(String metricsTopic) { this.metricsTopic =
- * metricsTopic; }
- */
-
-
-
-} \ No newline at end of file
diff --git a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java
deleted file mode 100644
index 387d8b1..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/MMServiceImpl.java
+++ /dev/null
@@ -1,600 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.LinkedList;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.Context;
-
-import org.apache.http.HttpStatus;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Service;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.backends.Consumer;
-import com.att.dmf.mr.backends.ConsumerFactory;
-import com.att.dmf.mr.backends.ConsumerFactory.UnavailableException;
-import com.att.dmf.mr.backends.MetricsSet;
-import com.att.dmf.mr.backends.Publisher;
-import com.att.dmf.mr.backends.Publisher.message;
-import com.att.dmf.mr.beans.DMaaPContext;
-import com.att.dmf.mr.beans.LogDetails;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.exception.DMaaPErrorMessages;
-import com.att.dmf.mr.exception.DMaaPResponseCode;
-import com.att.dmf.mr.exception.ErrorResponse;
-import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
-import com.att.dmf.mr.metabroker.Topic;
-import com.att.dmf.mr.resources.CambriaEventSet;
-import com.att.dmf.mr.resources.CambriaOutboundEventStream;
-import com.att.dmf.mr.service.MMService;
-import com.att.dmf.mr.utils.ConfigurationReader;
-import com.att.dmf.mr.utils.DMaaPResponseBuilder;
-import com.att.dmf.mr.utils.Utils;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.util.rrConvertor;
-
-
-
-@Service
-public class MMServiceImpl implements MMService {
- private static final String BATCH_LENGTH = "event.batch.length";
- private static final String TRANSFER_ENCODING = "Transfer-Encoding";
- //private static final Logger LOG = Logger.getLogger(MMServiceImpl.class);
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MMServiceImpl.class);
- @Autowired
- private DMaaPErrorMessages errorMessages;
-
- @Autowired
- @Qualifier("configurationReader")
- private ConfigurationReader configReader;
-
- // HttpServletRequest object
- @Context
- private HttpServletRequest request;
-
- // HttpServletResponse object
- @Context
- private HttpServletResponse response;
-
- @Override
- public void addWhiteList() {
-
- }
-
- @Override
- public void removeWhiteList() {
-
- }
-
- @Override
- public void listWhiteList() {
-
- }
-
- @Override
- public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
- throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
- CambriaApiException, IOException {
-
-
- final HttpServletRequest req = ctx.getRequest();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- // was this host blacklisted?
- final String remoteAddr = Utils.getRemoteAddress(ctx);
-
- if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
-
- int limit = CambriaConstants.kNoLimit;
-
- if (req.getParameter("limit") != null) {
- limit = Integer.parseInt(req.getParameter("limit"));
- }
- limit = 1;
-
- int timeoutMs = CambriaConstants.kNoTimeout;
- String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
- if (strtimeoutMS != null)
- timeoutMs = Integer.parseInt(strtimeoutMS);
- // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
-
- if (req.getParameter("timeout") != null) {
- timeoutMs = Integer.parseInt(req.getParameter("timeout"));
- }
-
- // By default no filter is applied if filter is not passed as a
- // parameter in the request URI
- String topicFilter = CambriaConstants.kNoFilter;
- if (null != req.getParameter("filter")) {
- topicFilter = req.getParameter("filter");
- }
- // pretty to print the messaages in new line
- String prettyval = "0";
- String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
- if (null != strPretty)
- prettyval = strPretty;
-
- String metaval = "0";
- String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
- if (null != strmeta)
- metaval = strmeta;
-
- final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
- // withMeta to print offset along with message
- final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
-
- // is this user allowed to read this topic?
- //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
- final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
-
- if (metatopic == null) {
- // no such topic.
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
- errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
- topic, null, null, clientId, ctx.getRequest().getRemoteHost());
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- //String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
- /*
- * if (null==metricTopicname)
- * metricTopicname="msgrtr.apinode.metrics.dmaap"; //else if(user!=null)
- * if(null==ctx.getRequest().getHeader("Authorization")&&
- * !topic.equalsIgnoreCase(metricTopicname)) { if (null !=
- * metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ // check
- * permissions metatopic.checkUserRead(user); } }
- */
-
- Consumer c = null;
- try {
- final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
-
- c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,ctx.getRequest().getRemoteHost());
-
- final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
- .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
- coes.setDmaapContext(ctx);
- coes.setTopic(metatopic);
-
- DMaaPResponseBuilder.setNoCacheHeadings(ctx);
-
- try {
- coes.write(baos);
- } catch (Exception ex) {
-
- }
-
- c.commitOffsets();
- final int sent = coes.getSentCount();
-
- metricsSet.consumeTick(sent);
-
- } catch (UnavailableException excp) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
- DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
- errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- null, null, clientId, ctx.getRequest().getRemoteHost());
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
- } catch (CambriaApiException excp) {
-
- throw excp;
- } catch (Exception excp) {
-
- ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
- DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
- "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
- Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- } finally {
-
- boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
- String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- ConsumerFactory.kSetting_EnableCache);
- if (null != strkSetting_EnableCache)
- kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
-
- if (!kSetting_EnableCache && (c != null)) {
- c.close();
-
- }
- }
- return baos.toString();
- }
-
- @Override
- public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
- final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
- CambriaApiException, IOException, missingReqdSetting {
-
- //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
- //final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
-
- final String remoteAddr = Utils.getRemoteAddress(ctx);
-
- if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
-
- String topicNameStd = null;
-
- topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
- "enforced.topic.name.AAF");
- String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "metrics.send.cambria.topic");
- if (null == metricTopicname)
- metricTopicname = "msgrtr.apinode.metrics.dmaap";
- boolean topicNameEnforced = false;
- if (null != topicNameStd && topic.startsWith(topicNameStd)) {
- topicNameEnforced = true;
- }
-
- final HttpServletRequest req = ctx.getRequest();
-
- boolean chunked = false;
- if (null != req.getHeader(TRANSFER_ENCODING)) {
- chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
- }
-
- String mediaType = req.getContentType();
- if (mediaType == null || mediaType.length() == 0) {
- mediaType = MimeTypes.kAppGenericBinary;
- }
-
- if (mediaType.contains("charset=UTF-8")) {
- mediaType = mediaType.replace("; charset=UTF-8", "").trim();
- }
-
- if (!topic.equalsIgnoreCase(metricTopicname)) {
- pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
- } else {
- pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
- }
- }
-
- private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
- final String messageCreationTime, final int messageSequence, final Long batchId,
- final boolean transactionEnabled) {
- LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
- transactionEnabled);
- logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
- msg.setTransactionEnabled(transactionEnabled);
- msg.setLogDetails(logDetails);
- }
-
- private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
- final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
- LogDetails logDetails = new LogDetails();
- logDetails.setTopicId(topicName);
- logDetails.setMessageTimestamp(messageTimestamp);
- logDetails.setPublisherId(Utils.getUserApiKey(request));
- logDetails.setPublisherIp(request.getRemoteHost());
- logDetails.setMessageBatchId(batchId);
- logDetails.setMessageSequence(String.valueOf(messageSequence));
- logDetails.setTransactionEnabled(transactionEnabled);
- logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
- logDetails.setServerIp(request.getLocalAddr());
- return logDetails;
- }
-
- private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
- String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
- CambriaApiException, IOException {
- final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
-
- // setup the event set
- final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
-
- // start processing, building a batch to push to the backend
- final long startMs = System.currentTimeMillis();
- long count = 0;
-
- long maxEventBatch = 1024 * 16;
- String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
- if (null != batchlen)
- maxEventBatch = Long.parseLong(batchlen);
-
- // long maxEventBatch =
- // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
- final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
- final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
- //final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
-
- try {
- // for each message...
- Publisher.message m = null;
- while ((m = events.next()) != null) {
- // add the message to the batch
- batch.add(m);
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
- m.getMessage());
- // check if the batch is full
- final int sizeNow = batch.size();
- if (sizeNow > maxEventBatch) {
- ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- pms.clear();
- batch.clear();
- metricsSet.publishTick(sizeNow);
- count += sizeNow;
- }
- }
-
- // send the pending batch
- final int sizeNow = batch.size();
- if (sizeNow > 0) {
- ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- pms.clear();
- batch.clear();
- metricsSet.publishTick(sizeNow);
- count += sizeNow;
- }
-
- final long endMs = System.currentTimeMillis();
- final long totalMs = endMs - startMs;
-
- LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
-
- // build a responseP
- final JSONObject response = new JSONObject();
- response.put("count", count);
- response.put("serverTimeMs", totalMs);
- // DMaaPResponseBuilder.respondOk(ctx, response);
-
- } catch (Exception excp) {
-
- int status = HttpStatus.SC_NOT_FOUND;
- String errorMsg = null;
- if (excp.getClass().toString().contains("CambriaApiException")) {
- status = ((CambriaApiException) excp).getStatus();
- JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
- JSONObject errObject = new JSONObject(jsonTokener);
- errorMsg = (String) errObject.get("message");
-
- }
- ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
- errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
- + "." + errorMsg,
- null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
- null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
- }
- }
-
- private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
- final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
- throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
- CambriaApiException {
-
- final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
-
- // setup the event set
- final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
-
- // start processing, building a batch to push to the backend
- final long startMs = System.currentTimeMillis();
- long count = 0;
- long maxEventBatch = 1024 * 16;
- String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
- if (null != evenlen)
- maxEventBatch = Long.parseLong(evenlen);
-
- final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
- final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>();
-
- Publisher.message m = null;
- int messageSequence = 1;
- Long batchId = 1L;
- final boolean transactionEnabled = true;
- int publishBatchCount = 0;
- SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
-
- // LOG.warn("Batch Start Id: " +
- // Utils.getFromattedBatchSequenceId(batchId));
- try {
- // for each message...
- batchId = DMaaPContext.getBatchID();
-
- String responseTransactionId = null;
-
- while ((m = events.next()) != null) {
-
- // LOG.warn("Batch Start Id: " +
- // Utils.getFromattedBatchSequenceId(batchId));
-
- addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
- transactionEnabled);
- messageSequence++;
-
- // add the message to the batch
- batch.add(m);
-
- responseTransactionId = m.getLogDetails().getTransactionId();
-
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("message", m.getMessage());
- jsonObject.put("transactionId", responseTransactionId);
- final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(),
- m.getMessage());
- pms.add(data);
-
- // check if the batch is full
- final int sizeNow = batch.size();
- if (sizeNow >= maxEventBatch) {
- String startTime = sdf.format(new Date());
- LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
- + batchId + "]");
- try {
- ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- // transactionLogs(batch);
- for (message msg : batch) {
- LogDetails logDetails = msg.getLogDetails();
- LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
- }
- } catch (Exception excp) {
-
- int status = HttpStatus.SC_NOT_FOUND;
- String errorMsg = null;
- if (excp.getClass().toString().contains("CambriaApiException")) {
- status = ((CambriaApiException) excp).getStatus();
- JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
- JSONObject errObject = new JSONObject(jsonTokener);
- errorMsg = (String) errObject.get("message");
- }
- ErrorResponse errRes = new ErrorResponse(status,
- DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
- "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
- + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- pms.clear();
- batch.clear();
- metricsSet.publishTick(sizeNow);
- publishBatchCount = sizeNow;
- count += sizeNow;
- // batchId++;
- String endTime = sdf.format(new Date());
- LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
- + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
- + ",Batch End Time=" + endTime + "]");
- batchId = DMaaPContext.getBatchID();
- }
- }
-
- // send the pending batch
- final int sizeNow = batch.size();
- if (sizeNow > 0) {
- String startTime = sdf.format(new Date());
- LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
- + batchId + "]");
- try {
- ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms);
- // transactionLogs(batch);
- for (message msg : batch) {
- LogDetails logDetails = msg.getLogDetails();
- LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
- }
- } catch (Exception excp) {
- int status = HttpStatus.SC_NOT_FOUND;
- String errorMsg = null;
- if (excp.getClass().toString().contains("CambriaApiException")) {
- status = ((CambriaApiException) excp).getStatus();
- JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
- JSONObject errObject = new JSONObject(jsonTokener);
- errorMsg = (String) errObject.get("message");
- }
-
- ErrorResponse errRes = new ErrorResponse(status,
- DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
- "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
- + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- pms.clear();
- metricsSet.publishTick(sizeNow);
- count += sizeNow;
- // batchId++;
- String endTime = sdf.format(new Date());
- publishBatchCount = sizeNow;
- LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
- + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
- + endTime + "]");
- }
-
- final long endMs = System.currentTimeMillis();
- final long totalMs = endMs - startMs;
-
- LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
-
- // build a response
- final JSONObject response = new JSONObject();
- response.put("count", count);
- response.put("serverTimeMs", totalMs);
-
- } catch (Exception excp) {
- int status = HttpStatus.SC_NOT_FOUND;
- String errorMsg = null;
- if (excp.getClass().toString().contains("CambriaApiException")) {
- status = ((CambriaApiException) excp).getStatus();
- JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
- JSONObject errObject = new JSONObject(jsonTokener);
- errorMsg = (String) errObject.get("message");
- }
-
- ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
- "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
- + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- }
-}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
deleted file mode 100644
index d867ea8..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/MetricsServiceImpl.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.io.IOException;
-
-import org.json.JSONObject;
-import org.springframework.stereotype.Component;
-
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.backends.MetricsSet;
-import com.att.dmf.mr.beans.DMaaPContext;
-import com.att.dmf.mr.service.MetricsService;
-import com.att.dmf.mr.utils.DMaaPResponseBuilder;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.metrics.CdmMeasuredItem;
-
-/**
- *
- *
- * This will provide all the generated metrics details also it can provide the
- * get metrics details
- *
- *
- * @author nilanjana.maity
- *
- *
- */
-@Component
-public class MetricsServiceImpl implements MetricsService {
-
-
- private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MetricsService.class);
- /**
- *
- *
- * @param ctx
- * @throws IOException
- *
- *
- * get Metric details
- *
- */
- @Override
-
- public void get(DMaaPContext ctx) throws IOException {
- LOG.info("Inside : MetricsServiceImpl : get()");
- final MetricsSet metrics = ctx.getConfigReader().getfMetrics();
- DMaaPResponseBuilder.setNoCacheHeadings(ctx);
- final JSONObject result = metrics.toJson();
- DMaaPResponseBuilder.respondOk(ctx, result);
- LOG.info("============ Metrics generated : " + result.toString() + "=================");
-
- }
-
-
- @Override
- /**
- *
- * get Metric by name
- *
- *
- * @param ctx
- * @param name
- * @throws IOException
- * @throws CambriaApiException
- *
- *
- */
- public void getMetricByName(DMaaPContext ctx, String name) throws IOException, CambriaApiException {
- LOG.info("Inside : MetricsServiceImpl : getMetricByName()");
- final MetricsSet metrics = ctx.getConfigReader().getfMetrics();
-
- final CdmMeasuredItem item = metrics.getItem(name);
- /**
- * check if item is null
- */
- if (item == null) {
- throw new CambriaApiException(404, "No metric named [" + name + "].");
- }
-
- final JSONObject entry = new JSONObject();
- entry.put("summary", item.summarize());
- entry.put("raw", item.getRawValueString());
-
- DMaaPResponseBuilder.setNoCacheHeadings(ctx);
-
- final JSONObject result = new JSONObject();
- result.put(name, entry);
-
- DMaaPResponseBuilder.respondOk(ctx, result);
- LOG.info("============ Metrics generated : " + entry.toString() + "=================");
- }
-
-}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
deleted file mode 100644
index 983af7e..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/TopicServiceImpl.java
+++ /dev/null
@@ -1,694 +0,0 @@
-/**
- *
- */
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.io.IOException;
-
-import org.apache.http.HttpStatus;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.beans.DMaaPContext;
-import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
-import com.att.dmf.mr.beans.TopicBean;
-import com.att.dmf.mr.constants.CambriaConstants;
-import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
-import com.att.dmf.mr.exception.DMaaPErrorMessages;
-import com.att.dmf.mr.exception.DMaaPResponseCode;
-import com.att.dmf.mr.exception.ErrorResponse;
-import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
-import com.att.dmf.mr.metabroker.Broker1;
-
-import com.att.dmf.mr.metabroker.Topic;
-import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
-import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
-import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
-import com.att.dmf.mr.service.TopicService;
-import com.att.dmf.mr.utils.DMaaPResponseBuilder;
-import com.att.dmf.mr.utils.Utils;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.security.NsaAcl;
-import com.att.nsa.security.NsaApiKey;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-
-/**
- * @author muzainulhaque.qazi
- *
- */
-@Service
-public class TopicServiceImpl implements TopicService {
-
- // private static final Logger LOGGER =
-
- private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
- @Autowired
- private DMaaPErrorMessages errorMessages;
-
- // @Value("${msgRtr.topicfactory.aaf}")
-
-
- public DMaaPErrorMessages getErrorMessages() {
- return errorMessages;
- }
-
- public void setErrorMessages(DMaaPErrorMessages errorMessages) {
- this.errorMessages = errorMessages;
- }
-
- /**
- * @param dmaapContext
- * @throws JSONException
- * @throws ConfigDbException
- * @throws IOException
- *
- */
- @Override
- public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
- LOGGER.info("Fetching list of all the topics.");
- JSONObject json = new JSONObject();
-
- JSONArray topicsList = new JSONArray();
-
- for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
- topicsList.put(topic.getName());
- }
-
- json.put("topics", topicsList);
-
- LOGGER.info("Returning list of all the topics.");
- DMaaPResponseBuilder.respondOk(dmaapContext, json);
-
- }
-
- /**
- * @param dmaapContext
- * @throws JSONException
- * @throws ConfigDbException
- * @throws IOException
- *
- */
- public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
-
- LOGGER.info("Fetching list of all the topics.");
- JSONObject json = new JSONObject();
-
- JSONArray topicsList = new JSONArray();
-
- for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
- JSONObject obj = new JSONObject();
- obj.put("topicName", topic.getName());
-
- obj.put("owner", topic.getOwner());
- obj.put("txenabled", topic.isTransactionEnabled());
- topicsList.put(obj);
- }
-
- json.put("topics", topicsList);
-
- LOGGER.info("Returning list of all the topics.");
- DMaaPResponseBuilder.respondOk(dmaapContext, json);
-
- }
-
- /**
- * @param dmaapContext
- * @param topicName
- * @throws ConfigDbException
- * @throws IOException
- * @throws TopicExistsException
- */
- @Override
- public void getTopic(DMaaPContext dmaapContext, String topicName)
- throws ConfigDbException, IOException, TopicExistsException {
-
- LOGGER.info("Fetching details of topic " + topicName);
- Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
-
- if (null == t) {
- LOGGER.error("Topic [" + topicName + "] does not exist.");
- throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
- }
-
- JSONObject o = new JSONObject();
- o.put("name", t.getName());
- o.put("description", t.getDescription());
-
- if (null != t.getOwners())
- o.put("owner", t.getOwners().iterator().next());
- if (null != t.getReaderAcl())
- o.put("readerAcl", aclToJson(t.getReaderAcl()));
- if (null != t.getWriterAcl())
- o.put("writerAcl", aclToJson(t.getWriterAcl()));
-
- LOGGER.info("Returning details of topic " + topicName);
- DMaaPResponseBuilder.respondOk(dmaapContext, o);
-
- }
-
- /**
- * @param dmaapContext
- * @param topicBean
- * @throws CambriaApiException
- * @throws AccessDeniedException
- * @throws IOException
- * @throws TopicExistsException
- * @throws JSONException
- *
- *
- *
- */
- @Override
- public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
- throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
- LOGGER.info("Creating topic " + topicBean.getTopicName());
-
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
- String key = null;
- String appName = dmaapContext.getRequest().getHeader("AppName");
- String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
- "enforced.topic.name.AAF");
-
- if (user != null) {
- key = user.getKey();
-
- if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
-
- LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Failed to create topic: Access Denied.User does not have permission to perform create topic");
-
- LOGGER.info(errRes.toString());
- // throw new DMaaPAccessDeniedException(errRes);
-
- }
- }
- // else if (user==null &&
- // (null==dmaapContext.getRequest().getHeader("Authorization") && null
- // == dmaapContext.getRequest().getHeader("cookie")) ) {
- else if (Utils.isCadiEnabled()&&user == null && null == dmaapContext.getRequest().getHeader("Authorization")
- && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
- LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Failed to create topic: Access Denied.User does not have permission to perform create topic");
-
- LOGGER.info(errRes.toString());
- // throw new DMaaPAccessDeniedException(errRes);
- }
-
- if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization")
- )) {
- // if (user == null &&
- // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
- // null != dmaapContext.getRequest().getHeader("cookie"))) {
- // ACL authentication is not provided so we will use the aaf
- // authentication
- LOGGER.info("Authorization the topic");
-
- String permission = "";
- String nameSpace = "";
- if (topicBean.getTopicName().indexOf(".") > 1)
- nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
-
- String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.topicfactory.aaf");
-
- // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
-
- permission = mrFactoryVal + nameSpace + "|create";
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-
- if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
-
- LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
- + permission);
-
- LOGGER.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
-
- } else {
- // if user is null and aaf authentication is ok then key should
- // be ""
- // key = "";
- /**
- * Added as part of AAF user it should return username
- */
-
- key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
- LOGGER.info("key ==================== " + key);
-
- }
- }
-
- try {
- final String topicName = topicBean.getTopicName();
- final String desc = topicBean.getTopicDescription();
- int partition = topicBean.getPartitionCount();
- // int replica = topicBean.getReplicationCount();
- if (partition == 0) {
- partition = 1;
- }
- final int partitions = partition;
-
- int replica = topicBean.getReplicationCount();
- if (replica == 0) {
- replica = 1;
- }
- final int replicas = replica;
- boolean transactionEnabled = topicBean.isTransactionEnabled();
-
- final Broker1 metabroker = getMetaBroker(dmaapContext);
- final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
-
- LOGGER.info("Topic created successfully. Sending response");
- DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
- } catch (JSONException excp) {
-
- LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
- DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
- LOGGER.info(errRes.toString());
- throw new CambriaApiException(errRes);
-
- } catch (ConfigDbException excp1) {
-
- LOGGER.error("Failed to create topic. Config DB Exception", excp1);
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
- DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
- LOGGER.info(errRes.toString());
- throw new CambriaApiException(errRes);
- } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
- // TODO Auto-generated catch block
- LOGGER.error( e.getMessage());
- }
- }
-
- /**
- * @param dmaapContext
- * @param topicName
- * @throws ConfigDbException
- * @throws IOException
- * @throws TopicExistsException
- * @throws CambriaApiException
- * @throws AccessDeniedException
- */
- @Override
- public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
- CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
-
-
- LOGGER.info(" Deleting topic " + topicName);
- /*if (true) { // {
- LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
- + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
- LOGGER.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
- }*/
-
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-
- if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
- LOGGER.info("Authenticating the user, as ACL authentication is not provided");
- // String permission =
-
- String permission = "";
- String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
- String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "msgRtr.topicfactory.aaf");
-
- permission = mrFactoryVal + nameSpace + "|destroy";
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
- LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
- + errorMessages.getNotPermitted2());
- LOGGER.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
- }
-
- }
-
- final Broker1 metabroker = getMetaBroker(dmaapContext);
- final Topic topic = metabroker.getTopic(topicName);
-
- if (topic == null) {
- LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
- throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
- }
-
- // metabroker.deleteTopic(topicName);
-
- LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
- DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
- }
-
- /**
- *
- * @param dmaapContext
- * @return
- */
- private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
- return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
- }
-
- /**
- * @param dmaapContext
- * @param topicName
- * @throws ConfigDbException
- * @throws IOException
- * @throws TopicExistsException
- *
- */
- @Override
- public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
- throws ConfigDbException, IOException, TopicExistsException {
- LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
- Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
- if (topic == null) {
- LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
- throw new TopicExistsException(
- "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
- }
-
- final NsaAcl acl = topic.getWriterAcl();
-
- LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
- DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
-
- }
-
- /**
- *
- * @param acl
- * @return
- */
- private static JSONObject aclToJson(NsaAcl acl) {
- final JSONObject o = new JSONObject();
- if (acl == null) {
- o.put("enabled", false);
- o.put("users", new JSONArray());
- } else {
- o.put("enabled", acl.isActive());
-
- final JSONArray a = new JSONArray();
- for (String user : acl.getUsers()) {
- a.put(user);
- }
- o.put("users", a);
- }
- return o;
- }
-
- /**
- * @param dmaapContext
- * @param topicName
- */
- @Override
- public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
- throws IOException, ConfigDbException, TopicExistsException {
- LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
- Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
- if (topic == null) {
- LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
- throw new TopicExistsException(
- "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
- }
-
- final NsaAcl acl = topic.getReaderAcl();
-
- LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
- DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
-
- }
-
- /**
- *
- * @param t
- * @return
- */
- private static JSONObject topicToJson(Topic t) {
- final JSONObject o = new JSONObject();
-
- o.put("name", t.getName());
- o.put("description", t.getDescription());
- o.put("owner", t.getOwner());
- o.put("readerAcl", aclToJson(t.getReaderAcl()));
- o.put("writerAcl", aclToJson(t.getWriterAcl()));
-
- return o;
- }
-
- /**
- * @param dmaapContext
- * @param topicName @param producerId @throws
- * ConfigDbException @throws IOException @throws
- * TopicExistsException @throws AccessDeniedException @throws
- *
- */
- @Override
- public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
- throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
-
- LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-
-
- //
- // LOGGER.info("Authenticating the user, as ACL authentication is not
-
- //// String permission =
-
- //
-
-
-
- // {
- // LOGGER.error("Failed to permit write access to producer [" +
- // producerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant publish permissions>
-
-
-
- // }
- // }
-
- Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
- if (null == topic) {
- LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
- + "] does not exist.");
- throw new TopicExistsException("Failed to permit write access to producer [" + producerId
- + "] for topic. Topic [" + topicName + "] does not exist.");
- }
-
- topic.permitWritesFromUser(producerId, user);
-
- LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
- + "]. Sending response.");
- DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
-
- }
-
- /**
- * @param dmaapContext
- * @param topicName
- * @param producerId
- * @throws ConfigDbException
- * @throws IOException
- * @throws TopicExistsException
- * @throws AccessDeniedException
- * @throws DMaaPAccessDeniedException
- *
- */
- @Override
- public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
- throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
- DMaaPAccessDeniedException {
-
- LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-
- //
- //// String permission =
-
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to revoke write access to producer [" +
- // producerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
-
-
- // throw new DMaaPAccessDeniedException(errRes);
- //
-
- // }
-
- Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
- if (null == topic) {
- LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
- + "] does not exist.");
- throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
- + "] for topic. Topic [" + topicName + "] does not exist.");
- }
-
- topic.denyWritesFromUser(producerId, user);
-
- LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
- + "]. Sending response.");
- DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
-
- }
-
- /**
- * @param dmaapContext
- * @param topicName
- * @param consumerId
- * @throws DMaaPAccessDeniedException
- */
- @Override
- public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
- throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
- DMaaPAccessDeniedException {
-
- LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-
- //
- //// String permission =
-
-
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to permit read access to consumer [" +
- // consumerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant consume permissions>
-
-
-
- // }
- // }
-
- Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
- if (null == topic) {
- LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
- + "] does not exist.");
- throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
- + "] for topic. Topic [" + topicName + "] does not exist.");
- }
-
- topic.permitReadsByUser(consumerId, user);
-
- LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
- + "]. Sending response.");
- DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
- "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
- }
-
- /**
- * @param dmaapContext
- * @param topicName
- * @param consumerId
- * @throws DMaaPAccessDeniedException
- */
- @Override
- public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
- throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
- DMaaPAccessDeniedException {
-
- LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-
- //// String permission =
-
-
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to revoke read access to consumer [" +
- // consumerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant consume permissions>
-
-
- // throw new DMaaPAccessDeniedException(errRes);
- // }
- //
- //
-
- Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
- if (null == topic) {
- LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
- + "] does not exist.");
- throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
- + "] for topic. Topic [" + topicName + "] does not exist.");
- }
-
- topic.denyReadsByUser(consumerId, user);
-
- LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
- + "]. Sending response.");
- DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
- "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
-
- }
-
-}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
deleted file mode 100644
index 3065928..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/TransactionServiceImpl.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.io.IOException;
-
-import org.springframework.stereotype.Service;
-
-import com.att.aft.dme2.internal.jettison.json.JSONException;
-import com.att.dmf.mr.beans.DMaaPContext;
-import com.att.dmf.mr.service.TransactionService;
-import com.att.dmf.mr.transaction.TransactionObj;
-import com.att.nsa.configs.ConfigDbException;
-
-/**
- * Once the transaction rest gateway will be using that time it will provide all
- * the transaction details like fetching all the transactional objects or get
- * any particular transaction object details
- *
- * @author nilanjana.maity
- *
- */
-@Service
-public class TransactionServiceImpl implements TransactionService {
-
- @Override
- public void checkTransaction(TransactionObj trnObj) {
- /* Need to implement the method */
- }
-
- @Override
- public void getAllTransactionObjs(DMaaPContext dmaapContext)
- throws ConfigDbException, IOException {
-
- /*
-
- *
- * LOG.info("configReader : "+configReader.toString());
- *
- * final JSONObject result = new JSONObject (); final JSONArray
- * transactionIds = new JSONArray (); result.put ( "transactionIds",
- * transactionIds );
- *
- * DMaaPTransactionObjDB<DMaaPTransactionObj> transDb =
- * configReader.getfTranDb();
- *
- * for (String transactionId : transDb.loadAllTransactionObjs()) {
- * transactionIds.put (transactionId); } LOG.info(
- * "========== TransactionServiceImpl: getAllTransactionObjs: Transaction objects are : "
- * + transactionIds.toString()+"===========");
- * DMaaPResponseBuilder.respondOk(dmaapContext, result);
- */
- }
-
- @Override
- public void getTransactionObj(DMaaPContext dmaapContext,
- String transactionId) throws ConfigDbException, JSONException,
- IOException {
-
- /*
-
- *
- * ConfigurationReader configReader = dmaapContext.getConfigReader();
- *
- * DMaaPTransactionObj trnObj;
- *
- * trnObj = configReader.getfTranDb().loadTransactionObj(transactionId);
- *
- *
- * if (null != trnObj) { trnObj.serialize(); JSONObject result =
- * trnObj.asJsonObject(); DMaaPResponseBuilder.respondOk(dmaapContext,
- * result);
- * LOG.info("========== TransactionServiceImpl: getTransactionObj : "+
- * result.toString()+"==========="); return; }
- *
- * } LOG.info(
- * "========== TransactionServiceImpl: getTransactionObj: Error : Transaction object does not exist. "
- * +"===========");
- */
- }
-}
diff --git a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java b/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
deleted file mode 100644
index 73ad83b..0000000
--- a/src/main/java/com/att/dmf/mr/service/impl/UIServiceImpl.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
-*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package com.att.dmf.mr.service.impl;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-
-import org.apache.kafka.common.errors.TopicExistsException;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.springframework.stereotype.Service;
-
-import com.att.dmf.mr.CambriaApiException;
-import com.att.dmf.mr.beans.DMaaPContext;
-import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
-import com.att.dmf.mr.metabroker.Topic;
-import com.att.dmf.mr.service.UIService;
-import com.att.dmf.mr.utils.DMaaPResponseBuilder;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.security.db.NsaApiDb;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-/**
- * @author muzainulhaque.qazi
- *
- */
-@Service
-public class UIServiceImpl implements UIService {
-
-
- private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIServiceImpl.class);
- /**
- * Returning template of hello page
- * @param dmaapContext
- * @throws IOException
- */
- @Override
- public void hello(DMaaPContext dmaapContext) throws IOException {
- LOGGER.info("Returning template of hello page.");
- DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "templates/hello.html");
- }
-
- /**
- * Fetching list of all api keys and returning in a templated form for display.
- * @param dmaapContext
- * @throws ConfigDbException
- * @throws IOException
- */
- @Override
- public void getApiKeysTable(DMaaPContext dmaapContext) throws ConfigDbException, IOException {
- // TODO - We need to work on the templates and how data will be set in
- // the template
- LOGGER.info("Fetching list of all api keys and returning in a templated form for display.");
- Map<String, NsaSimpleApiKey> keyMap = getApiKeyDb(dmaapContext).loadAllKeyRecords();
-
- LinkedList<JSONObject> keyList = new LinkedList<>();
-
- JSONObject jsonList = new JSONObject();
-
- for (Entry<String, NsaSimpleApiKey> e : keyMap.entrySet()) {
- final NsaSimpleApiKey key = e.getValue();
- final JSONObject jsonObject = new JSONObject();
- jsonObject.put("key", key.getKey());
- jsonObject.put("email", key.getContactEmail());
- jsonObject.put("description", key.getDescription());
- keyList.add(jsonObject);
- }
-
- jsonList.put("apiKeys", keyList);
-
- LOGGER.info("Returning list of all the api keys in JSON format for the template.");
- // "templates/apiKeyList.html"
- DMaaPResponseBuilder.respondOk(dmaapContext, jsonList);
-
- }
-
- /**
- * @param dmaapContext
- * @param apiKey
- * @throws ConfigDbException
- * @throws IOException
- * @throws JSONException
- * @throws Exception
- */
- @Override
- public void getApiKey(DMaaPContext dmaapContext, String apiKey) throws CambriaApiException, ConfigDbException, JSONException, IOException {
- // TODO - We need to work on the templates and how data will be set in
- // the template
- LOGGER.info("Fetching detials of apikey: " + apiKey);
- final NsaSimpleApiKey key = getApiKeyDb(dmaapContext).loadApiKey(apiKey);
-
- if (null != key) {
- LOGGER.info("Details of apikey [" + apiKey + "] found. Returning response");
- DMaaPResponseBuilder.respondOk(dmaapContext, key.asJsonObject());
- } else {
- LOGGER.info("Details of apikey [" + apiKey + "] not found. Returning response");
- throw new CambriaApiException(400,"Key [" + apiKey + "] not found.");
- }
-
- }
-
- /**
- * Fetching list of all the topics
- * @param dmaapContext
- * @throws ConfigDbException
- * @throws IOException
- */
- @Override
- public void getTopicsTable(DMaaPContext dmaapContext) throws ConfigDbException, IOException {
- // TODO - We need to work on the templates and how data will be set in
- // the template
- LOGGER.info("Fetching list of all the topics and returning in a templated form for display");
- List<Topic> topicsList = getMetaBroker(dmaapContext).getAllTopics();
-
- JSONObject jsonObject = new JSONObject();
-
- JSONArray topicsArray = new JSONArray();
-
- List<Topic> topicList = getMetaBroker(dmaapContext).getAllTopics();
-
- for (Topic topic : topicList) {
- JSONObject obj = new JSONObject();
- obj.put("topicName", topic.getName());
- obj.put("description", topic.getDescription());
- obj.put("owner", topic.getOwner());
- topicsArray.put(obj);
- }
-
- jsonObject.put("topics", topicsList);
-
- LOGGER.info("Returning the list of topics in templated format for display.");
- DMaaPResponseBuilder.respondOk(dmaapContext, jsonObject);
-
- }
-
- /**
- * @param dmaapContext
- * @param topicName
- * @throws ConfigDbException
- * @throws IOException
- * @throws TopicExistsException
- */
- @Override
- public void getTopic(DMaaPContext dmaapContext, String topicName)
- throws ConfigDbException, IOException, TopicExistsException {
- // TODO - We need to work on the templates and how data will be set in
- // the template
- LOGGER.info("Fetching detials of apikey: " + topicName);
- Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
- if (null == topic) {
- LOGGER.error("Topic [" + topicName + "] does not exist.");
- throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
- }
-
- JSONObject json = new JSONObject();
- json.put("topicName", topic.getName());
- json.put("description", topic.getDescription());
- json.put("owner", topic.getOwner());
-
- LOGGER.info("Returning details of topic [" + topicName + "]. Sending response.");
- DMaaPResponseBuilder.respondOk(dmaapContext, json);
-
- }
-
- /**
- *
- * @param dmaapContext
- * @return
- */
- private NsaApiDb<NsaSimpleApiKey> getApiKeyDb(DMaaPContext dmaapContext) {
- return dmaapContext.getConfigReader().getfApiKeyDb();
-
- }
-
- /**
- *
- * @param dmaapContext
- * @return
- */
- private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
- return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
- }
-
-}