diff options
Diffstat (limited to 'src')
13 files changed, 1133 insertions, 746 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java b/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java index 7b68b42..95b0577 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java @@ -58,10 +58,10 @@ public class DMaaPErrorMessages { private String topicsfailure="Failed to retrieve list of all topics."; //@Value("${not.permitted.access.1}") - private String notPermitted1="Access Denied.User does not have permission to perform"; + private String notPermitted1="Access Denied.User does not have permission to perform "; //@Value("${not.permitted.access.2}") - private String notPermitted2="operation on Topic:"; + private String notPermitted2=" operation on Topic:"; //@Value("${get.topic.details.failure}") private String topicDetailsFail="Failed to retrieve details of topic:"; diff --git a/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java b/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java index 537fc22..2f7f1db 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java @@ -548,7 +548,35 @@ public class CambriaOutboundEventStream implements StreamWriter { private ArrayList<Consumer> fKafkaConsumerList; private boolean istransType = true; // private static final Logger log = - + private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class); + + public int getfLimit() { + return fLimit; + } + + public int getfTimeoutMs() { + return fTimeoutMs; + } + + public boolean isfPretty() { + return fPretty; + } + + public boolean isfWithMeta() { + return fWithMeta; + } + + public boolean isAAFTopic() { + return isAAFTopic; + } + + public boolean isIstransEnable() { + return istransEnable; + } + + public boolean isIstransType() { + return istransType; + } }
\ No newline at end of file diff --git a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java index a7f2376..12f0465 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java @@ -22,12 +22,8 @@ package org.onap.dmaap.dmf.mr.security; import javax.servlet.http.HttpServletRequest; - import org.onap.dmaap.dmf.mr.CambriaApiException; - - - /** * * @author sneha.d.desai diff --git a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java index 25644a7..0e6b0f6 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java @@ -21,6 +21,7 @@ *******************************************************************************/ package org.onap.dmaap.dmf.mr.security; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; import javax.servlet.http.HttpServletRequest; import org.onap.dmaap.dmf.mr.CambriaApiException; @@ -34,47 +35,37 @@ import org.onap.dmaap.dmf.mr.constants.CambriaConstants; */ public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator { + private static final String NAMESPACE_PROPERTY = "defaultNSforUEB"; + private static final String DEFAULT_NAMESPACE = "org.onap.dmaap.mr"; + private static final String NAMESPACE_PREFIX = "org.onap"; + /** * @param req * @param role */ @Override public boolean aafAuthentication(HttpServletRequest req, String role) { - boolean auth = false; - if(req.isUserInRole(role)) - { - - auth = true; - } - - return auth; + return req.isUserInRole(role); } @Override public String aafPermissionString(String topicName, String action) throws CambriaApiException { - - - String permission = ""; - String nameSpace =""; - if(topicName.contains(".") && topicName.contains("org.onap")) { - - nameSpace = topicName.substring(0,topicName.lastIndexOf(".")); - } - else { - nameSpace = null; - nameSpace= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"defaultNSforUEB"); - - if(null==nameSpace)nameSpace="org.onap.dmaap.mr"; - - - - } - - permission = nameSpace+".topic|:topic."+topicName+"|"+action; - return permission; - + + String nameSpace = topicName.startsWith(NAMESPACE_PREFIX) ? parseNamespace(topicName) : + readNamespaceFromProperties(); + + nameSpace = !nameSpace.isEmpty()? nameSpace : DEFAULT_NAMESPACE; + + return new StringBuilder(nameSpace).append(".topic|:topic.").append(topicName) + .append("|").append(action).toString(); + } + + String readNamespaceFromProperties() { + return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,NAMESPACE_PROPERTY); + } + + private String parseNamespace(String topicName) { + return topicName.substring(0,topicName.lastIndexOf('.')); } - - } diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java new file mode 100644 index 0000000..50a611e --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java @@ -0,0 +1,147 @@ +/******************************************************************************* + * ============LICENSE_START=================================================== + * org.onap.dmaap + * ============================================================================ + * Copyright © 2019 Nokia Intellectual Property. All rights reserved. + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + ******************************************************************************/ +package org.onap.dmaap.dmf.mr.service.impl; + +import com.google.common.base.Preconditions; +import java.util.Date; +import org.apache.http.HttpStatus; +import org.onap.dmaap.dmf.mr.beans.DMaaPContext; +import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; +import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; +import org.onap.dmaap.dmf.mr.exception.ErrorResponse; +import org.onap.dmaap.dmf.mr.utils.Utils; + +class ErrorResponseProvider { + + private String clientId; + private String topicName; + private String consumerGroup; + private String remoteHost; + private String publisherId; + private String publisherIp; + private DMaaPErrorMessages errorMessages; + + private ErrorResponseProvider() { + + } + + ErrorResponse getIpBlacklistedError(String remoteAddr) { + return new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", + null, Utils.getFormattedDate(new Date()), topicName, publisherId, + publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + ErrorResponse getTopicNotFoundError() { + return new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), + errorMessages.getTopicNotExist() + "-[" + topicName + "]", null, Utils.getFormattedDate(new Date()), + topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + ErrorResponse getAafAuthorizationError(String permission, String action) { + return new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1() + action + errorMessages.getNotPermitted2() + topicName + " on " + + permission, + null, Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, + remoteHost); + } + + ErrorResponse getServiceUnavailableError(String msg) { + return new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + errorMessages.getServerUnav() + msg, null, Utils.getFormattedDate(new Date()), topicName, + publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + ErrorResponse getConcurrentModificationError() { + return new ErrorResponse(HttpStatus.SC_CONFLICT, + DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), + "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null, + Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + ErrorResponse getGenericError(String msg) { + return new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + "Couldn't respond to client, closing cambria consumer" + msg, null, + Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost); + } + + public static class Builder { + + private String clientId; + private String topicName; + private String consumerGroup; + private String remoteHost; + private String publisherId; + private String publisherIp; + DMaaPErrorMessages errorMessages; + + Builder withErrorMessages(DMaaPErrorMessages errorMessages) { + this.errorMessages = errorMessages; + return this; + } + + Builder withTopic(String topic) { + this.topicName = topic; + return this; + } + + Builder withClient(String client) { + this.clientId = client; + return this; + } + + Builder withConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + return this; + } + + Builder withRemoteHost(String remoteHost) { + this.remoteHost = remoteHost; + return this; + } + + Builder withPublisherId(String publisherId) { + this.publisherId = publisherId; + return this; + } + + Builder withPublisherIp(String publisherIp) { + this.publisherIp = publisherIp; + return this; + } + + public ErrorResponseProvider build() { + Preconditions.checkArgument(errorMessages!=null); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider(); + errRespProvider.errorMessages = this.errorMessages; + errRespProvider.clientId = this.clientId; + errRespProvider.consumerGroup = this.consumerGroup; + errRespProvider.topicName = this.topicName; + errRespProvider.remoteHost = this.remoteHost; + errRespProvider.publisherId = this.publisherId; + errRespProvider.publisherIp = this.publisherIp; + return errRespProvider; + } + } +} diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java index 11c544f..ec5bfc0 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java @@ -21,34 +21,32 @@ *******************************************************************************/ package org.onap.dmaap.dmf.mr.service.impl; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.util.rrConvertor; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Arrays; +import java.util.ConcurrentModificationException; import java.util.Date; -import java.util.HashMap; import java.util.LinkedList; -import java.util.Properties; - import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.MediaType; - +import org.apache.commons.lang.math.NumberUtils; import org.apache.http.HttpStatus; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.TopicExistsException; import org.json.JSONObject; import org.json.JSONTokener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Service; - -import com.att.ajsc.filemonitor.AJSCPropertiesMap; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.backends.Consumer; import org.onap.dmaap.dmf.mr.backends.ConsumerFactory; @@ -56,7 +54,6 @@ import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException; import org.onap.dmaap.dmf.mr.backends.MetricsSet; import org.onap.dmaap.dmf.mr.backends.Publisher; import org.onap.dmaap.dmf.mr.backends.Publisher.message; -import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter; import org.onap.dmaap.dmf.mr.beans.DMaaPContext; import org.onap.dmaap.dmf.mr.beans.LogDetails; @@ -65,7 +62,6 @@ import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException; import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; import org.onap.dmaap.dmf.mr.exception.ErrorResponse; - import org.onap.dmaap.dmf.mr.metabroker.Topic; import org.onap.dmaap.dmf.mr.resources.CambriaEventSet; import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream; @@ -74,15 +70,10 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl; import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl; import org.onap.dmaap.dmf.mr.service.EventsService; import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder; +import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter; import org.onap.dmaap.dmf.mr.utils.Utils; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.nsa.configs.ConfigDbException; -import com.att.nsa.drumlin.service.standards.MimeTypes; -import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import com.att.nsa.security.NsaApiKey; -import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; -import com.att.nsa.util.rrConvertor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; /** * This class provides the functinality to publish and subscribe message to @@ -93,20 +84,20 @@ import com.att.nsa.util.rrConvertor; */ @Service public class EventsServiceImpl implements EventsService { - // private static final Logger LOG = private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); - private static final String BATCH_LENGTH = "event.batch.length"; private static final String TRANSFER_ENCODING = "Transfer-Encoding"; + private static final String TIMEOUT_PROPERTY = "timeout"; + private static final String SUBSCRIBE_ACTION = "sub"; + private static final String PUBLISH_ACTION = "pub"; + @Autowired private DMaaPErrorMessages errorMessages; - - //@Autowired - - // @Value("${metrics.send.cambria.topic}") - + String getPropertyFromAJSCmap(String propertyKey) { + return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey); + } public DMaaPErrorMessages getErrorMessages() { return errorMessages; @@ -129,222 +120,91 @@ public class EventsServiceImpl implements EventsService { */ @Override public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId) - throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException, - CambriaApiException, IOException, DMaaPAccessDeniedException { + throws ConfigDbException, AccessDeniedException, UnavailableException, + CambriaApiException, IOException { + final long startTime = System.currentTimeMillis(); final HttpServletRequest req = ctx.getRequest(); - - boolean isAAFTopic = false; - // was this host blacklisted? - final String remoteAddr = Utils.getRemoteAddress(ctx); - if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); - LOG.info(errRes.toString()); - throw new CambriaApiException(errRes); - } - - int limit = CambriaConstants.kNoLimit; - if (req.getParameter("limit") != null) { - limit = Integer.parseInt(req.getParameter("limit")); - } - - int timeoutMs = CambriaConstants.kNoTimeout; - String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout"); - if (strtimeoutMS != null) - timeoutMs = Integer.parseInt(strtimeoutMS); - // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", - - if (req.getParameter("timeout") != null) { - timeoutMs = Integer.parseInt(req.getParameter("timeout")); - } - - // By default no filter is applied if filter is not passed as a - // parameter in the request URI - String topicFilter = CambriaConstants.kNoFilter; - if (null != req.getParameter("filter")) { - topicFilter = req.getParameter("filter"); - } - // pretty to print the messaages in new line - String prettyval = "0"; - String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty"); - if (null != strPretty) - prettyval = strPretty; - - String metaval = "0"; - String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta"); - if (null != strmeta) - metaval = strmeta; - - final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval); - // withMeta to print offset along with message - final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval); - final LogWrap logger = new LogWrap(topic, consumerGroup, clientId); - logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost()); + final String remoteHost = req.getRemoteHost(); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages) + .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build(); - // is this user allowed to read this topic? - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); - final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + validateIpBlacklist(errRespProvider, ctx); - if (metatopic == null) { - // no such topic. - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), - errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()), - topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost()); - LOG.info(errRes.toString()); - throw new CambriaApiException(errRes); - } - String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "metrics.send.cambria.topic"); - if (null == metricTopicname) - metricTopicname = "msgrtr.apinode.metrics.dmaap"; - - boolean topicNameEnforced = false; - String topicNameStd = null; - topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, - "enforced.topic.name.AAF"); - if (null != topicNameStd && topic.startsWith(topicNameStd)) { - topicNameEnforced = true; + final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + if (metaTopic == null) { + throw new CambriaApiException(errRespProvider.getTopicNotFoundError()); } - if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) { - if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) { - // check permissions - metatopic.checkUserRead(user); - } - } - // if headers are not provided then user will be null - if (topicNameEnforced ||(user == null && null != ctx.getRequest().getHeader("Authorization"))) { - // the topic name will be sent by the client - - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - String permission = aaf.aafPermissionString(topic, "sub"); - if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on " - + permission, - null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId, - ctx.getRequest().getRemoteHost()); - LOG.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); + boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION); - } - isAAFTopic = true; - } final long elapsedMs1 = System.currentTimeMillis() - startTime; logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup + " " + clientId); - Consumer c = null; - - String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "clusterhostid"); - if (null == lhostId) { - try { - lhostId = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException e) { - LOG.info("Unknown Host Exception error occured while getting getting hostid"); - } - } - CambriaOutboundEventStream coes = null; + verifyHostId(); + final boolean pretty = isPrettyPrintEnabled(); + final boolean withMeta = isMetaOffsetEnabled(); + int timeoutMs = getMessageTimeout(req); + int limit = getMessageLimit(req); + String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter; + logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost()); + + Consumer consumer = null; try { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter(); - rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost()); - c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs, - ctx.getRequest().getRemoteHost()); - coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs) + rl.onCall(topic, consumerGroup, clientId, remoteHost); + consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs, + remoteHost); + CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs) .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build(); coes.setDmaapContext(ctx); - coes.setTopic(metatopic); - if (isTransEnabled() || isAAFTopic) { - coes.setTransEnabled(true); - } else { - coes.setTransEnabled(false); - } + coes.setTopic(metaTopic); + coes.setTransEnabled(isTransEnabled() || isAAFTopic); coes.setTopicStyle(isAAFTopic); final long elapsedMs2 = System.currentTimeMillis() - startTime; logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " " + consumerGroup + " " + clientId); - DMaaPResponseBuilder.setNoCacheHeadings(ctx); - - DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes); + respondOkWithStream(ctx, coes); // No IOException thrown during respondOkWithStream, so commit the // new offsets to all the brokers - c.commitOffsets(); + consumer.commitOffsets(); final int sent = coes.getSentCount(); - - metricsSet.consumeTick(sent); - rl.onSend(topic, consumerGroup, clientId, sent); + metricsSet.consumeTick(sent); + rl.onSend(topic, consumerGroup, clientId, sent); final long elapsedMs = System.currentTimeMillis() - startTime; - logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for " + logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for " + topic + " " + consumerGroup + " " + clientId + " on to the server " - + ctx.getRequest().getRemoteHost()); + + remoteHost); } catch (UnavailableException excp) { logger.warn(excp.getMessage(), excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, - DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), - errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic, - null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost()); + ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage()); LOG.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (java.util.ConcurrentModificationException excp1) { - LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost()); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), - "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null, - Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + } catch (ConcurrentModificationException excp1) { + LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost); + ErrorResponse errRes = errRespProvider.getConcurrentModificationError(); logger.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (CambriaApiException excp) { - LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId); - - throw excp; - } - catch (Exception excp) { - // System.out.println(excp + "------------------ " + topic+" - // "+consumerGroup+" "+clientId); - + } catch (Exception excp) { logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp); - - ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); - - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, - DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), - "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null, - Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); + ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage()); logger.info(errRes.toString()); throw new CambriaApiException(errRes); } finally { - coes = null; - // If no cache, close the consumer now that we're done with it. - boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled; - String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - ConsumerFactory.kSetting_EnableCache); - if (null != strkSetting_EnableCache) - kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache); - // if - // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache, - // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) { - if (!kSetting_EnableCache && (c != null)) { + if (consumer != null && !isCacheEnabled()) { try { - c.close(); + consumer.close(); } catch (Exception e) { - logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " " + logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " " + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " " + e); } @@ -352,117 +212,155 @@ public class EventsServiceImpl implements EventsService { } } - /** - * @throws missingReqdSetting - * - */ - @Override - public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, - final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException, - CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException { - - // is this user allowed to write to this topic? - final long startMs = System.currentTimeMillis(); - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); - final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); - boolean isAAFTopic = false; - - // was this host blacklisted? + private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException { final String remoteAddr = Utils.getRemoteAddress(ctx); - if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); + ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr); LOG.info(errRes.toString()); throw new CambriaApiException(errRes); } + } - String topicNameStd = null; + private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName, + ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException { - // topicNameStd= - - topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, - "enforced.topic.name.AAF"); - String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "metrics.send.cambria.topic"); - if (null == metricTopicname) - metricTopicname = "msgrtr.apinode.metrics.dmaap"; - boolean topicNameEnforced = false; - if (null != topicNameStd && topic.startsWith(topicNameStd)) { - topicNameEnforced = true; + boolean isAAFTopic = false; + String metricTopicName = getMetricTopicName(); + if(!metricTopicName.equalsIgnoreCase(topicName)) { + if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) { + isAAFTopic = true; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + String permission = aaf.aafPermissionString(topicName, action); + if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { + ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action); + LOG.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + } + } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) { + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + if(SUBSCRIBE_ACTION.equals(action)) { + metaTopic.checkUserRead(user); + } else if(PUBLISH_ACTION.equals(action)) { + metaTopic.checkUserWrite(user); + } + } } + return isAAFTopic; + } + + boolean isCadiEnabled() { + return Utils.isCadiEnabled(); + } - // Here check if the user has rights to publish on the topic - // ( This will be called when no auth is added or when UEB API Key - // Authentication is used) - // checkUserWrite(user) method will throw an error when there is no Auth - // header added or when the - // user has no publish rights + void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{ + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes); + } - if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) - && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) { - metatopic.checkUserWrite(user); - } + private int getMessageLimit(HttpServletRequest request) { + return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit); + } - // if headers are not provided then user will be null - if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization") - && !topic.equalsIgnoreCase(metricTopicname))) { - // the topic name will be sent by the client - - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - String permission = aaf.aafPermissionString(topic, "pub"); - if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic - + " on " + permission, - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); - LOG.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); + private int getMessageTimeout(HttpServletRequest request) { + String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY); + int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) : + CambriaConstants.kNoTimeout; + + String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY); + return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs; + } + + private boolean isPrettyPrintEnabled() { + return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty")); + } + + private boolean isMetaOffsetEnabled() { + return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta")); + } + + private boolean isTopicNameEnforcedAaf(String topicName) { + String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF"); + return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd); + } + + private boolean isCacheEnabled() { + String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache); + return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled; + } + + private void verifyHostId() { + String lhostId = getPropertyFromAJSCmap("clusterhostid"); + if (lhostId.isEmpty()) { + try { + InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e); } - isAAFTopic = true; + } + } - final HttpServletRequest req = ctx.getRequest(); + private String getMetricTopicName() { + String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic"); + return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap"; + } - // check for chunked input - boolean chunked = false; - if (null != req.getHeader(TRANSFER_ENCODING)) { - chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked"); - } - // get the media type, or set it to a generic value if it wasn't - // provided - String mediaType = req.getContentType(); - if (mediaType == null || mediaType.length() == 0) { - mediaType = MimeTypes.kAppGenericBinary; - } + /** + * @throws missingReqdSetting + * + */ + @Override + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, + CambriaApiException, IOException, missingReqdSetting { - if (mediaType.contains("charset=UTF-8")) { - mediaType = mediaType.replace("; charset=UTF-8", "").trim(); - } + final long startMs = System.currentTimeMillis(); + String remoteHost = ctx.getRequest().getRemoteHost(); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages) + .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost) + .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build(); - String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "transidUEBtopicreqd"); - boolean istransidreqd = false; - if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) { - istransidreqd = true; + validateIpBlacklist(errRespProvider, ctx); + + final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + if (metaTopic == null) { + throw new CambriaApiException(errRespProvider.getTopicNotFoundError()); } - if (isAAFTopic || istransidreqd) { + final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION); + + final HttpServletRequest req = ctx.getRequest(); + boolean chunked = isRequestedChunk(req); + String mediaType = getMediaType(req); + boolean transactionRequired = isTransactionIdRequired(); + + if (isAAFTopic || transactionRequired) { pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType); } else { pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType); } + final long endMs = System.currentTimeMillis(); final long totalMs = endMs - startMs; - LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic); + } + + private boolean isRequestedChunk(HttpServletRequest request) { + return null != request.getHeader(TRANSFER_ENCODING) && + request.getHeader(TRANSFER_ENCODING).contains("chunked"); + } + + private String getMediaType(HttpServletRequest request) { + String mediaType = request.getContentType(); + if (mediaType == null || mediaType.length() == 0) { + return MimeTypes.kAppGenericBinary; + } + return mediaType.replace("; charset=UTF-8", "").trim(); + } + private boolean isTransactionIdRequired() { + return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true"); } /** @@ -481,7 +379,7 @@ public class EventsServiceImpl implements EventsService { */ private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked, String mediaType) - throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException { + throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); // setup the event set final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition); @@ -490,8 +388,8 @@ public class EventsServiceImpl implements EventsService { final long startMs = System.currentTimeMillis(); long count = 0; long maxEventBatch = 1024L* 16; - String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); - if (null != batchlen) + String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH); + if (null != batchlen && !batchlen.isEmpty()) maxEventBatch = Long.parseLong(batchlen); // long maxEventBatch = @@ -550,7 +448,7 @@ public class EventsServiceImpl implements EventsService { final JSONObject response = new JSONObject(); response.put("count", count); response.put("serverTimeMs", totalMs); - DMaaPResponseBuilder.respondOk(ctx, response); + respondOk(ctx, response); } catch (Exception excp) { int status = HttpStatus.SC_NOT_FOUND; @@ -590,7 +488,7 @@ public class EventsServiceImpl implements EventsService { */ private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic, final String partitionKey, final String requestTime, final boolean chunked, final String mediaType) - throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException { + throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); @@ -601,8 +499,8 @@ public class EventsServiceImpl implements EventsService { final long startMs = System.currentTimeMillis(); long count = 0; long maxEventBatch = 1024L * 16; - String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); - if (null != evenlen) + String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH); + if (null != evenlen && !evenlen.isEmpty()) maxEventBatch = Long.parseLong(evenlen); // final long maxEventBatch = @@ -639,9 +537,9 @@ public class EventsServiceImpl implements EventsService { responseTransactionId = m.getLogDetails().getTransactionId(); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("msgWrapMR", m.getMessage()); - jsonObject.put("transactionId", responseTransactionId); + //JSONObject jsonObject = new JSONObject(); + //jsonObject.put("msgWrapMR", m.getMessage()); + //jsonObject.put("transactionId", responseTransactionId); // final KeyedMessage<String, String> data = new // KeyedMessage<String, String>(topic, m.getKey(), @@ -755,8 +653,9 @@ public class EventsServiceImpl implements EventsService { // build a response final JSONObject response = new JSONObject(); response.put("count", count); + response.put("transactionId", responseTransactionId); response.put("serverTimeMs", totalMs); - DMaaPResponseBuilder.respondOk(ctx, response); + respondOk(ctx, response); } catch (Exception excp) { int status = HttpStatus.SC_NOT_FOUND; @@ -798,6 +697,10 @@ public class EventsServiceImpl implements EventsService { msg.setLogDetails(logDetails); } + void respondOk(DMaaPContext ctx, JSONObject response) throws IOException { + DMaaPResponseBuilder.respondOk(ctx, response); + } + /** * * @author anowarul.islam @@ -837,8 +740,7 @@ public class EventsServiceImpl implements EventsService { } public boolean isTransEnabled() { - String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "transidUEBtopicreqd"); + String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd"); boolean istransidreqd = false; if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) { istransidreqd = true; @@ -863,13 +765,5 @@ public class EventsServiceImpl implements EventsService { return logDetails; } - - - - - - - - }
\ No newline at end of file diff --git a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java index 3048251..40e6840 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java @@ -23,6 +23,7 @@ package org.onap.dmaap.dmf.mr.utils; import java.io.IOException; import java.io.InputStream; +import java.security.Principal; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; @@ -46,7 +47,9 @@ public class Utils { private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS"; public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth"; + private static final String AUTH_HEADER = "Authorization"; private static final String BATCH_ID_FORMAT = "000000"; + private static final String X509_ATTR = "javax.servlet.request.X509Certificate"; private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class); private Utils() { @@ -75,15 +78,21 @@ public class Utils { if (null != auth) { final String[] splittedAuthKey = auth.split(":"); return splittedAuthKey[0]; - }else if (null!=request.getHeader("Authorization")){ + }else if (null != request.getHeader(AUTH_HEADER) || null != request.getAttribute(X509_ATTR)){ /** * AAF implementation enhancement */ - String user= request.getUserPrincipal().getName().toString(); - return user.substring(0, user.lastIndexOf("@")); + Principal principal = request.getUserPrincipal(); + if(principal != null){ + String name = principal.getName(); + return name.substring(0, name.lastIndexOf('@')); + } + log.warn("No principal has been provided on HTTP request"); } return null; } + + /** * to format the batch sequence id * @param batchId diff --git a/src/test/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImplTest.java b/src/test/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImplTest.java new file mode 100644 index 0000000..0b8829c --- /dev/null +++ b/src/test/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImplTest.java @@ -0,0 +1,117 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Engine + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dmaap.dmf.mr.security; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.BDDMockito.given; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Spy; +import org.mockito.runners.MockitoJUnitRunner; +import org.springframework.mock.web.MockHttpServletRequest; + +@RunWith(MockitoJUnitRunner.class) +public class DMaaPAAFAuthenticatorImplTest { + + private MockHttpServletRequest request; + @Spy + private DMaaPAAFAuthenticatorImpl aafAuthorizer; + + @Before + public void setUp() throws Exception { + request = new MockHttpServletRequest(); + } + + + @Test + public void aafAuthentication_shouldSuccess_whenRequestIsConfiguredWithProperUserRole() { + //given + String userRole = "org.onap.dmaap.mr.topic|:topic.org.onap.dmaap.mr.aSimpleTopic|sub"; + request.addUserRole(userRole); + + //when + boolean isAuthorized = aafAuthorizer.aafAuthentication(request, userRole); + + //then + assertTrue(isAuthorized); + } + + @Test + public void aafAuthentication_shouldFail_whenRequestIsConfiguredWithProperUserRole() { + //given + String userRole = "org.onap.dmaap.mr.topic|:topic.org.onap.dmaap.mr.aSimpleTopic|pub"; + + //when + boolean isAuthorized = aafAuthorizer.aafAuthentication(request, userRole); + + //then + assertFalse(isAuthorized); + } + + @Test + public void getPermissionAsString_shouldReturnValidTopicPermission_whenTopicWithNamespace() throws Exception { + //given + String topicPermission = "org.onap.dmaap.mr.topic|:topic.org.onap.dmaap.mr.aSimpleTopic|pub"; + String topicName = "org.onap.dmaap.mr.aSimpleTopic"; + String operation = "pub"; + + //when + String resultPem = aafAuthorizer.aafPermissionString(topicName, operation); + + //then + assertEquals(topicPermission, resultPem); + } + + @Test + public void getPermissionAsString_shouldReturnValidTopicPermission_whenTopicWithoutNamespace() throws Exception { + //given + String topicPermission = "org.onap.dmaap.mr.topic|:topic.topicName|pub"; + String topicName = "topicName"; + String operation = "pub"; + + //when + String resultPem = aafAuthorizer.aafPermissionString(topicName, operation); + + //then + assertEquals(topicPermission, resultPem); + } + + @Test + public void getPermissionAsString_shouldReturnValidTopicPermission_whenNamespaceReadFromProperty() throws Exception { + //given + String topicPermission = "com.custom.ns.topic|:topic.topicName|pub"; + String topicName = "topicName"; + String operation = "pub"; + String customNamespace = "com.custom.ns"; + given(aafAuthorizer.readNamespaceFromProperties()).willReturn(customNamespace); + + //when + String resultPem = aafAuthorizer.aafPermissionString(topicName, operation); + + //then + assertEquals(topicPermission, resultPem); + } + + +} diff --git a/src/test/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImplTest.java b/src/test/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImplTest.java new file mode 100644 index 0000000..4abbe89 --- /dev/null +++ b/src/test/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImplTest.java @@ -0,0 +1,598 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP Policy Engine + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dmaap.dmf.mr.service.impl; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.att.nsa.limits.Blacklist; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import javax.servlet.http.HttpServletRequest; +import joptsimple.internal.Strings; +import org.apache.http.HttpStatus; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.Spy; +import org.mockito.runners.MockitoJUnitRunner; +import org.onap.dmaap.dmf.mr.CambriaApiException; +import org.onap.dmaap.dmf.mr.backends.Consumer; +import org.onap.dmaap.dmf.mr.backends.ConsumerFactory; +import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException; +import org.onap.dmaap.dmf.mr.backends.MetricsSet; +import org.onap.dmaap.dmf.mr.backends.Publisher; +import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter; +import org.onap.dmaap.dmf.mr.beans.DMaaPContext; +import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker; +import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException; +import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; +import org.onap.dmaap.dmf.mr.metabroker.Topic; +import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream; +import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator; +import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; +import org.springframework.mock.web.MockHttpServletRequest; +import org.springframework.mock.web.MockHttpServletResponse; + +@RunWith(MockitoJUnitRunner.class) +public class EventsServiceImplTest { + + private InputStream iStream = null; + private DMaaPContext dMaapContext = new DMaaPContext(); + private DMaaPErrorMessages pErrorMessages = new DMaaPErrorMessages(); + @Mock + private ConfigurationReader configurationReader; + @Mock + private Blacklist blacklist; + @Mock + private DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator; + @Mock + private NsaSimpleApiKey nsaSimpleApiKey; + @Mock + private DMaaPKafkaMetaBroker dmaapKafkaMetaBroker; + @Mock + private Topic createdTopic; + @Mock + private ConsumerFactory factory; + @Mock + private Consumer consumer; + @Mock + private Publisher publisher; + @Mock + private DMaaPCambriaLimiter limiter; + @Mock + private MetricsSet metrics; + @Spy + private EventsServiceImpl eventsService; + + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private MockHttpServletRequest request; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + String source = "source of my InputStream"; + iStream = new ByteArrayInputStream(source.getBytes("UTF-8")); + + request = new MockHttpServletRequest(); + MockHttpServletResponse response = new MockHttpServletResponse(); + dMaapContext.setRequest(request); + dMaapContext.setResponse(response); + when(blacklist.contains(anyString())).thenReturn(false); + when(configurationReader.getfIpBlackList()).thenReturn(blacklist); + when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); + dMaapContext.setConfigReader(configurationReader); + eventsService.setErrorMessages(pErrorMessages); + doReturn("100").when(eventsService).getPropertyFromAJSCmap("timeout"); + } + + @Test + public void getEvents_shouldFailOnAafAuthorization() throws Exception { + String topicPrefix = "org.onap.aaf.enforced"; + String topicName = topicPrefix + ".topicName"; + when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); + when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic); + when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix); + when(eventsService.isCadiEnabled()).thenReturn(true); + + thrown.expect(DMaaPAccessDeniedException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED))); + + eventsService.getEvents(dMaapContext, topicName, "CG1", "23"); + } + + @Test + public void getEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception { + String remoteIp = "10.154.17.115"; + request.setRemoteAddr(remoteIp); + when(blacklist.contains(remoteIp)).thenReturn(true); + when(configurationReader.getfIpBlackList()).thenReturn(blacklist); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN))); + + eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23"); + } + + @Test + public void getEvents_shouldFail_whenRequestedTopicNotExists() throws Exception { + when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); + when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND))); + + eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23"); + } + + @Test + public void getEvents_shouldFail_whenConsumerLockCannotBeAcquired() throws Exception { + //given + String topicName = "testTopic345"; + String consumerGroup = "CG5"; + String clientId = "13"; + when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); + when(configurationReader.getfRateLimiter()).thenReturn(limiter); + when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic); + when(configurationReader.getfConsumerFactory()).thenReturn(factory); + when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); + doThrow(new UnavailableException("Could not acquire consumer lock")).when(factory) + .getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE))); + + //when + eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId); + + //then + verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()); + + } + + @Test + public void getEvents_shouldFail_whenBrokerServicesAreUnavailable() throws Exception { + String topicName = "testTopic"; + String consumerGroup = "CG1"; + String clientId = "23"; + when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null); + when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); + when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic); + when(configurationReader.getfConsumerFactory()).thenReturn(factory); + + givenUserAuthorizedWithAAF(request, topicName, "sub"); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE))); + + //when + eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId); + + //then + verify(factory).destroyConsumer(topicName, consumerGroup, clientId); + } + + private void givenUserAuthorizedWithAAF(MockHttpServletRequest request, String topicName, String operation) { + String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|" + operation; + request.addUserRole(permission); + } + + @Test + public void getEvents_shouldHandleConcurrentModificationError() throws Exception { + String testTopic = "testTopic"; + when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); + when(dmaapKafkaMetaBroker.getTopic(testTopic)).thenReturn(createdTopic); + when(configurationReader.getfConsumerFactory()).thenReturn(factory); + when(configurationReader.getfRateLimiter()).thenThrow(new ConcurrentModificationException("Error occurred")); + givenUserAuthorizedWithAAF(request, testTopic, "sub"); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_CONFLICT))); + + eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23"); + } + + @Test + public void getEvents_shouldNotAuthorizeClient_whenSubscribingToMetricsTopic() throws Exception { + //given + HttpServletRequest permittedRequest = mock(HttpServletRequest.class); + when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration()); + dMaapContext.setRequest(permittedRequest); + String metricsTopicName = "msgrtr.apinode.metrics.dmaap"; + String consumerGroup = "CG5"; + String clientId = "7"; + givenConfiguredWithMocks(metricsTopicName); + when(factory.getConsumerFor(eq(metricsTopicName), eq(consumerGroup), eq(clientId), anyInt(), anyString())) + .thenReturn(consumer); + doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class)); + + //when + eventsService.getEvents(dMaapContext, metricsTopicName, consumerGroup, clientId); + + //then + verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class)); + verify(dmaaPAuthenticator, never()).authenticate(dMaapContext); + verify(permittedRequest, never()).isUserInRole(anyString()); + } + + @Test + public void getEvents_shouldNotAuthorizeClient_whenTopicNoteEnforcedWithAaf_andTopicHasNoOwnerSet() + throws Exception { + //given + String topicName = "someSimpleTopicName"; + String consumerGroup = "CG5"; + String clientId = "7"; + HttpServletRequest permittedRequest = mock(HttpServletRequest.class); + when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration()); + dMaapContext.setRequest(permittedRequest); + givenConfiguredWithMocks(topicName); + when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString())) + .thenReturn(consumer); + doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class)); + when(createdTopic.getOwner()).thenReturn(Strings.EMPTY); + + //when + eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId); + + //then + verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class)); + verify(dmaaPAuthenticator, never()).authenticate(dMaapContext); + verify(permittedRequest, never()).isUserInRole(anyString()); + } + + @Test + public void getEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoReadPermissionToTopic() + throws Exception { + //given + String topicName = "someSimpleTopicName"; + String consumerGroup = "CG5"; + String clientId = "7"; + HttpServletRequest permittedRequest = mock(HttpServletRequest.class); + when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration()); + dMaapContext.setRequest(permittedRequest); + givenConfiguredWithMocks(topicName); + when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString())) + .thenReturn(consumer); + doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class)); + when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner"); + when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); + doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserRead(nsaSimpleApiKey); + + thrown.expect(AccessDeniedException.class); + + //when + eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId); + + //then + verify(createdTopic).checkUserRead(nsaSimpleApiKey); + verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class)); + verify(permittedRequest, never()).isUserInRole(anyString()); + } + + + @Test + public void getEvents_shouldSuccessfullyRegisterConsumerToEventsStream_withAafAuthorization() throws Exception { + //given + String topicName = "testTopic"; + String consumerGroup = "CG2"; + String clientId = "6"; + String messageLimit = "10"; + String timeout = "25"; + String meta = "yes"; + String pretty = "on"; + String cacheEnabled = "false"; + + givenConfiguredWithMocks(topicName); + givenConfiguredWithProperties(messageLimit, timeout, meta, pretty, cacheEnabled); + when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString())) + .thenReturn(consumer); + givenUserAuthorizedWithAAF(request, topicName, "sub"); + + //when + eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId); + + //then + ArgumentCaptor<CambriaOutboundEventStream> osWriter = ArgumentCaptor.forClass(CambriaOutboundEventStream.class); + verifyInvocationOrderForSuccessCase(topicName, consumerGroup, clientId, osWriter); + assertEventStreamProperties(osWriter.getValue(), messageLimit, timeout); + } + + private void assertEventStreamProperties(CambriaOutboundEventStream stream, String messageLimit, String timeout) { + assertEquals(Integer.valueOf(messageLimit).intValue(), stream.getfLimit()); + assertEquals(Integer.valueOf(timeout).intValue(), stream.getfTimeoutMs()); + assertTrue(stream.isfWithMeta()); + assertTrue(stream.isfPretty()); + } + + private void givenConfiguredWithProperties(String messageLimit, String timeout, String meta, String pretty, + String cacheEnabled) { + when(eventsService.getPropertyFromAJSCmap("meta")).thenReturn(meta); + when(eventsService.getPropertyFromAJSCmap("pretty")).thenReturn(pretty); + when(eventsService.getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache)).thenReturn(cacheEnabled); + request.addParameter("timeout", timeout); + request.addParameter("limit", messageLimit); + } + + private void givenConfiguredWithMocks(String topicName) throws Exception { + when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); + when(configurationReader.getfRateLimiter()).thenReturn(limiter); + when(configurationReader.getfMetrics()).thenReturn(metrics); + when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic); + when(configurationReader.getfConsumerFactory()).thenReturn(factory); + when(configurationReader.getfPublisher()).thenReturn(publisher); + } + + private void verifyInvocationOrderForSuccessCase(String topicName, String consumerGroup, String clientId, + ArgumentCaptor<CambriaOutboundEventStream> osWriter) throws Exception { + + InOrder inOrder = Mockito.inOrder(configurationReader, factory, metrics, limiter, consumer, eventsService); + inOrder.verify(configurationReader).getfMetrics(); + inOrder.verify(configurationReader).getfRateLimiter(); + inOrder.verify(limiter).onCall(eq(topicName), eq(consumerGroup), eq(clientId), anyString()); + inOrder.verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()); + inOrder.verify(eventsService).respondOkWithStream(eq(dMaapContext), osWriter.capture()); + inOrder.verify(consumer).commitOffsets(); + inOrder.verify(metrics).consumeTick(anyInt()); + inOrder.verify(limiter).onSend(eq(topicName), eq(consumerGroup), eq(clientId), anyLong()); + inOrder.verify(consumer).close(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void pushEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception { + String remoteIp = "10.132.64.112"; + request.setRemoteAddr(remoteIp); + when(configurationReader.getfIpBlackList()).thenReturn(blacklist); + when(blacklist.contains(anyString())).thenReturn(true); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN))); + + eventsService.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00"); + } + + + @Test + public void pushEvents_shouldFail_whenRequestedTopicDoesNotExist() throws Exception { + when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); + when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND))); + + eventsService.pushEvents(dMaapContext, "testTopic", iStream, "5", "13:00:00"); + } + + @Test + public void pushEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoWritePermissionToTopic() + throws Exception { + //given + String topicName = "someSimpleTopicName"; + + HttpServletRequest permittedRequest = mock(HttpServletRequest.class); + when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration()); + dMaapContext.setRequest(permittedRequest); + givenConfiguredWithMocks(topicName); + when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner"); + when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); + doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserWrite(nsaSimpleApiKey); + + thrown.expect(AccessDeniedException.class); + + //when + eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00"); + + //then + verify(createdTopic).checkUserWrite(nsaSimpleApiKey); + verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class)); + verify(permittedRequest, never()).isUserInRole(anyString()); + } + + @Test + public void pushEvents_shouldFailOnAafAuthorization_whenCadiIsEnabled_topicNameEnforced_andUserHasNoPermission() + throws Exception { + //given + String topicPrefix = "org.onap.aaf.enforced"; + String topicName = topicPrefix + ".topicName"; + String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|pub"; + HttpServletRequest deniedRequest = mock(HttpServletRequest.class); + when(deniedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration()); + when(deniedRequest.isUserInRole(permission)).thenReturn(false); + when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); + when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic); + when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix); + when(eventsService.isCadiEnabled()).thenReturn(true); + dMaapContext.setRequest(deniedRequest); + + thrown.expect(DMaaPAccessDeniedException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED))); + + //when + eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00"); + + //then + verify(deniedRequest).isUserInRole(permission); + } + + + @Test + public void pushEvents_shouldPublishMessagesWithoutTransaction() throws Exception { + //given + String topicName = "topicWithoutTransaction"; + givenConfiguredWithMocks(topicName); + doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class)); + + //when + eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00"); + + //then + verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any()); + ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class); + verify(eventsService).respondOk(eq(dMaapContext), captor.capture()); + assertEquals(1, captor.getValue().getLong("count")); + } + + @Test + public void pushEvents_shouldHandlePublisherError_whenPushWithoutTransaction() throws Exception { + //given + String topicName = "topicWithoutTransaction"; + givenConfiguredWithMocks(topicName); + doThrow(new IOException()).when(publisher) + .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any()); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND))); + + //when + eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00"); + + //then + verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any()); + verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class)); + } + + + @Test + public void pushEvents_shouldPublishMessagesWithTransaction() throws Exception { + //given + String topicPrefix = "org.onap.dmaap.mr"; + String topicName = topicPrefix + ".topicWithTransaction"; + givenConfiguredWithMocks(topicName); + when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix); + when(eventsService.isCadiEnabled()).thenReturn(true); + doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class)); + + request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub"); + + //when + eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00"); + + //then + verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any()); + ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class); + verify(eventsService).respondOk(eq(dMaapContext), captor.capture()); + assertEquals(1, captor.getValue().getLong("count")); + assertFalse(captor.getValue().getString("transactionId").isEmpty()); + } + + @Test + public void pushEvents_shouldHandlePublisherError_whenPushWithTransaction() throws Exception { + //given + String topicPrefix = "org.onap.dmaap.mr"; + String topicName = topicPrefix + ".topicWithTransaction"; + givenConfiguredWithMocks(topicName); + when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix); + when(eventsService.isCadiEnabled()).thenReturn(true); + doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class)); + request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub"); + doThrow(new IOException()).when(publisher) + .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any()); + + thrown.expect(CambriaApiException.class); + thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND))); + + //when + eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00"); + + //then + verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any()); + verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class)); + } + + @Test + public void pushEvents_shouldNotPerformAnyAuthorization_whenPublishToMetricTopic() throws Exception { + //given + HttpServletRequest permittedRequest = mock(HttpServletRequest.class); + when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration()); + dMaapContext.setRequest(permittedRequest); + String metricsTopicName = "msgrtr.apinode.metrics.dmaap"; + givenConfiguredWithMocks(metricsTopicName); + doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class)); + + //when + eventsService.pushEvents(dMaapContext, metricsTopicName, iStream, "5", "13:00:00"); + + //then + ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class); + verify(publisher) + .sendBatchMessageNew(eq(metricsTopicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any()); + verify(eventsService).respondOk(eq(dMaapContext), captor.capture()); + verify(permittedRequest, never()).isUserInRole(anyString()); + verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class)); + assertEquals(1, captor.getValue().getLong("count")); + } + + @Test + public void pushEvents_shouldNotPerformAnyAuthorization_whenTopicHasNoOwner() throws Exception { + //given + HttpServletRequest permittedRequest = mock(HttpServletRequest.class); + when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration()); + dMaapContext.setRequest(permittedRequest); + String topicName = "notEnforcedAafTopic"; + givenConfiguredWithMocks(topicName); + doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class)); + when(createdTopic.getOwner()).thenReturn(null); + + //when + eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00"); + + //then + ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class); + verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any()); + verify(eventsService).respondOk(eq(dMaapContext), captor.capture()); + verify(permittedRequest, never()).isUserInRole(anyString()); + verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class)); + assertEquals(1, captor.getValue().getLong("count")); + } + +} diff --git a/src/test/java/org/onap/dmaap/mr/cambria/security/DMaaPAAFAuthenticatorImplTest.java b/src/test/java/org/onap/dmaap/mr/cambria/security/DMaaPAAFAuthenticatorImplTest.java deleted file mode 100644 index 7019a2b..0000000 --- a/src/test/java/org/onap/dmaap/mr/cambria/security/DMaaPAAFAuthenticatorImplTest.java +++ /dev/null @@ -1,83 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP Policy Engine - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - - package org.onap.dmaap.mr.cambria.security; - -import static org.junit.Assert.*; - -import javax.servlet.http.HttpServletRequest; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.springframework.mock.web.MockHttpServletRequest; - -import org.onap.dmaap.dmf.mr.CambriaApiException; -import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl; - - - -public class DMaaPAAFAuthenticatorImplTest { - - private MockHttpServletRequest request = null; - @Before - public void setUp() throws Exception { - //creating servlet object - request = new MockHttpServletRequest(); - request.setServerName("www.example.com"); - request.setRequestURI("/foo"); - request.setQueryString("param1=value1¶m"); - String url = request.getRequestURL() + "?" + request.getQueryString(); - - - } - - @After - public void tearDown() throws Exception { - } - - @Test - public void testAafAuthentication() { - - DMaaPAAFAuthenticatorImpl authenticator = new DMaaPAAFAuthenticatorImpl(); - authenticator.aafAuthentication(request, "admin"); - assertTrue(true); - - } - - - - /*@Test - public void testAafPermissionString() { - - DMaaPAAFAuthenticatorImpl authenticator = new DMaaPAAFAuthenticatorImpl(); - try { - authenticator.aafPermissionString("testTopic", "admin"); - } catch (CambriaApiException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - assertTrue(true); - - }*/ - - -} diff --git a/src/test/java/org/onap/dmaap/mr/cambria/security/JUnitTestSuite.java b/src/test/java/org/onap/dmaap/mr/cambria/security/JUnitTestSuite.java index 60ae849..ea3f051 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/security/JUnitTestSuite.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/security/JUnitTestSuite.java @@ -26,9 +26,10 @@ import org.junit.runner.RunWith; import org.junit.runners.Suite; import org.junit.runners.Suite.SuiteClasses; import org.apache.log4j.Logger; +import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImplTest; @RunWith(Suite.class) -@SuiteClasses({ DMaaPAAFAuthenticatorImplTest.class,DMaaPAuthenticatorImplTest.class, +@SuiteClasses({ DMaaPAAFAuthenticatorImplTest.class,DMaaPAuthenticatorImplTest.class, }) public class JUnitTestSuite { private static final Logger LOGGER = Logger.getLogger(JUnitTestSuite.class); diff --git a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/EventsServiceImplTest.java b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/EventsServiceImplTest.java deleted file mode 100644 index 1e677d8..0000000 --- a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/EventsServiceImplTest.java +++ /dev/null @@ -1,312 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP Policy Engine - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - - package org.onap.dmaap.mr.cambria.service.impl; - -import static org.mockito.Mockito.when; -import static org.mockito.Matchers.anyString; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ConcurrentModificationException; -import java.util.Map; -import java.util.Properties; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.mock.web.MockHttpServletRequest; -import org.springframework.mock.web.MockHttpServletResponse; - -import com.att.ajsc.beans.PropertiesMapBean; -import com.att.ajsc.filemonitor.AJSCPropertiesMap; -import org.onap.dmaap.dmf.mr.CambriaApiException; -import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator; -import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator; -import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl; -import org.onap.dmaap.dmf.mr.utils.ConfigurationReader; -import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException; -import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter; -import org.onap.dmaap.dmf.mr.backends.ConsumerFactory; -import org.onap.dmaap.dmf.mr.beans.DMaaPContext; -import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker; -import org.onap.dmaap.dmf.mr.constants.CambriaConstants; -import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException; -import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; -import org.onap.dmaap.dmf.mr.metabroker.Topic; -import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException; -import org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl; -import org.onap.dmaap.dmf.mr.utils.PropertyReader; -import com.att.nsa.configs.ConfigDbException; -import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue; -import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException; -import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import com.att.nsa.limits.Blacklist; -import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; -import com.att.nsa.security.NsaApiKey; -import com.att.nsa.security.db.simple.NsaSimpleApiKey; - -import kafka.admin.AdminUtils; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ DMaaPAuthenticatorImpl.class, AJSCPropertiesMap.class }) -public class EventsServiceImplTest { - - private InputStream iStream = null; - DMaaPContext dMaapContext = new DMaaPContext(); - EventsServiceImpl service = new EventsServiceImpl(); - DMaaPErrorMessages pErrorMessages = new DMaaPErrorMessages(); - @Mock - ConfigurationReader configurationReader; - @Mock - Blacklist blacklist; - @Mock - DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator; - @Mock - DMaaPAAFAuthenticator dmaapAAFauthenticator; - @Mock - NsaApiKey user; - @Mock - NsaSimpleApiKey nsaSimpleApiKey; - @Mock - DMaaPKafkaMetaBroker dmaapKafkaMetaBroker; - @Mock - Topic createdTopic; - @Mock - ConsumerFactory factory; - - @Before - public void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - String source = "source of my InputStream"; - iStream = new ByteArrayInputStream(source.getBytes("UTF-8")); - - MockHttpServletRequest request = new MockHttpServletRequest(); - MockHttpServletResponse response = new MockHttpServletResponse(); - dMaapContext.setRequest(request); - dMaapContext.setResponse(response); - when(blacklist.contains(anyString())).thenReturn(false); - when(configurationReader.getfIpBlackList()).thenReturn(blacklist); - dMaapContext.setConfigReader(configurationReader); - - service.setErrorMessages(pErrorMessages); - PowerMockito.mockStatic(AJSCPropertiesMap.class); - when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout")).thenReturn("100"); - - AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout"); - - } - - @Test(expected = CambriaApiException.class) - public void testGetEvents() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException { - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - service.getEvents(dMaapContext, "testTopic", "CG1", "23"); - } - - @Test(expected = CambriaApiException.class) - public void testGetEventsBlackListErr() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException { - when(blacklist.contains(anyString())).thenReturn(true); - when(configurationReader.getfIpBlackList()).thenReturn(blacklist); - dMaapContext.setConfigReader(configurationReader); - service.getEvents(dMaapContext, "testTopic", "CG1", "23"); - } - - @Test(expected = CambriaApiException.class) - public void testGetEventsNoTopicError() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException { - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - service.getEvents(dMaapContext, "testTopic", "CG1", "23"); - } - - @Test(expected = CambriaApiException.class) - public void testGetEventsuserNull() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException { - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - MockHttpServletRequest mockRequest = new MockHttpServletRequest(); - mockRequest.addHeader("Authorization", "passed"); - dMaapContext.setRequest(mockRequest); - dMaapContext.getRequest().getHeader("Authorization"); - service.getEvents(dMaapContext, "testTopic", "CG1", "23"); - } - - @Test(expected = CambriaApiException.class) - public void testGetEventsExcp2() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException { - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - when(configurationReader.getfRateLimiter()).thenThrow(new ConcurrentModificationException("Error occurred")); - service.getEvents(dMaapContext, "testTopic", "CG1", "23"); - } - - @Test(expected = CambriaApiException.class) - public void testPushEvents() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting, - invalidSettingValue, loadException { - - // AdminUtils.createTopic(configurationReader.getZk(), "testTopic", 10, - // 1, new Properties()); - - configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader())); - - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - - service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00"); - - service.getEvents(dMaapContext, "testTopic", "CG1", "23"); - - /* - * String trueValue = "True"; - * assertTrue(trueValue.equalsIgnoreCase("True")); - */ - - } - - @Test(expected = CambriaApiException.class) - public void testPushEventsBlackListedIp() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting, - invalidSettingValue, loadException { - - // AdminUtils.createTopic(configurationReader.getZk(), "testTopic", 10, - // 1, new Properties()); - when(blacklist.contains(anyString())).thenReturn(true); - when(configurationReader.getfIpBlackList()).thenReturn(blacklist); - configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader())); - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - - service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00"); - - } - - @Test(expected = NullPointerException.class) - public void testPushEventsNoUser() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting, - invalidSettingValue, loadException { - - configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader())); - - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - MockHttpServletRequest mockRequest = new MockHttpServletRequest(); - mockRequest.addHeader("Authorization", "passed"); - mockRequest.addHeader("Authorization", "passed"); - dMaapContext.setRequest(mockRequest); - dMaapContext.getRequest().getHeader("Authorization"); - service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00"); - - } - - @Test(expected = CambriaApiException.class) - public void testPushEventsWtTransaction() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting, - invalidSettingValue, loadException { - - configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader())); - - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "transidUEBtopicreqd")).thenReturn("true"); - - service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00"); - - } - - @Test(expected = CambriaApiException.class) - public void testPushEventsWtTransactionError() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException, - TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting, - invalidSettingValue, loadException { - - configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader())); - - when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey); - when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator); - when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker); - when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic); - PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory); - when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "transidUEBtopicreqd")).thenReturn("true"); - when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "event.batch.length")).thenReturn("0"); - when(configurationReader.getfPublisher()).thenThrow(new ConcurrentModificationException("Error occurred")); - - service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00"); - - } - - @Test - public void testIsTransEnabled1() { - - when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "transidUEBtopicreqd")).thenReturn("true"); - assertTrue(service.isTransEnabled()); - - } - @Test - public void testIsTransEnabled2() { - - when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "transidUEBtopicreqd")).thenReturn("false"); - assertFalse(service.isTransEnabled()); - - } - -} diff --git a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/JUnitTestSuite.java b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/JUnitTestSuite.java index ec4b0e2..7536127 100644 --- a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/JUnitTestSuite.java +++ b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/JUnitTestSuite.java @@ -25,12 +25,13 @@ import org.junit.runner.RunWith; import org.junit.runners.Suite; import org.junit.runners.Suite.SuiteClasses; import org.apache.log4j.Logger; +import org.onap.dmaap.dmf.mr.service.impl.EventsServiceImplTest; import org.onap.dmaap.dmf.mr.service.impl.TopicServiceImplTest; @RunWith(Suite.class) @SuiteClasses({ UIServiceImplTest.class, AdminServiceImplemTest.class, ApiKeysServiceImplTest.class, ShowConsumerCacheTest.class,TopicServiceImplTest.class, TransactionServiceImplTest.class, MMServiceImplTest.class, - BaseTransactionDbImplTest.class, MetricsServiceImplTest.class,EventsServiceImplTest.class}) + BaseTransactionDbImplTest.class, MetricsServiceImplTest.class, EventsServiceImplTest.class}) public class JUnitTestSuite { private static final Logger LOGGER = Logger.getLogger(JUnitTestSuite.class); |