diff options
Diffstat (limited to 'src/main')
7 files changed, 414 insertions, 349 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java b/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java index 7b68b42..95b0577 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java @@ -58,10 +58,10 @@ public class DMaaPErrorMessages { private String topicsfailure="Failed to retrieve list of all topics."; //@Value("${not.permitted.access.1}") - private String notPermitted1="Access Denied.User does not have permission to perform"; + private String notPermitted1="Access Denied.User does not have permission to perform "; //@Value("${not.permitted.access.2}") - private String notPermitted2="operation on Topic:"; + private String notPermitted2=" operation on Topic:"; //@Value("${get.topic.details.failure}") private String topicDetailsFail="Failed to retrieve details of topic:"; diff --git a/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java b/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java index 537fc22..2f7f1db 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java @@ -548,7 +548,35 @@ public class CambriaOutboundEventStream implements StreamWriter { private ArrayList<Consumer> fKafkaConsumerList; private boolean istransType = true; // private static final Logger log = - + private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class); + + public int getfLimit() { + return fLimit; + } + + public int getfTimeoutMs() { + return fTimeoutMs; + } + + public boolean isfPretty() { + return fPretty; + } + + public boolean isfWithMeta() { + return fWithMeta; + } + + public boolean isAAFTopic() { + return isAAFTopic; + } + + public boolean isIstransEnable() { + return istransEnable; + } + + public boolean isIstransType() { + return istransType; + } }
\ No newline at end of file diff --git a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java index a7f2376..12f0465 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java @@ -22,12 +22,8 @@ package org.onap.dmaap.dmf.mr.security; import javax.servlet.http.HttpServletRequest; - import org.onap.dmaap.dmf.mr.CambriaApiException; - - - /** * * @author sneha.d.desai diff --git a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java index 25644a7..0e6b0f6 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java @@ -21,6 +21,7 @@ *******************************************************************************/ package org.onap.dmaap.dmf.mr.security; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; import javax.servlet.http.HttpServletRequest; import org.onap.dmaap.dmf.mr.CambriaApiException; @@ -34,47 +35,37 @@ import org.onap.dmaap.dmf.mr.constants.CambriaConstants; */ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator { + private static final String NAMESPACE_PROPERTY = "defaultNSforUEB"; + private static final String DEFAULT_NAMESPACE = "org.onap.dmaap.mr"; + private static final String NAMESPACE_PREFIX = "org.onap"; + /** * @param req * @param role */ @Override public boolean aafAuthentication(HttpServletRequest req, String role) { - boolean auth = false; - if(req.isUserInRole(role)) - { - - auth = true; - } - - return auth; + return req.isUserInRole(role); } @Override public String aafPermissionString(String topicName, String action) throws CambriaApiException { - - - String permission = ""; - String nameSpace =""; - if(topicName.contains(".") && topicName.contains("org.onap")) { - - nameSpace = topicName.substring(0,topicName.lastIndexOf(".")); - } - else { - nameSpace = null; - nameSpace= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"defaultNSforUEB"); - - if(null==nameSpace)nameSpace="org.onap.dmaap.mr"; - - - - } - - permission = nameSpace+".topic|:topic."+topicName+"|"+action; - return permission; - + + String nameSpace = topicName.startsWith(NAMESPACE_PREFIX) ? parseNamespace(topicName) : + readNamespaceFromProperties(); + + nameSpace = !nameSpace.isEmpty()? nameSpace : DEFAULT_NAMESPACE; + + return new StringBuilder(nameSpace).append(".topic|:topic.").append(topicName) + .append("|").append(action).toString(); + } + + String readNamespaceFromProperties() { + return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,NAMESPACE_PROPERTY); + } + + private String parseNamespace(String topicName) { + return topicName.substring(0,topicName.lastIndexOf('.')); } - - } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java new file mode 100644 index 0000000..50a611e --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java @@ -0,0 +1,147 @@ +/******************************************************************************* + * ============LICENSE_START=================================================== + * org.onap.dmaap + * ============================================================================ + * Copyright © 2019 Nokia Intellectual Property. All rights reserved. + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + ******************************************************************************/ +package org.onap.dmaap.dmf.mr.service.impl; + +import com.google.common.base.Preconditions; +import java.util.Date; +import org.apache.http.HttpStatus; +import org.onap.dmaap.dmf.mr.beans.DMaaPContext; +import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; +import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; +import org.onap.dmaap.dmf.mr.exception.ErrorResponse; +import org.onap.dmaap.dmf.mr.utils.Utils; + +class ErrorResponseProvider { + + private String clientId; + private String topicName; + private String consumerGroup; + private String remoteHost; + private String publisherId; + private String publisherIp; + private DMaaPErrorMessages errorMessages; + + private ErrorResponseProvider() { + + } + + ErrorResponse getIpBlacklistedError(String remoteAddr) { + return new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", + null, Utils.getFormattedDate(new Date()), topicName, publisherId, + publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + ErrorResponse getTopicNotFoundError() { + return new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), + errorMessages.getTopicNotExist() + "-[" + topicName + "]", null, Utils.getFormattedDate(new Date()), + topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + ErrorResponse getAafAuthorizationError(String permission, String action) { + return new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1() + action + errorMessages.getNotPermitted2() + topicName + " on " + + permission, + null, Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, + remoteHost); + } + + ErrorResponse getServiceUnavailableError(String msg) { + return new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + errorMessages.getServerUnav() + msg, null, Utils.getFormattedDate(new Date()), topicName, + publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + ErrorResponse getConcurrentModificationError() { + return new ErrorResponse(HttpStatus.SC_CONFLICT, + DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), + "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null, + Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + ErrorResponse getGenericError(String msg) { + return new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + "Couldn't respond to client, closing cambria consumer" + msg, null, + Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + public static class Builder { + + private String clientId; + private String topicName; + private String consumerGroup; + private String remoteHost; + private String publisherId; + private String publisherIp; + DMaaPErrorMessages errorMessages; + + Builder withErrorMessages(DMaaPErrorMessages errorMessages) { + this.errorMessages = errorMessages; + return this; + } + + Builder withTopic(String topic) { + this.topicName = topic; + return this; + } + + Builder withClient(String client) { + this.clientId = client; + return this; + } + + Builder withConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + return this; + } + + Builder withRemoteHost(String remoteHost) { + this.remoteHost = remoteHost; + return this; + } + + Builder withPublisherId(String publisherId) { + this.publisherId = publisherId; + return this; + } + + Builder withPublisherIp(String publisherIp) { + this.publisherIp = publisherIp; + return this; + } + + public ErrorResponseProvider build() { + Preconditions.checkArgument(errorMessages!=null); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider(); + errRespProvider.errorMessages = this.errorMessages; + errRespProvider.clientId = this.clientId; + errRespProvider.consumerGroup = this.consumerGroup; + errRespProvider.topicName = this.topicName; + errRespProvider.remoteHost = this.remoteHost; + errRespProvider.publisherId = this.publisherId; + errRespProvider.publisherIp = this.publisherIp; + return errRespProvider; + } + } +} diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java index 11c544f..ec5bfc0 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java @@ -21,34 +21,32 @@ *******************************************************************************/ package org.onap.dmaap.dmf.mr.service.impl; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.util.rrConvertor; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Arrays; +import java.util.ConcurrentModificationException; import java.util.Date; -import java.util.HashMap; import java.util.LinkedList; -import java.util.Properties; - import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.MediaType; - +import org.apache.commons.lang.math.NumberUtils; import org.apache.http.HttpStatus; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.TopicExistsException; import org.json.JSONObject; import org.json.JSONTokener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Service; - -import com.att.ajsc.filemonitor.AJSCPropertiesMap; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.backends.Consumer; import org.onap.dmaap.dmf.mr.backends.ConsumerFactory; @@ -56,7 +54,6 @@ import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException; import org.onap.dmaap.dmf.mr.backends.MetricsSet; import org.onap.dmaap.dmf.mr.backends.Publisher; import org.onap.dmaap.dmf.mr.backends.Publisher.message; -import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter; import org.onap.dmaap.dmf.mr.beans.DMaaPContext; import org.onap.dmaap.dmf.mr.beans.LogDetails; @@ -65,7 +62,6 @@ import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException; import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; import org.onap.dmaap.dmf.mr.exception.ErrorResponse; - import org.onap.dmaap.dmf.mr.metabroker.Topic; import org.onap.dmaap.dmf.mr.resources.CambriaEventSet; import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream; @@ -74,15 +70,10 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl; import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl; import org.onap.dmaap.dmf.mr.service.EventsService; import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder; +import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter; import org.onap.dmaap.dmf.mr.utils.Utils; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.nsa.configs.ConfigDbException; -import com.att.nsa.drumlin.service.standards.MimeTypes; -import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import com.att.nsa.security.NsaApiKey; -import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; -import com.att.nsa.util.rrConvertor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; /** * This class provides the functinality to publish and subscribe message to @@ -93,20 +84,20 @@ import com.att.nsa.util.rrConvertor; */ @Service public class EventsServiceImpl implements EventsService { - // private static final Logger LOG = private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); - private static final String BATCH_LENGTH = "event.batch.length"; private static final String TRANSFER_ENCODING = "Transfer-Encoding"; + private static final String TIMEOUT_PROPERTY = "timeout"; + private static final String SUBSCRIBE_ACTION = "sub"; + private static final String PUBLISH_ACTION = "pub"; + @Autowired private DMaaPErrorMessages errorMessages; - - //@Autowired - - // @Value("${metrics.send.cambria.topic}") - + String getPropertyFromAJSCmap(String propertyKey) { + return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey); + } public DMaaPErrorMessages getErrorMessages() { return errorMessages; @@ -129,222 +120,91 @@ public class EventsServiceImpl implements EventsService { */ @Override public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId) - throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException, - CambriaApiException, IOException, DMaaPAccessDeniedException { + throws ConfigDbException, AccessDeniedException, UnavailableException, + CambriaApiException, IOException { + final long startTime = System.currentTimeMillis(); final HttpServletRequest req = ctx.getRequest(); - - boolean isAAFTopic = false; - // was this host blacklisted? - final String remoteAddr = Utils.getRemoteAddress(ctx); - if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); - LOG.info(errRes.toString()); - throw new CambriaApiException(errRes); - } - - int limit = CambriaConstants.kNoLimit; - if (req.getParameter("limit") != null) { - limit = Integer.parseInt(req.getParameter("limit")); - } - - int timeoutMs = CambriaConstants.kNoTimeout; - String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout"); - if (strtimeoutMS != null) - timeoutMs = Integer.parseInt(strtimeoutMS); - // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", - - if (req.getParameter("timeout") != null) { - timeoutMs = Integer.parseInt(req.getParameter("timeout")); - } - - // By default no filter is applied if filter is not passed as a - // parameter in the request URI - String topicFilter = CambriaConstants.kNoFilter; - if (null != req.getParameter("filter")) { - topicFilter = req.getParameter("filter"); - } - // pretty to print the messaages in new line - String prettyval = "0"; - String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty"); - if (null != strPretty) - prettyval = strPretty; - - String metaval = "0"; - String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta"); - if (null != strmeta) - metaval = strmeta; - - final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval); - // withMeta to print offset along with message - final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval); - final LogWrap logger = new LogWrap(topic, consumerGroup, clientId); - logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost()); + final String remoteHost = req.getRemoteHost(); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages) + .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build(); - // is this user allowed to read this topic? - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); - final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + validateIpBlacklist(errRespProvider, ctx); - if (metatopic == null) { - // no such topic. - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), - errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()), - topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost()); - LOG.info(errRes.toString()); - throw new CambriaApiException(errRes); - } - String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "metrics.send.cambria.topic"); - if (null == metricTopicname) - metricTopicname = "msgrtr.apinode.metrics.dmaap"; - - boolean topicNameEnforced = false; - String topicNameStd = null; - topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, - "enforced.topic.name.AAF"); - if (null != topicNameStd && topic.startsWith(topicNameStd)) { - topicNameEnforced = true; + final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + if (metaTopic == null) { + throw new CambriaApiException(errRespProvider.getTopicNotFoundError()); } - if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) { - if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) { - // check permissions - metatopic.checkUserRead(user); - } - } - // if headers are not provided then user will be null - if (topicNameEnforced ||(user == null && null != ctx.getRequest().getHeader("Authorization"))) { - // the topic name will be sent by the client - - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - String permission = aaf.aafPermissionString(topic, "sub"); - if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on " - + permission, - null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId, - ctx.getRequest().getRemoteHost()); - LOG.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); + boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION); - } - isAAFTopic = true; - } final long elapsedMs1 = System.currentTimeMillis() - startTime; logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup + " " + clientId); - Consumer c = null; - - String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "clusterhostid"); - if (null == lhostId) { - try { - lhostId = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException e) { - LOG.info("Unknown Host Exception error occured while getting getting hostid"); - } - } - CambriaOutboundEventStream coes = null; + verifyHostId(); + final boolean pretty = isPrettyPrintEnabled(); + final boolean withMeta = isMetaOffsetEnabled(); + int timeoutMs = getMessageTimeout(req); + int limit = getMessageLimit(req); + String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter; + logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost()); + + Consumer consumer = null; try { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter(); - rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost()); - c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs, - ctx.getRequest().getRemoteHost()); - coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs) + rl.onCall(topic, consumerGroup, clientId, remoteHost); + consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs, + remoteHost); + CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs) .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build(); coes.setDmaapContext(ctx); - coes.setTopic(metatopic); - if (isTransEnabled() || isAAFTopic) { - coes.setTransEnabled(true); - } else { - coes.setTransEnabled(false); - } + coes.setTopic(metaTopic); + coes.setTransEnabled(isTransEnabled() || isAAFTopic); coes.setTopicStyle(isAAFTopic); final long elapsedMs2 = System.currentTimeMillis() - startTime; logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " " + consumerGroup + " " + clientId); - DMaaPResponseBuilder.setNoCacheHeadings(ctx); - - DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes); + respondOkWithStream(ctx, coes); // No IOException thrown during respondOkWithStream, so commit the // new offsets to all the brokers - c.commitOffsets(); + consumer.commitOffsets(); final int sent = coes.getSentCount(); - - metricsSet.consumeTick(sent); - rl.onSend(topic, consumerGroup, clientId, sent); + metricsSet.consumeTick(sent); + rl.onSend(topic, consumerGroup, clientId, sent); final long elapsedMs = System.currentTimeMillis() - startTime; - logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for " + logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for " + topic + " " + consumerGroup + " " + clientId + " on to the server " - + ctx.getRequest().getRemoteHost()); + + remoteHost); } catch (UnavailableException excp) { logger.warn(excp.getMessage(), excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, - DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), - errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic, - null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost()); + ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage()); LOG.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (java.util.ConcurrentModificationException excp1) { - LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost()); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), - "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null, - Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + } catch (ConcurrentModificationException excp1) { + LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost); + ErrorResponse errRes = errRespProvider.getConcurrentModificationError(); logger.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (CambriaApiException excp) { - LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId); - - throw excp; - } - catch (Exception excp) { - // System.out.println(excp + "------------------ " + topic+" - // "+consumerGroup+" "+clientId); - + } catch (Exception excp) { logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp); - - ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); - - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, - DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), - "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null, - Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); + ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage()); logger.info(errRes.toString()); throw new CambriaApiException(errRes); } finally { - coes = null; - // If no cache, close the consumer now that we're done with it. - boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled; - String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - ConsumerFactory.kSetting_EnableCache); - if (null != strkSetting_EnableCache) - kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache); - // if - // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache, - // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) { - if (!kSetting_EnableCache && (c != null)) { + if (consumer != null && !isCacheEnabled()) { try { - c.close(); + consumer.close(); } catch (Exception e) { - logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " " + logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " " + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " " + e); } @@ -352,117 +212,155 @@ public class EventsServiceImpl implements EventsService { } } - /** - * @throws missingReqdSetting - * - */ - @Override - public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, - final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException, - CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException { - - // is this user allowed to write to this topic? - final long startMs = System.currentTimeMillis(); - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); - final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); - boolean isAAFTopic = false; - - // was this host blacklisted? + private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException { final String remoteAddr = Utils.getRemoteAddress(ctx); - if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); + ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr); LOG.info(errRes.toString()); throw new CambriaApiException(errRes); } + } - String topicNameStd = null; + private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName, + ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException { - // topicNameStd= - - topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, - "enforced.topic.name.AAF"); - String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "metrics.send.cambria.topic"); - if (null == metricTopicname) - metricTopicname = "msgrtr.apinode.metrics.dmaap"; - boolean topicNameEnforced = false; - if (null != topicNameStd && topic.startsWith(topicNameStd)) { - topicNameEnforced = true; + boolean isAAFTopic = false; + String metricTopicName = getMetricTopicName(); + if(!metricTopicName.equalsIgnoreCase(topicName)) { + if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) { + isAAFTopic = true; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + String permission = aaf.aafPermissionString(topicName, action); + if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { + ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action); + LOG.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + } + } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) { + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + if(SUBSCRIBE_ACTION.equals(action)) { + metaTopic.checkUserRead(user); + } else if(PUBLISH_ACTION.equals(action)) { + metaTopic.checkUserWrite(user); + } + } } + return isAAFTopic; + } + + boolean isCadiEnabled() { + return Utils.isCadiEnabled(); + } - // Here check if the user has rights to publish on the topic - // ( This will be called when no auth is added or when UEB API Key - // Authentication is used) - // checkUserWrite(user) method will throw an error when there is no Auth - // header added or when the - // user has no publish rights + void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{ + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes); + } - if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) - && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) { - metatopic.checkUserWrite(user); - } + private int getMessageLimit(HttpServletRequest request) { + return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit); + } - // if headers are not provided then user will be null - if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization") - && !topic.equalsIgnoreCase(metricTopicname))) { - // the topic name will be sent by the client - - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - String permission = aaf.aafPermissionString(topic, "pub"); - if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic - + " on " + permission, - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); - LOG.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); + private int getMessageTimeout(HttpServletRequest request) { + String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY); + int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) : + CambriaConstants.kNoTimeout; + + String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY); + return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs; + } + + private boolean isPrettyPrintEnabled() { + return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty")); + } + + private boolean isMetaOffsetEnabled() { + return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta")); + } + + private boolean isTopicNameEnforcedAaf(String topicName) { + String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF"); + return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd); + } + + private boolean isCacheEnabled() { + String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache); + return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled; + } + + private void verifyHostId() { + String lhostId = getPropertyFromAJSCmap("clusterhostid"); + if (lhostId.isEmpty()) { + try { + InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e); } - isAAFTopic = true; + } + } - final HttpServletRequest req = ctx.getRequest(); + private String getMetricTopicName() { + String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic"); + return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap"; + } - // check for chunked input - boolean chunked = false; - if (null != req.getHeader(TRANSFER_ENCODING)) { - chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked"); - } - // get the media type, or set it to a generic value if it wasn't - // provided - String mediaType = req.getContentType(); - if (mediaType == null || mediaType.length() == 0) { - mediaType = MimeTypes.kAppGenericBinary; - } + /** + * @throws missingReqdSetting + * + */ + @Override + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, + CambriaApiException, IOException, missingReqdSetting { - if (mediaType.contains("charset=UTF-8")) { - mediaType = mediaType.replace("; charset=UTF-8", "").trim(); - } + final long startMs = System.currentTimeMillis(); + String remoteHost = ctx.getRequest().getRemoteHost(); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages) + .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost) + .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build(); - String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "transidUEBtopicreqd"); - boolean istransidreqd = false; - if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) { - istransidreqd = true; + validateIpBlacklist(errRespProvider, ctx); + + final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + if (metaTopic == null) { + throw new CambriaApiException(errRespProvider.getTopicNotFoundError()); } - if (isAAFTopic || istransidreqd) { + final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION); + + final HttpServletRequest req = ctx.getRequest(); + boolean chunked = isRequestedChunk(req); + String mediaType = getMediaType(req); + boolean transactionRequired = isTransactionIdRequired(); + + if (isAAFTopic || transactionRequired) { pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType); } else { pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType); } + final long endMs = System.currentTimeMillis(); final long totalMs = endMs - startMs; - LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic); + } + + private boolean isRequestedChunk(HttpServletRequest request) { + return null != request.getHeader(TRANSFER_ENCODING) && + request.getHeader(TRANSFER_ENCODING).contains("chunked"); + } + + private String getMediaType(HttpServletRequest request) { + String mediaType = request.getContentType(); + if (mediaType == null || mediaType.length() == 0) { + return MimeTypes.kAppGenericBinary; + } + return mediaType.replace("; charset=UTF-8", "").trim(); + } + private boolean isTransactionIdRequired() { + return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true"); } /** @@ -481,7 +379,7 @@ public class EventsServiceImpl implements EventsService { */ private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked, String mediaType) - throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException { + throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); // setup the event set final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition); @@ -490,8 +388,8 @@ public class EventsServiceImpl implements EventsService { final long startMs = System.currentTimeMillis(); long count = 0; long maxEventBatch = 1024L* 16; - String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); - if (null != batchlen) + String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH); + if (null != batchlen && !batchlen.isEmpty()) maxEventBatch = Long.parseLong(batchlen); // long maxEventBatch = @@ -550,7 +448,7 @@ public class EventsServiceImpl implements EventsService { final JSONObject response = new JSONObject(); response.put("count", count); response.put("serverTimeMs", totalMs); - DMaaPResponseBuilder.respondOk(ctx, response); + respondOk(ctx, response); } catch (Exception excp) { int status = HttpStatus.SC_NOT_FOUND; @@ -590,7 +488,7 @@ public class EventsServiceImpl implements EventsService { */ private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic, final String partitionKey, final String requestTime, final boolean chunked, final String mediaType) - throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException { + throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); @@ -601,8 +499,8 @@ public class EventsServiceImpl implements EventsService { final long startMs = System.currentTimeMillis(); long count = 0; long maxEventBatch = 1024L * 16; - String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); - if (null != evenlen) + String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH); + if (null != evenlen && !evenlen.isEmpty()) maxEventBatch = Long.parseLong(evenlen); // final long maxEventBatch = @@ -639,9 +537,9 @@ public class EventsServiceImpl implements EventsService { responseTransactionId = m.getLogDetails().getTransactionId(); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("msgWrapMR", m.getMessage()); - jsonObject.put("transactionId", responseTransactionId); + //JSONObject jsonObject = new JSONObject(); + //jsonObject.put("msgWrapMR", m.getMessage()); + //jsonObject.put("transactionId", responseTransactionId); // final KeyedMessage<String, String> data = new // KeyedMessage<String, String>(topic, m.getKey(), @@ -755,8 +653,9 @@ public class EventsServiceImpl implements EventsService { // build a response final JSONObject response = new JSONObject(); response.put("count", count); + response.put("transactionId", responseTransactionId); response.put("serverTimeMs", totalMs); - DMaaPResponseBuilder.respondOk(ctx, response); + respondOk(ctx, response); } catch (Exception excp) { int status = HttpStatus.SC_NOT_FOUND; @@ -798,6 +697,10 @@ public class EventsServiceImpl implements EventsService { msg.setLogDetails(logDetails); } + void respondOk(DMaaPContext ctx, JSONObject response) throws IOException { + DMaaPResponseBuilder.respondOk(ctx, response); + } + /** * * @author anowarul.islam @@ -837,8 +740,7 @@ public class EventsServiceImpl implements EventsService { } public boolean isTransEnabled() { - String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "transidUEBtopicreqd"); + String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd"); boolean istransidreqd = false; if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) { istransidreqd = true; @@ -863,13 +765,5 @@ public class EventsServiceImpl implements EventsService { return logDetails; } - - - - - - - - }
\ No newline at end of file diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java index 3048251..40e6840 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java @@ -23,6 +23,7 @@ package org.onap.dmaap.dmf.mr.utils; import java.io.IOException; import java.io.InputStream; +import java.security.Principal; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; @@ -46,7 +47,9 @@ public class Utils { private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS"; public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth"; + private static final String AUTH_HEADER = "Authorization"; private static final String BATCH_ID_FORMAT = "000000"; + private static final String X509_ATTR = "javax.servlet.request.X509Certificate"; private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class); private Utils() { @@ -75,15 +78,21 @@ public class Utils { if (null != auth) { final String[] splittedAuthKey = auth.split(":"); return splittedAuthKey[0]; - }else if (null!=request.getHeader("Authorization")){ + }else if (null != request.getHeader(AUTH_HEADER) || null != request.getAttribute(X509_ATTR)){ /** * AAF implementation enhancement */ - String user= request.getUserPrincipal().getName().toString(); - return user.substring(0, user.lastIndexOf("@")); + Principal principal = request.getUserPrincipal(); + if(principal != null){ + String name = principal.getName(); + return name.substring(0, name.lastIndexOf('@')); + } + log.warn("No principal has been provided on HTTP request"); } return null; } + + /** * to format the batch sequence id * @param batchId |