summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
authorTomek Kaminski <tomasz.kaminski@nokia.com>2019-07-10 12:17:35 +0200
committerTomek Kaminski <tomasz.kaminski@nokia.com>2019-07-16 13:41:21 +0200
commit08f656f11693e4b172f696d14eec5237217217d0 (patch)
tree4632cfac58774680f1ed94cadcd90fc3d368730c /src/main/java
parent360b04b616c672fd95921f5638e4d843355d082c (diff)
EventsService authorization refactor
-ErrorResponseProvider extracted -subscriber method handle with refactor -publisher method handle with partial refactor Issue-ID: DMAAP-1230 Signed-off-by: Tomek Kaminski <tomasz.kaminski@nokia.com> Change-Id: I92d53edd0c791f8ce90275776b3031bb0047810e Signed-off-by: Tomek Kaminski <tomasz.kaminski@nokia.com>
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java4
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java30
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java4
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java53
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java147
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java510
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java15
7 files changed, 414 insertions, 349 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java b/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java
index 7b68b42..95b0577 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java
@@ -58,10 +58,10 @@ public class DMaaPErrorMessages {
private String topicsfailure="Failed to retrieve list of all topics.";
//@Value("${not.permitted.access.1}")
- private String notPermitted1="Access Denied.User does not have permission to perform";
+ private String notPermitted1="Access Denied.User does not have permission to perform ";
//@Value("${not.permitted.access.2}")
- private String notPermitted2="operation on Topic:";
+ private String notPermitted2=" operation on Topic:";
//@Value("${get.topic.details.failure}")
private String topicDetailsFail="Failed to retrieve details of topic:";
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java b/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java
index 537fc22..2f7f1db 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java
@@ -548,7 +548,35 @@ public class CambriaOutboundEventStream implements StreamWriter {
private ArrayList<Consumer> fKafkaConsumerList;
private boolean istransType = true;
// private static final Logger log =
-
+
private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
+
+ public int getfLimit() {
+ return fLimit;
+ }
+
+ public int getfTimeoutMs() {
+ return fTimeoutMs;
+ }
+
+ public boolean isfPretty() {
+ return fPretty;
+ }
+
+ public boolean isfWithMeta() {
+ return fWithMeta;
+ }
+
+ public boolean isAAFTopic() {
+ return isAAFTopic;
+ }
+
+ public boolean isIstransEnable() {
+ return istransEnable;
+ }
+
+ public boolean isIstransType() {
+ return istransType;
+ }
} \ No newline at end of file
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java
index a7f2376..12f0465 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java
@@ -22,12 +22,8 @@
package org.onap.dmaap.dmf.mr.security;
import javax.servlet.http.HttpServletRequest;
-
import org.onap.dmaap.dmf.mr.CambriaApiException;
-
-
-
/**
*
* @author sneha.d.desai
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
index 25644a7..0e6b0f6 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
@@ -21,6 +21,7 @@
*******************************************************************************/
package org.onap.dmaap.dmf.mr.security;
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import javax.servlet.http.HttpServletRequest;
import org.onap.dmaap.dmf.mr.CambriaApiException;
@@ -34,47 +35,37 @@ import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
*/
public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
+ private static final String NAMESPACE_PROPERTY = "defaultNSforUEB";
+ private static final String DEFAULT_NAMESPACE = "org.onap.dmaap.mr";
+ private static final String NAMESPACE_PREFIX = "org.onap";
+
/**
* @param req
* @param role
*/
@Override
public boolean aafAuthentication(HttpServletRequest req, String role) {
- boolean auth = false;
- if(req.isUserInRole(role))
- {
-
- auth = true;
- }
-
- return auth;
+ return req.isUserInRole(role);
}
@Override
public String aafPermissionString(String topicName, String action) throws CambriaApiException {
-
-
- String permission = "";
- String nameSpace ="";
- if(topicName.contains(".") && topicName.contains("org.onap")) {
-
- nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
- }
- else {
- nameSpace = null;
- nameSpace= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"defaultNSforUEB");
-
- if(null==nameSpace)nameSpace="org.onap.dmaap.mr";
-
-
-
- }
-
- permission = nameSpace+".topic|:topic."+topicName+"|"+action;
- return permission;
-
+
+ String nameSpace = topicName.startsWith(NAMESPACE_PREFIX) ? parseNamespace(topicName) :
+ readNamespaceFromProperties();
+
+ nameSpace = !nameSpace.isEmpty()? nameSpace : DEFAULT_NAMESPACE;
+
+ return new StringBuilder(nameSpace).append(".topic|:topic.").append(topicName)
+ .append("|").append(action).toString();
+ }
+
+ String readNamespaceFromProperties() {
+ return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,NAMESPACE_PROPERTY);
+ }
+
+ private String parseNamespace(String topicName) {
+ return topicName.substring(0,topicName.lastIndexOf('.'));
}
-
-
}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java
new file mode 100644
index 0000000..50a611e
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java
@@ -0,0 +1,147 @@
+/*******************************************************************************
+ * ============LICENSE_START===================================================
+ * org.onap.dmaap
+ * ============================================================================
+ * Copyright © 2019 Nokia Intellectual Property. All rights reserved.
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ ******************************************************************************/
+package org.onap.dmaap.dmf.mr.service.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.Date;
+import org.apache.http.HttpStatus;
+import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
+import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
+import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
+import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
+import org.onap.dmaap.dmf.mr.utils.Utils;
+
+class ErrorResponseProvider {
+
+ private String clientId;
+ private String topicName;
+ private String consumerGroup;
+ private String remoteHost;
+ private String publisherId;
+ private String publisherIp;
+ private DMaaPErrorMessages errorMessages;
+
+ private ErrorResponseProvider() {
+
+ }
+
+ ErrorResponse getIpBlacklistedError(String remoteAddr) {
+ return new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
+ null, Utils.getFormattedDate(new Date()), topicName, publisherId,
+ publisherIp, consumerGroup + "/" + clientId, remoteHost);
+ }
+
+ ErrorResponse getTopicNotFoundError() {
+ return new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
+ errorMessages.getTopicNotExist() + "-[" + topicName + "]", null, Utils.getFormattedDate(new Date()),
+ topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost);
+ }
+
+ ErrorResponse getAafAuthorizationError(String permission, String action) {
+ return new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1() + action + errorMessages.getNotPermitted2() + topicName + " on "
+ + permission,
+ null, Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId,
+ remoteHost);
+ }
+
+ ErrorResponse getServiceUnavailableError(String msg) {
+ return new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ errorMessages.getServerUnav() + msg, null, Utils.getFormattedDate(new Date()), topicName,
+ publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost);
+ }
+
+ ErrorResponse getConcurrentModificationError() {
+ return new ErrorResponse(HttpStatus.SC_CONFLICT,
+ DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
+ "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null,
+ Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost);
+ }
+
+ ErrorResponse getGenericError(String msg) {
+ return new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ "Couldn't respond to client, closing cambria consumer" + msg, null,
+ Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost);
+ }
+
+ public static class Builder {
+
+ private String clientId;
+ private String topicName;
+ private String consumerGroup;
+ private String remoteHost;
+ private String publisherId;
+ private String publisherIp;
+ DMaaPErrorMessages errorMessages;
+
+ Builder withErrorMessages(DMaaPErrorMessages errorMessages) {
+ this.errorMessages = errorMessages;
+ return this;
+ }
+
+ Builder withTopic(String topic) {
+ this.topicName = topic;
+ return this;
+ }
+
+ Builder withClient(String client) {
+ this.clientId = client;
+ return this;
+ }
+
+ Builder withConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ return this;
+ }
+
+ Builder withRemoteHost(String remoteHost) {
+ this.remoteHost = remoteHost;
+ return this;
+ }
+
+ Builder withPublisherId(String publisherId) {
+ this.publisherId = publisherId;
+ return this;
+ }
+
+ Builder withPublisherIp(String publisherIp) {
+ this.publisherIp = publisherIp;
+ return this;
+ }
+
+ public ErrorResponseProvider build() {
+ Preconditions.checkArgument(errorMessages!=null);
+ ErrorResponseProvider errRespProvider = new ErrorResponseProvider();
+ errRespProvider.errorMessages = this.errorMessages;
+ errRespProvider.clientId = this.clientId;
+ errRespProvider.consumerGroup = this.consumerGroup;
+ errRespProvider.topicName = this.topicName;
+ errRespProvider.remoteHost = this.remoteHost;
+ errRespProvider.publisherId = this.publisherId;
+ errRespProvider.publisherIp = this.publisherIp;
+ return errRespProvider;
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
index 11c544f..ec5bfc0 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
@@ -21,34 +21,32 @@
*******************************************************************************/
package org.onap.dmaap.dmf.mr.service.impl;
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.service.standards.MimeTypes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.NsaApiKey;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.util.rrConvertor;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.ConcurrentModificationException;
import java.util.Date;
-import java.util.HashMap;
import java.util.LinkedList;
-import java.util.Properties;
-
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;
-
+import org.apache.commons.lang.math.NumberUtils;
import org.apache.http.HttpStatus;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.json.JSONObject;
import org.json.JSONTokener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Service;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.backends.Consumer;
import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
@@ -56,7 +54,6 @@ import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
import org.onap.dmaap.dmf.mr.backends.MetricsSet;
import org.onap.dmaap.dmf.mr.backends.Publisher;
import org.onap.dmaap.dmf.mr.backends.Publisher.message;
-import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
import org.onap.dmaap.dmf.mr.beans.LogDetails;
@@ -65,7 +62,6 @@ import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
-
import org.onap.dmaap.dmf.mr.metabroker.Topic;
import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
@@ -74,15 +70,10 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
import org.onap.dmaap.dmf.mr.service.EventsService;
import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
+import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
import org.onap.dmaap.dmf.mr.utils.Utils;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.NsaApiKey;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.util.rrConvertor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
/**
* This class provides the functinality to publish and subscribe message to
@@ -93,20 +84,20 @@ import com.att.nsa.util.rrConvertor;
*/
@Service
public class EventsServiceImpl implements EventsService {
- // private static final Logger LOG =
private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
-
private static final String BATCH_LENGTH = "event.batch.length";
private static final String TRANSFER_ENCODING = "Transfer-Encoding";
+ private static final String TIMEOUT_PROPERTY = "timeout";
+ private static final String SUBSCRIBE_ACTION = "sub";
+ private static final String PUBLISH_ACTION = "pub";
+
@Autowired
private DMaaPErrorMessages errorMessages;
-
- //@Autowired
-
- // @Value("${metrics.send.cambria.topic}")
-
+ String getPropertyFromAJSCmap(String propertyKey) {
+ return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
+ }
public DMaaPErrorMessages getErrorMessages() {
return errorMessages;
@@ -129,222 +120,91 @@ public class EventsServiceImpl implements EventsService {
*/
@Override
public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
- throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
- CambriaApiException, IOException, DMaaPAccessDeniedException {
+ throws ConfigDbException, AccessDeniedException, UnavailableException,
+ CambriaApiException, IOException {
+
final long startTime = System.currentTimeMillis();
final HttpServletRequest req = ctx.getRequest();
-
- boolean isAAFTopic = false;
- // was this host blacklisted?
- final String remoteAddr = Utils.getRemoteAddress(ctx);
- if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
-
- int limit = CambriaConstants.kNoLimit;
- if (req.getParameter("limit") != null) {
- limit = Integer.parseInt(req.getParameter("limit"));
- }
-
- int timeoutMs = CambriaConstants.kNoTimeout;
- String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
- if (strtimeoutMS != null)
- timeoutMs = Integer.parseInt(strtimeoutMS);
- // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
-
- if (req.getParameter("timeout") != null) {
- timeoutMs = Integer.parseInt(req.getParameter("timeout"));
- }
-
- // By default no filter is applied if filter is not passed as a
- // parameter in the request URI
- String topicFilter = CambriaConstants.kNoFilter;
- if (null != req.getParameter("filter")) {
- topicFilter = req.getParameter("filter");
- }
- // pretty to print the messaages in new line
- String prettyval = "0";
- String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
- if (null != strPretty)
- prettyval = strPretty;
-
- String metaval = "0";
- String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
- if (null != strmeta)
- metaval = strmeta;
-
- final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
- // withMeta to print offset along with message
- final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
-
final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
- logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
+ final String remoteHost = req.getRemoteHost();
+ ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
+ .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
- // is this user allowed to read this topic?
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
- final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+ validateIpBlacklist(errRespProvider, ctx);
- if (metatopic == null) {
- // no such topic.
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
- DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
- errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
- topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost());
- LOG.info(errRes.toString());
- throw new CambriaApiException(errRes);
- }
- String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "metrics.send.cambria.topic");
- if (null == metricTopicname)
- metricTopicname = "msgrtr.apinode.metrics.dmaap";
-
- boolean topicNameEnforced = false;
- String topicNameStd = null;
- topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
- "enforced.topic.name.AAF");
- if (null != topicNameStd && topic.startsWith(topicNameStd)) {
- topicNameEnforced = true;
+ final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+ if (metaTopic == null) {
+ throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
}
- if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
- if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) {
- // check permissions
- metatopic.checkUserRead(user);
- }
- }
- // if headers are not provided then user will be null
- if (topicNameEnforced ||(user == null && null != ctx.getRequest().getHeader("Authorization"))) {
- // the topic name will be sent by the client
-
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- String permission = aaf.aafPermissionString(topic, "sub");
- if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on "
- + permission,
- null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId,
- ctx.getRequest().getRemoteHost());
- LOG.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
+ boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
- }
- isAAFTopic = true;
- }
final long elapsedMs1 = System.currentTimeMillis() - startTime;
logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
+ " " + clientId);
- Consumer c = null;
-
- String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "clusterhostid");
- if (null == lhostId) {
- try {
- lhostId = InetAddress.getLocalHost().getCanonicalHostName();
- } catch (UnknownHostException e) {
- LOG.info("Unknown Host Exception error occured while getting getting hostid");
- }
- }
- CambriaOutboundEventStream coes = null;
+ verifyHostId();
+ final boolean pretty = isPrettyPrintEnabled();
+ final boolean withMeta = isMetaOffsetEnabled();
+ int timeoutMs = getMessageTimeout(req);
+ int limit = getMessageLimit(req);
+ String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
+ logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
+
+ Consumer consumer = null;
try {
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
- rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost());
- c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
- ctx.getRequest().getRemoteHost());
- coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
+ rl.onCall(topic, consumerGroup, clientId, remoteHost);
+ consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
+ remoteHost);
+ CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
.limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
coes.setDmaapContext(ctx);
- coes.setTopic(metatopic);
- if (isTransEnabled() || isAAFTopic) {
- coes.setTransEnabled(true);
- } else {
- coes.setTransEnabled(false);
- }
+ coes.setTopic(metaTopic);
+ coes.setTransEnabled(isTransEnabled() || isAAFTopic);
coes.setTopicStyle(isAAFTopic);
final long elapsedMs2 = System.currentTimeMillis() - startTime;
logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
+ consumerGroup + " " + clientId);
- DMaaPResponseBuilder.setNoCacheHeadings(ctx);
-
- DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
+ respondOkWithStream(ctx, coes);
// No IOException thrown during respondOkWithStream, so commit the
// new offsets to all the brokers
- c.commitOffsets();
+ consumer.commitOffsets();
final int sent = coes.getSentCount();
-
- metricsSet.consumeTick(sent);
- rl.onSend(topic, consumerGroup, clientId, sent);
+ metricsSet.consumeTick(sent);
+ rl.onSend(topic, consumerGroup, clientId, sent);
final long elapsedMs = System.currentTimeMillis() - startTime;
- logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for "
+ logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
+ topic + " " + consumerGroup + " " + clientId + " on to the server "
- + ctx.getRequest().getRemoteHost());
+ + remoteHost);
} catch (UnavailableException excp) {
logger.warn(excp.getMessage(), excp);
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
- DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
- errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
- null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost());
+ ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
- } catch (java.util.ConcurrentModificationException excp1) {
- LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost());
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
- DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
- "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null,
- Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
+ } catch (ConcurrentModificationException excp1) {
+ LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
+ ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
logger.info(errRes.toString());
throw new CambriaApiException(errRes);
- } catch (CambriaApiException excp) {
- LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId);
-
- throw excp;
- }
- catch (Exception excp) {
- // System.out.println(excp + "------------------ " + topic+"
- // "+consumerGroup+" "+clientId);
-
+ } catch (Exception excp) {
logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
+ " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
-
- ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
-
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
- DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
- "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
- Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
+ ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
+ ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
logger.info(errRes.toString());
throw new CambriaApiException(errRes);
} finally {
- coes = null;
- // If no cache, close the consumer now that we're done with it.
- boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
- String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- ConsumerFactory.kSetting_EnableCache);
- if (null != strkSetting_EnableCache)
- kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
- // if
- // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,
- // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
- if (!kSetting_EnableCache && (c != null)) {
+ if (consumer != null && !isCacheEnabled()) {
try {
- c.close();
+ consumer.close();
} catch (Exception e) {
- logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " "
+ logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
+ topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
+ " " + e);
}
@@ -352,117 +212,155 @@ public class EventsServiceImpl implements EventsService {
}
}
- /**
- * @throws missingReqdSetting
- *
- */
- @Override
- public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
- final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
- CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException {
-
- // is this user allowed to write to this topic?
- final long startMs = System.currentTimeMillis();
- final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
- final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
- boolean isAAFTopic = false;
-
- // was this host blacklisted?
+ private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
final String remoteAddr = Utils.getRemoteAddress(ctx);
-
if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
-
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
+ ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
LOG.info(errRes.toString());
throw new CambriaApiException(errRes);
}
+ }
- String topicNameStd = null;
+ private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
+ ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
- // topicNameStd=
-
- topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
- "enforced.topic.name.AAF");
- String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "metrics.send.cambria.topic");
- if (null == metricTopicname)
- metricTopicname = "msgrtr.apinode.metrics.dmaap";
- boolean topicNameEnforced = false;
- if (null != topicNameStd && topic.startsWith(topicNameStd)) {
- topicNameEnforced = true;
+ boolean isAAFTopic = false;
+ String metricTopicName = getMetricTopicName();
+ if(!metricTopicName.equalsIgnoreCase(topicName)) {
+ if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
+ isAAFTopic = true;
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+ String permission = aaf.aafPermissionString(topicName, action);
+ if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
+ ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
+ LOG.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+
+ }
+ } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
+ if(SUBSCRIBE_ACTION.equals(action)) {
+ metaTopic.checkUserRead(user);
+ } else if(PUBLISH_ACTION.equals(action)) {
+ metaTopic.checkUserWrite(user);
+ }
+ }
}
+ return isAAFTopic;
+ }
+
+ boolean isCadiEnabled() {
+ return Utils.isCadiEnabled();
+ }
- // Here check if the user has rights to publish on the topic
- // ( This will be called when no auth is added or when UEB API Key
- // Authentication is used)
- // checkUserWrite(user) method will throw an error when there is no Auth
- // header added or when the
- // user has no publish rights
+ void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
+ DMaaPResponseBuilder.setNoCacheHeadings(ctx);
+ DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
+ }
- if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))
- && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
- metatopic.checkUserWrite(user);
- }
+ private int getMessageLimit(HttpServletRequest request) {
+ return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
+ }
- // if headers are not provided then user will be null
- if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
- && !topic.equalsIgnoreCase(metricTopicname))) {
- // the topic name will be sent by the client
-
- DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- String permission = aaf.aafPermissionString(topic, "pub");
- if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
- ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
- DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic
- + " on " + permission,
- null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
- ctx.getRequest().getRemoteHost(), null, null);
- LOG.info(errRes.toString());
- throw new DMaaPAccessDeniedException(errRes);
+ private int getMessageTimeout(HttpServletRequest request) {
+ String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
+ int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
+ CambriaConstants.kNoTimeout;
+
+ String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
+ return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
+ }
+
+ private boolean isPrettyPrintEnabled() {
+ return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
+ }
+
+ private boolean isMetaOffsetEnabled() {
+ return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
+ }
+
+ private boolean isTopicNameEnforcedAaf(String topicName) {
+ String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
+ return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd);
+ }
+
+ private boolean isCacheEnabled() {
+ String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
+ return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
+ }
+
+ private void verifyHostId() {
+ String lhostId = getPropertyFromAJSCmap("clusterhostid");
+ if (lhostId.isEmpty()) {
+ try {
+ InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
}
- isAAFTopic = true;
+
}
+ }
- final HttpServletRequest req = ctx.getRequest();
+ private String getMetricTopicName() {
+ String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
+ return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
+ }
- // check for chunked input
- boolean chunked = false;
- if (null != req.getHeader(TRANSFER_ENCODING)) {
- chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
- }
- // get the media type, or set it to a generic value if it wasn't
- // provided
- String mediaType = req.getContentType();
- if (mediaType == null || mediaType.length() == 0) {
- mediaType = MimeTypes.kAppGenericBinary;
- }
+ /**
+ * @throws missingReqdSetting
+ *
+ */
+ @Override
+ public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
+ final String requestTime) throws ConfigDbException, AccessDeniedException,
+ CambriaApiException, IOException, missingReqdSetting {
- if (mediaType.contains("charset=UTF-8")) {
- mediaType = mediaType.replace("; charset=UTF-8", "").trim();
- }
+ final long startMs = System.currentTimeMillis();
+ String remoteHost = ctx.getRequest().getRemoteHost();
+ ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
+ .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
+ .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
- String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "transidUEBtopicreqd");
- boolean istransidreqd = false;
- if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) {
- istransidreqd = true;
+ validateIpBlacklist(errRespProvider, ctx);
+
+ final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+ if (metaTopic == null) {
+ throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
}
- if (isAAFTopic || istransidreqd) {
+ final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
+
+ final HttpServletRequest req = ctx.getRequest();
+ boolean chunked = isRequestedChunk(req);
+ String mediaType = getMediaType(req);
+ boolean transactionRequired = isTransactionIdRequired();
+
+ if (isAAFTopic || transactionRequired) {
pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
} else {
pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
}
+
final long endMs = System.currentTimeMillis();
final long totalMs = endMs - startMs;
-
LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
+ }
+
+ private boolean isRequestedChunk(HttpServletRequest request) {
+ return null != request.getHeader(TRANSFER_ENCODING) &&
+ request.getHeader(TRANSFER_ENCODING).contains("chunked");
+ }
+
+ private String getMediaType(HttpServletRequest request) {
+ String mediaType = request.getContentType();
+ if (mediaType == null || mediaType.length() == 0) {
+ return MimeTypes.kAppGenericBinary;
+ }
+ return mediaType.replace("; charset=UTF-8", "").trim();
+ }
+ private boolean isTransactionIdRequired() {
+ return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true");
}
/**
@@ -481,7 +379,7 @@ public class EventsServiceImpl implements EventsService {
*/
private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
String mediaType)
- throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException {
+ throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
// setup the event set
final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
@@ -490,8 +388,8 @@ public class EventsServiceImpl implements EventsService {
final long startMs = System.currentTimeMillis();
long count = 0;
long maxEventBatch = 1024L* 16;
- String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
- if (null != batchlen)
+ String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
+ if (null != batchlen && !batchlen.isEmpty())
maxEventBatch = Long.parseLong(batchlen);
// long maxEventBatch =
@@ -550,7 +448,7 @@ public class EventsServiceImpl implements EventsService {
final JSONObject response = new JSONObject();
response.put("count", count);
response.put("serverTimeMs", totalMs);
- DMaaPResponseBuilder.respondOk(ctx, response);
+ respondOk(ctx, response);
} catch (Exception excp) {
int status = HttpStatus.SC_NOT_FOUND;
@@ -590,7 +488,7 @@ public class EventsServiceImpl implements EventsService {
*/
private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
- throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException {
+ throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
@@ -601,8 +499,8 @@ public class EventsServiceImpl implements EventsService {
final long startMs = System.currentTimeMillis();
long count = 0;
long maxEventBatch = 1024L * 16;
- String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
- if (null != evenlen)
+ String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
+ if (null != evenlen && !evenlen.isEmpty())
maxEventBatch = Long.parseLong(evenlen);
// final long maxEventBatch =
@@ -639,9 +537,9 @@ public class EventsServiceImpl implements EventsService {
responseTransactionId = m.getLogDetails().getTransactionId();
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("msgWrapMR", m.getMessage());
- jsonObject.put("transactionId", responseTransactionId);
+ //JSONObject jsonObject = new JSONObject();
+ //jsonObject.put("msgWrapMR", m.getMessage());
+ //jsonObject.put("transactionId", responseTransactionId);
// final KeyedMessage<String, String> data = new
// KeyedMessage<String, String>(topic, m.getKey(),
@@ -755,8 +653,9 @@ public class EventsServiceImpl implements EventsService {
// build a response
final JSONObject response = new JSONObject();
response.put("count", count);
+ response.put("transactionId", responseTransactionId);
response.put("serverTimeMs", totalMs);
- DMaaPResponseBuilder.respondOk(ctx, response);
+ respondOk(ctx, response);
} catch (Exception excp) {
int status = HttpStatus.SC_NOT_FOUND;
@@ -798,6 +697,10 @@ public class EventsServiceImpl implements EventsService {
msg.setLogDetails(logDetails);
}
+ void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
+ DMaaPResponseBuilder.respondOk(ctx, response);
+ }
+
/**
*
* @author anowarul.islam
@@ -837,8 +740,7 @@ public class EventsServiceImpl implements EventsService {
}
public boolean isTransEnabled() {
- String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "transidUEBtopicreqd");
+ String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
boolean istransidreqd = false;
if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
istransidreqd = true;
@@ -863,13 +765,5 @@ public class EventsServiceImpl implements EventsService {
return logDetails;
}
-
-
-
-
-
-
-
-
} \ No newline at end of file
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
index 3048251..40e6840 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
@@ -23,6 +23,7 @@ package org.onap.dmaap.dmf.mr.utils;
import java.io.IOException;
import java.io.InputStream;
+import java.security.Principal;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -46,7 +47,9 @@ public class Utils {
private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS";
public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth";
+ private static final String AUTH_HEADER = "Authorization";
private static final String BATCH_ID_FORMAT = "000000";
+ private static final String X509_ATTR = "javax.servlet.request.X509Certificate";
private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class);
private Utils() {
@@ -75,15 +78,21 @@ public class Utils {
if (null != auth) {
final String[] splittedAuthKey = auth.split(":");
return splittedAuthKey[0];
- }else if (null!=request.getHeader("Authorization")){
+ }else if (null != request.getHeader(AUTH_HEADER) || null != request.getAttribute(X509_ATTR)){
/**
* AAF implementation enhancement
*/
- String user= request.getUserPrincipal().getName().toString();
- return user.substring(0, user.lastIndexOf("@"));
+ Principal principal = request.getUserPrincipal();
+ if(principal != null){
+ String name = principal.getName();
+ return name.substring(0, name.lastIndexOf('@'));
+ }
+ log.warn("No principal has been provided on HTTP request");
}
return null;
}
+
+
/**
* to format the batch sequence id
* @param batchId