diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java | 510 |
1 files changed, 202 insertions, 308 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java index 11c544f..ec5bfc0 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java @@ -21,34 +21,32 @@ *******************************************************************************/ package org.onap.dmaap.dmf.mr.service.impl; +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.util.rrConvertor; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Arrays; +import java.util.ConcurrentModificationException; import java.util.Date; -import java.util.HashMap; import java.util.LinkedList; -import java.util.Properties; - import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.MediaType; - +import org.apache.commons.lang.math.NumberUtils; import org.apache.http.HttpStatus; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.TopicExistsException; import org.json.JSONObject; import org.json.JSONTokener; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Service; - -import com.att.ajsc.filemonitor.AJSCPropertiesMap; import org.onap.dmaap.dmf.mr.CambriaApiException; import org.onap.dmaap.dmf.mr.backends.Consumer; import org.onap.dmaap.dmf.mr.backends.ConsumerFactory; @@ -56,7 +54,6 @@ import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException; import org.onap.dmaap.dmf.mr.backends.MetricsSet; import org.onap.dmaap.dmf.mr.backends.Publisher; import org.onap.dmaap.dmf.mr.backends.Publisher.message; -import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2; import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter; import org.onap.dmaap.dmf.mr.beans.DMaaPContext; import org.onap.dmaap.dmf.mr.beans.LogDetails; @@ -65,7 +62,6 @@ import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException; import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; import org.onap.dmaap.dmf.mr.exception.ErrorResponse; - import org.onap.dmaap.dmf.mr.metabroker.Topic; import org.onap.dmaap.dmf.mr.resources.CambriaEventSet; import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream; @@ -74,15 +70,10 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl; import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl; import org.onap.dmaap.dmf.mr.service.EventsService; import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder; +import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter; import org.onap.dmaap.dmf.mr.utils.Utils; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -import com.att.nsa.configs.ConfigDbException; -import com.att.nsa.drumlin.service.standards.MimeTypes; -import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; -import com.att.nsa.security.NsaApiKey; -import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; -import com.att.nsa.util.rrConvertor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; /** * This class provides the functinality to publish and subscribe message to @@ -93,20 +84,20 @@ import com.att.nsa.util.rrConvertor; */ @Service public class EventsServiceImpl implements EventsService { - // private static final Logger LOG = private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); - private static final String BATCH_LENGTH = "event.batch.length"; private static final String TRANSFER_ENCODING = "Transfer-Encoding"; + private static final String TIMEOUT_PROPERTY = "timeout"; + private static final String SUBSCRIBE_ACTION = "sub"; + private static final String PUBLISH_ACTION = "pub"; + @Autowired private DMaaPErrorMessages errorMessages; - - //@Autowired - - // @Value("${metrics.send.cambria.topic}") - + String getPropertyFromAJSCmap(String propertyKey) { + return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey); + } public DMaaPErrorMessages getErrorMessages() { return errorMessages; @@ -129,222 +120,91 @@ public class EventsServiceImpl implements EventsService { */ @Override public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId) - throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException, - CambriaApiException, IOException, DMaaPAccessDeniedException { + throws ConfigDbException, AccessDeniedException, UnavailableException, + CambriaApiException, IOException { + final long startTime = System.currentTimeMillis(); final HttpServletRequest req = ctx.getRequest(); - - boolean isAAFTopic = false; - // was this host blacklisted? - final String remoteAddr = Utils.getRemoteAddress(ctx); - if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); - LOG.info(errRes.toString()); - throw new CambriaApiException(errRes); - } - - int limit = CambriaConstants.kNoLimit; - if (req.getParameter("limit") != null) { - limit = Integer.parseInt(req.getParameter("limit")); - } - - int timeoutMs = CambriaConstants.kNoTimeout; - String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout"); - if (strtimeoutMS != null) - timeoutMs = Integer.parseInt(strtimeoutMS); - // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", - - if (req.getParameter("timeout") != null) { - timeoutMs = Integer.parseInt(req.getParameter("timeout")); - } - - // By default no filter is applied if filter is not passed as a - // parameter in the request URI - String topicFilter = CambriaConstants.kNoFilter; - if (null != req.getParameter("filter")) { - topicFilter = req.getParameter("filter"); - } - // pretty to print the messaages in new line - String prettyval = "0"; - String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty"); - if (null != strPretty) - prettyval = strPretty; - - String metaval = "0"; - String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta"); - if (null != strmeta) - metaval = strmeta; - - final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval); - // withMeta to print offset along with message - final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval); - final LogWrap logger = new LogWrap(topic, consumerGroup, clientId); - logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost()); + final String remoteHost = req.getRemoteHost(); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages) + .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build(); - // is this user allowed to read this topic? - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); - final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + validateIpBlacklist(errRespProvider, ctx); - if (metatopic == null) { - // no such topic. - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, - DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), - errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()), - topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost()); - LOG.info(errRes.toString()); - throw new CambriaApiException(errRes); - } - String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "metrics.send.cambria.topic"); - if (null == metricTopicname) - metricTopicname = "msgrtr.apinode.metrics.dmaap"; - - boolean topicNameEnforced = false; - String topicNameStd = null; - topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, - "enforced.topic.name.AAF"); - if (null != topicNameStd && topic.startsWith(topicNameStd)) { - topicNameEnforced = true; + final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + if (metaTopic == null) { + throw new CambriaApiException(errRespProvider.getTopicNotFoundError()); } - if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) { - if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) { - // check permissions - metatopic.checkUserRead(user); - } - } - // if headers are not provided then user will be null - if (topicNameEnforced ||(user == null && null != ctx.getRequest().getHeader("Authorization"))) { - // the topic name will be sent by the client - - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - String permission = aaf.aafPermissionString(topic, "sub"); - if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on " - + permission, - null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId, - ctx.getRequest().getRemoteHost()); - LOG.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); + boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION); - } - isAAFTopic = true; - } final long elapsedMs1 = System.currentTimeMillis() - startTime; logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup + " " + clientId); - Consumer c = null; - - String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "clusterhostid"); - if (null == lhostId) { - try { - lhostId = InetAddress.getLocalHost().getCanonicalHostName(); - } catch (UnknownHostException e) { - LOG.info("Unknown Host Exception error occured while getting getting hostid"); - } - } - CambriaOutboundEventStream coes = null; + verifyHostId(); + final boolean pretty = isPrettyPrintEnabled(); + final boolean withMeta = isMetaOffsetEnabled(); + int timeoutMs = getMessageTimeout(req); + int limit = getMessageLimit(req); + String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter; + logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost()); + + Consumer consumer = null; try { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter(); - rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost()); - c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs, - ctx.getRequest().getRemoteHost()); - coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs) + rl.onCall(topic, consumerGroup, clientId, remoteHost); + consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs, + remoteHost); + CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs) .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build(); coes.setDmaapContext(ctx); - coes.setTopic(metatopic); - if (isTransEnabled() || isAAFTopic) { - coes.setTransEnabled(true); - } else { - coes.setTransEnabled(false); - } + coes.setTopic(metaTopic); + coes.setTransEnabled(isTransEnabled() || isAAFTopic); coes.setTopicStyle(isAAFTopic); final long elapsedMs2 = System.currentTimeMillis() - startTime; logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " " + consumerGroup + " " + clientId); - DMaaPResponseBuilder.setNoCacheHeadings(ctx); - - DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes); + respondOkWithStream(ctx, coes); // No IOException thrown during respondOkWithStream, so commit the // new offsets to all the brokers - c.commitOffsets(); + consumer.commitOffsets(); final int sent = coes.getSentCount(); - - metricsSet.consumeTick(sent); - rl.onSend(topic, consumerGroup, clientId, sent); + metricsSet.consumeTick(sent); + rl.onSend(topic, consumerGroup, clientId, sent); final long elapsedMs = System.currentTimeMillis() - startTime; - logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for " + logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for " + topic + " " + consumerGroup + " " + clientId + " on to the server " - + ctx.getRequest().getRemoteHost()); + + remoteHost); } catch (UnavailableException excp) { logger.warn(excp.getMessage(), excp); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, - DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), - errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic, - null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost()); + ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage()); LOG.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (java.util.ConcurrentModificationException excp1) { - LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost()); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, - DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), - "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null, - Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + } catch (ConcurrentModificationException excp1) { + LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost); + ErrorResponse errRes = errRespProvider.getConcurrentModificationError(); logger.info(errRes.toString()); throw new CambriaApiException(errRes); - } catch (CambriaApiException excp) { - LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId); - - throw excp; - } - catch (Exception excp) { - // System.out.println(excp + "------------------ " + topic+" - // "+consumerGroup+" "+clientId); - + } catch (Exception excp) { logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp); - - ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); - - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, - DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), - "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null, - Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); + ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage()); logger.info(errRes.toString()); throw new CambriaApiException(errRes); } finally { - coes = null; - // If no cache, close the consumer now that we're done with it. - boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled; - String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - ConsumerFactory.kSetting_EnableCache); - if (null != strkSetting_EnableCache) - kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache); - // if - // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache, - // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) { - if (!kSetting_EnableCache && (c != null)) { + if (consumer != null && !isCacheEnabled()) { try { - c.close(); + consumer.close(); } catch (Exception e) { - logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " " + logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " " + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " " + e); } @@ -352,117 +212,155 @@ public class EventsServiceImpl implements EventsService { } } - /** - * @throws missingReqdSetting - * - */ - @Override - public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, - final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException, - CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException { - - // is this user allowed to write to this topic? - final long startMs = System.currentTimeMillis(); - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); - final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); - boolean isAAFTopic = false; - - // was this host blacklisted? + private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException { final String remoteAddr = Utils.getRemoteAddress(ctx); - if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); + ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr); LOG.info(errRes.toString()); throw new CambriaApiException(errRes); } + } - String topicNameStd = null; + private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName, + ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException { - // topicNameStd= - - topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, - "enforced.topic.name.AAF"); - String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "metrics.send.cambria.topic"); - if (null == metricTopicname) - metricTopicname = "msgrtr.apinode.metrics.dmaap"; - boolean topicNameEnforced = false; - if (null != topicNameStd && topic.startsWith(topicNameStd)) { - topicNameEnforced = true; + boolean isAAFTopic = false; + String metricTopicName = getMetricTopicName(); + if(!metricTopicName.equalsIgnoreCase(topicName)) { + if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) { + isAAFTopic = true; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + String permission = aaf.aafPermissionString(topicName, action); + if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { + ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action); + LOG.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + } + } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) { + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + if(SUBSCRIBE_ACTION.equals(action)) { + metaTopic.checkUserRead(user); + } else if(PUBLISH_ACTION.equals(action)) { + metaTopic.checkUserWrite(user); + } + } } + return isAAFTopic; + } + + boolean isCadiEnabled() { + return Utils.isCadiEnabled(); + } - // Here check if the user has rights to publish on the topic - // ( This will be called when no auth is added or when UEB API Key - // Authentication is used) - // checkUserWrite(user) method will throw an error when there is no Auth - // header added or when the - // user has no publish rights + void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{ + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes); + } - if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) - && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) { - metatopic.checkUserWrite(user); - } + private int getMessageLimit(HttpServletRequest request) { + return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit); + } - // if headers are not provided then user will be null - if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization") - && !topic.equalsIgnoreCase(metricTopicname))) { - // the topic name will be sent by the client - - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - String permission = aaf.aafPermissionString(topic, "pub"); - if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic - + " on " + permission, - null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), - ctx.getRequest().getRemoteHost(), null, null); - LOG.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); + private int getMessageTimeout(HttpServletRequest request) { + String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY); + int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) : + CambriaConstants.kNoTimeout; + + String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY); + return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs; + } + + private boolean isPrettyPrintEnabled() { + return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty")); + } + + private boolean isMetaOffsetEnabled() { + return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta")); + } + + private boolean isTopicNameEnforcedAaf(String topicName) { + String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF"); + return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd); + } + + private boolean isCacheEnabled() { + String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache); + return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled; + } + + private void verifyHostId() { + String lhostId = getPropertyFromAJSCmap("clusterhostid"); + if (lhostId.isEmpty()) { + try { + InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e); } - isAAFTopic = true; + } + } - final HttpServletRequest req = ctx.getRequest(); + private String getMetricTopicName() { + String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic"); + return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap"; + } - // check for chunked input - boolean chunked = false; - if (null != req.getHeader(TRANSFER_ENCODING)) { - chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked"); - } - // get the media type, or set it to a generic value if it wasn't - // provided - String mediaType = req.getContentType(); - if (mediaType == null || mediaType.length() == 0) { - mediaType = MimeTypes.kAppGenericBinary; - } + /** + * @throws missingReqdSetting + * + */ + @Override + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, + CambriaApiException, IOException, missingReqdSetting { - if (mediaType.contains("charset=UTF-8")) { - mediaType = mediaType.replace("; charset=UTF-8", "").trim(); - } + final long startMs = System.currentTimeMillis(); + String remoteHost = ctx.getRequest().getRemoteHost(); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages) + .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost) + .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build(); - String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "transidUEBtopicreqd"); - boolean istransidreqd = false; - if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) { - istransidreqd = true; + validateIpBlacklist(errRespProvider, ctx); + + final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + if (metaTopic == null) { + throw new CambriaApiException(errRespProvider.getTopicNotFoundError()); } - if (isAAFTopic || istransidreqd) { + final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION); + + final HttpServletRequest req = ctx.getRequest(); + boolean chunked = isRequestedChunk(req); + String mediaType = getMediaType(req); + boolean transactionRequired = isTransactionIdRequired(); + + if (isAAFTopic || transactionRequired) { pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType); } else { pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType); } + final long endMs = System.currentTimeMillis(); final long totalMs = endMs - startMs; - LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic); + } + + private boolean isRequestedChunk(HttpServletRequest request) { + return null != request.getHeader(TRANSFER_ENCODING) && + request.getHeader(TRANSFER_ENCODING).contains("chunked"); + } + + private String getMediaType(HttpServletRequest request) { + String mediaType = request.getContentType(); + if (mediaType == null || mediaType.length() == 0) { + return MimeTypes.kAppGenericBinary; + } + return mediaType.replace("; charset=UTF-8", "").trim(); + } + private boolean isTransactionIdRequired() { + return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true"); } /** @@ -481,7 +379,7 @@ public class EventsServiceImpl implements EventsService { */ private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked, String mediaType) - throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException { + throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); // setup the event set final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition); @@ -490,8 +388,8 @@ public class EventsServiceImpl implements EventsService { final long startMs = System.currentTimeMillis(); long count = 0; long maxEventBatch = 1024L* 16; - String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); - if (null != batchlen) + String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH); + if (null != batchlen && !batchlen.isEmpty()) maxEventBatch = Long.parseLong(batchlen); // long maxEventBatch = @@ -550,7 +448,7 @@ public class EventsServiceImpl implements EventsService { final JSONObject response = new JSONObject(); response.put("count", count); response.put("serverTimeMs", totalMs); - DMaaPResponseBuilder.respondOk(ctx, response); + respondOk(ctx, response); } catch (Exception excp) { int status = HttpStatus.SC_NOT_FOUND; @@ -590,7 +488,7 @@ public class EventsServiceImpl implements EventsService { */ private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic, final String partitionKey, final String requestTime, final boolean chunked, final String mediaType) - throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException { + throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException { final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); @@ -601,8 +499,8 @@ public class EventsServiceImpl implements EventsService { final long startMs = System.currentTimeMillis(); long count = 0; long maxEventBatch = 1024L * 16; - String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); - if (null != evenlen) + String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH); + if (null != evenlen && !evenlen.isEmpty()) maxEventBatch = Long.parseLong(evenlen); // final long maxEventBatch = @@ -639,9 +537,9 @@ public class EventsServiceImpl implements EventsService { responseTransactionId = m.getLogDetails().getTransactionId(); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("msgWrapMR", m.getMessage()); - jsonObject.put("transactionId", responseTransactionId); + //JSONObject jsonObject = new JSONObject(); + //jsonObject.put("msgWrapMR", m.getMessage()); + //jsonObject.put("transactionId", responseTransactionId); // final KeyedMessage<String, String> data = new // KeyedMessage<String, String>(topic, m.getKey(), @@ -755,8 +653,9 @@ public class EventsServiceImpl implements EventsService { // build a response final JSONObject response = new JSONObject(); response.put("count", count); + response.put("transactionId", responseTransactionId); response.put("serverTimeMs", totalMs); - DMaaPResponseBuilder.respondOk(ctx, response); + respondOk(ctx, response); } catch (Exception excp) { int status = HttpStatus.SC_NOT_FOUND; @@ -798,6 +697,10 @@ public class EventsServiceImpl implements EventsService { msg.setLogDetails(logDetails); } + void respondOk(DMaaPContext ctx, JSONObject response) throws IOException { + DMaaPResponseBuilder.respondOk(ctx, response); + } + /** * * @author anowarul.islam @@ -837,8 +740,7 @@ public class EventsServiceImpl implements EventsService { } public boolean isTransEnabled() { - String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "transidUEBtopicreqd"); + String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd"); boolean istransidreqd = false; if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) { istransidreqd = true; @@ -863,13 +765,5 @@ public class EventsServiceImpl implements EventsService { return logDetails; } - - - - - - - - }
\ No newline at end of file |