diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java | 768 |
1 files changed, 768 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java new file mode 100644 index 0000000..9f35812 --- /dev/null +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java @@ -0,0 +1,768 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package org.onap.dmaap.dmf.mr.service.impl; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.util.rrConvertor; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; +import org.apache.http.HttpStatus; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.TopicExistsException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.onap.dmaap.dmf.mr.CambriaApiException; +import org.onap.dmaap.dmf.mr.backends.Consumer; +import org.onap.dmaap.dmf.mr.backends.ConsumerFactory; +import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException; +import org.onap.dmaap.dmf.mr.backends.MetricsSet; +import org.onap.dmaap.dmf.mr.backends.Publisher.message; +import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter; +import org.onap.dmaap.dmf.mr.beans.DMaaPContext; +import org.onap.dmaap.dmf.mr.beans.LogDetails; +import org.onap.dmaap.dmf.mr.constants.CambriaConstants; +import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException; +import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages; +import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode; +import org.onap.dmaap.dmf.mr.exception.ErrorResponse; +import org.onap.dmaap.dmf.mr.metabroker.Topic; +import org.onap.dmaap.dmf.mr.resources.CambriaEventSet; +import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream; +import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator; +import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl; +import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl; +import org.onap.dmaap.dmf.mr.service.EventsService; +import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder; +import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter; +import org.onap.dmaap.dmf.mr.utils.Utils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.MediaType; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.Date; +import java.util.LinkedList; + +/** + * This class provides the functinality to publish and subscribe message to + * kafka + * + * @author Ramkumar Sembaiyam + * + */ +@Service +public class EventsServiceImpl implements EventsService { + + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); + private static final String BATCH_LENGTH = "event.batch.length"; + private static final String TRANSFER_ENCODING = "Transfer-Encoding"; + private static final String TIMEOUT_PROPERTY = "timeout"; + private static final String SUBSCRIBE_ACTION = "sub"; + private static final String PUBLISH_ACTION = "pub"; + + @Autowired + private DMaaPErrorMessages errorMessages; + + String getPropertyFromAJSCmap(String propertyKey) { + return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey); + } + + public DMaaPErrorMessages getErrorMessages() { + return errorMessages; + } + + public void setErrorMessages(DMaaPErrorMessages errorMessages) { + this.errorMessages = errorMessages; + } + + /** + * @param ctx + * @param topic + * @param consumerGroup + * @param clientId + * @throws ConfigDbException, + * TopicExistsException, AccessDeniedException, + * UnavailableException, CambriaApiException, IOException + * + * + */ + @Override + public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId) + throws ConfigDbException, AccessDeniedException, UnavailableException, + CambriaApiException, IOException { + + final long startTime = System.currentTimeMillis(); + final HttpServletRequest req = ctx.getRequest(); + final LogWrap logger = new LogWrap(topic, consumerGroup, clientId); + final String remoteHost = req.getRemoteHost(); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages) + .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build(); + + validateIpBlacklist(errRespProvider, ctx); + + final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + if (metaTopic == null) { + throw new CambriaApiException(errRespProvider.getTopicNotFoundError()); + } + + boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION); + + final long elapsedMs1 = System.currentTimeMillis() - startTime; + logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup + + " " + clientId); + + verifyHostId(); + final boolean pretty = isPrettyPrintEnabled(); + final boolean withMeta = isMetaOffsetEnabled(); + int timeoutMs = getMessageTimeout(req); + int limit = getMessageLimit(req); + String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter; + logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost()); + + Consumer consumer = null; + try { + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter(); + rl.onCall(topic, consumerGroup, clientId, remoteHost); + consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs, + remoteHost); + CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs) + .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build(); + coes.setDmaapContext(ctx); + coes.setTopic(metaTopic); + coes.setTransEnabled(isTransEnabled() || isAAFTopic); + coes.setTopicStyle(isAAFTopic); + final long elapsedMs2 = System.currentTimeMillis() - startTime; + logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " " + + consumerGroup + " " + clientId); + + respondOkWithStream(ctx, coes); + // No IOException thrown during respondOkWithStream, so commit the + // new offsets to all the brokers + consumer.commitOffsets(); + final int sent = coes.getSentCount(); + metricsSet.consumeTick(sent); + rl.onSend(topic, consumerGroup, clientId, sent); + final long elapsedMs = System.currentTimeMillis() - startTime; + logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for " + + topic + " " + consumerGroup + " " + clientId + " on to the server " + + remoteHost); + + } catch (UnavailableException excp) { + logger.warn(excp.getMessage(), excp); + ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (ConcurrentModificationException excp1) { + LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost); + ErrorResponse errRes = errRespProvider.getConcurrentModificationError(); + logger.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (Exception excp) { + logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup + + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp); + ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); + ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage()); + logger.info(errRes.toString()); + throw new CambriaApiException(errRes); + } finally { + if (consumer != null && !isCacheEnabled()) { + try { + consumer.close(); + } catch (Exception e) { + logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " " + + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + + " " + e); + } + } + } + } + + private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException { + final String remoteAddr = Utils.getRemoteAddress(ctx); + if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { + ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName, + ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException { + + boolean isAAFTopic = false; + String metricTopicName = getMetricTopicName(); + if(!metricTopicName.equalsIgnoreCase(topicName)) { + if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) { + isAAFTopic = true; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + String permission = aaf.aafPermissionString(topicName, action); + if (!aaf.aafAuthentication(ctx.getRequest(), permission)) { + ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action); + LOG.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + } + } else if(metaTopic!=null && null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) { + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + if(SUBSCRIBE_ACTION.equals(action)) { + metaTopic.checkUserRead(user); + } else if(PUBLISH_ACTION.equals(action)) { + metaTopic.checkUserWrite(user); + } + } + } + return isAAFTopic; + } + + boolean isCadiEnabled() { + return Utils.isCadiEnabled(); + } + + void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{ + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes); + } + + private int getMessageLimit(HttpServletRequest request) { + return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit); + } + + private int getMessageTimeout(HttpServletRequest request) { + String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY); + int defaultTimeoutMs = StringUtils.isNotEmpty(timeoutMsAsString) ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) : + CambriaConstants.kNoTimeout; + + String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY); + return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs; + } + + private boolean isPrettyPrintEnabled() { + return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty")); + } + + private boolean isMetaOffsetEnabled() { + return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta")); + } + + private boolean isTopicNameEnforcedAaf(String topicName) { + String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF"); + return StringUtils.isNotEmpty(topicNameStd) && topicName.startsWith(topicNameStd); + } + + private boolean isCacheEnabled() { + String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache); + return StringUtils.isNotEmpty(cachePropsSetting) ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled; + } + + private void verifyHostId() { + String lhostId = getPropertyFromAJSCmap("clusterhostid"); + if (StringUtils.isEmpty(lhostId)) { + try { + InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e); + } + + } + } + + private String getMetricTopicName() { + String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic"); + return StringUtils.isNotEmpty(metricTopicFromProps) ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap"; + } + + /** + * @throws missingReqdSetting + * + */ + @Override + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, + CambriaApiException, IOException, missingReqdSetting { + + final long startMs = System.currentTimeMillis(); + String remoteHost = ctx.getRequest().getRemoteHost(); + ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages) + .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost) + .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build(); + + validateIpBlacklist(errRespProvider, ctx); + + final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + + final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION); + + final HttpServletRequest req = ctx.getRequest(); + boolean chunked = isRequestedChunk(req); + String mediaType = getMediaType(req); + boolean transactionRequired = isTransactionIdRequired(); + + if (isAAFTopic || transactionRequired) { + pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType); + } else { + pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType); + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic); + } + + private boolean isRequestedChunk(HttpServletRequest request) { + return null != request.getHeader(TRANSFER_ENCODING) && + request.getHeader(TRANSFER_ENCODING).contains("chunked"); + } + + private String getMediaType(HttpServletRequest request) { + String mediaType = request.getContentType(); + if (mediaType == null || mediaType.length() == 0) { + return MimeTypes.kAppGenericBinary; + } + return mediaType.replace("; charset=UTF-8", "").trim(); + } + + private boolean isTransactionIdRequired() { + String transIdReqProperty = getPropertyFromAJSCmap("transidUEBtopicreqd"); + return StringUtils.isNotEmpty(transIdReqProperty) && transIdReqProperty.equalsIgnoreCase("true"); + } + + /** + * + * @param ctx + * @param topic + * @param msg + * @param defaultPartition + * @param chunked + * @param mediaType + * @throws ConfigDbException + * @throws AccessDeniedException + * @throws TopicExistsException + * @throws CambriaApiException + * @throws IOException + */ + private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked, + String mediaType) + throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException { + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + // setup the event set + final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition); + + // start processing, building a batch to push to the backend + final long startMs = System.currentTimeMillis(); + long count = 0; + long maxEventBatch = 1024L* 16; + String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH); + if (null != batchlen && !batchlen.isEmpty()) + maxEventBatch = Long.parseLong(batchlen); + // long maxEventBatch = + + final LinkedList<message> batch = new LinkedList<>(); + // final ArrayList<KeyedMessage<String, String>> kms = new + + final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<>(); + try { + // for each message... + message m = null; + while ((m = events.next()) != null) { + // add the message to the batch + batch.add(m); + // final KeyedMessage<String, String> data = new + // KeyedMessage<String, String>(topic, m.getKey(), + + // kms.add(data); + final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(), + m.getMessage()); + + pms.add(data); + // check if the batch is full + final int sizeNow = batch.size(); + if (sizeNow > maxEventBatch) { + // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, + + // kms.clear(); + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); + pms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + } + } + + // send the pending batch + final int sizeNow = batch.size(); + if (sizeNow > 0) { + // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, + + // kms.clear(); + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); + pms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + + LOG.info("Published " + count + " msgs in " + totalMs + " ms for topic " + topic + " from server " + + ctx.getRequest().getRemoteHost()); + + // build a responseP + final JSONObject response = new JSONObject(); + response.put("count", count); + response.put("serverTimeMs", totalMs); + respondOk(ctx, response); + + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + + } + ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count + + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null, + null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * + * @param ctx + * @param inputStream + * @param topic + * @param partitionKey + * @param requestTime + * @param chunked + * @param mediaType + * @throws ConfigDbException + * @throws AccessDeniedException + * @throws TopicExistsException + * @throws IOException + * @throws CambriaApiException + */ + private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic, + final String partitionKey, final String requestTime, final boolean chunked, final String mediaType) + throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException { + + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + // setup the event set + final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey); + + // start processing, building a batch to push to the backend + final long startMs = System.currentTimeMillis(); + long count = 0; + long maxEventBatch = 1024L * 16; + String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH); + if (null != evenlen && !evenlen.isEmpty()) + maxEventBatch = Long.parseLong(evenlen); + // final long maxEventBatch = + + final LinkedList<message> batch = new LinkedList<message>(); + // final ArrayList<KeyedMessage<String, String>> kms = new + + final ArrayList<ProducerRecord<String, String>> pms = new ArrayList<ProducerRecord<String, String>>(); + message m = null; + int messageSequence = 1; + Long batchId = 1L; + final boolean transactionEnabled = true; + int publishBatchCount = 0; + SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS"); + + // LOG.warn("Batch Start Id: " + + + try { + // for each message... + batchId = DMaaPContext.getBatchID(); + + String responseTransactionId = null; + + while ((m = events.next()) != null) { + + // LOG.warn("Batch Start Id: " + + + + addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId, + transactionEnabled); + messageSequence++; + + + batch.add(m); + + responseTransactionId = m.getLogDetails().getTransactionId(); + + //JSONObject jsonObject = new JSONObject(); + //jsonObject.put("msgWrapMR", m.getMessage()); + //jsonObject.put("transactionId", responseTransactionId); + // final KeyedMessage<String, String> data = new + // KeyedMessage<String, String>(topic, m.getKey(), + + + final ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, m.getKey(), + m.getMessage()); + + pms.add(data); + // check if the batch is full + final int sizeNow = batch.size(); + if (sizeNow >= maxEventBatch) { + String startTime = sdf.format(new Date()); + LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id=" + + batchId + "]"); + try { + // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, + + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); + // transactionLogs(batch); + for (message msg : batch) { + LogDetails logDetails = msg.getLogDetails(); + LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); + } + } catch (Exception excp) { + + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + pms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + publishBatchCount = sizeNow; + count += sizeNow; + + String endTime = sdf.format(new Date()); + LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + + ",Batch End Time=" + endTime + "]"); + batchId = DMaaPContext.getBatchID(); + } + } + + // send the pending batch + final int sizeNow = batch.size(); + if (sizeNow > 0) { + String startTime = sdf.format(new Date()); + LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id=" + + batchId + "]"); + try { + // ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, + + ctx.getConfigReader().getfPublisher().sendBatchMessageNew(topic, pms); + + for (message msg : batch) { + LogDetails logDetails = msg.getLogDetails(); + LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); + } + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + pms.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + + String endTime = sdf.format(new Date()); + publishBatchCount = sizeNow; + LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId + + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time=" + + endTime + "]"); + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + + LOG.info("Published " + count + " msgs(with transaction id) in " + totalMs + " ms for topic " + topic); + + if (null != responseTransactionId) { + ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId)); + } + + // build a response + final JSONObject response = new JSONObject(); + response.put("count", count); + response.put("transactionId", responseTransactionId); + response.put("serverTimeMs", totalMs); + respondOk(ctx, response); + + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + + ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * + * @param msg + * @param topic + * @param request + * @param messageCreationTime + * @param messageSequence + * @param batchId + * @param transactionEnabled + */ + private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request, + final String messageCreationTime, final int messageSequence, final Long batchId, + final boolean transactionEnabled) { + LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId, + transactionEnabled); + logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage())); + msg.setTransactionEnabled(transactionEnabled); + msg.setLogDetails(logDetails); + } + + void respondOk(DMaaPContext ctx, JSONObject response) throws IOException { + DMaaPResponseBuilder.respondOk(ctx, response); + } + + /** + * + * @author anowarul.islam + * + */ + private static class LogWrap { + private final String fId; + + /** + * constructor initialization + * + * @param topic + * @param cgroup + * @param cid + */ + public LogWrap(String topic, String cgroup, String cid) { + fId = "[" + topic + "/" + cgroup + "/" + cid + "] "; + } + + /** + * + * @param msg + */ + public void info(String msg) { + LOG.info(fId + msg); + } + + /** + * + * @param msg + * @param t + */ + public void warn(String msg, Exception t) { + LOG.warn(fId + msg, t); + } + + } + + public boolean isTransEnabled() { + String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd"); + boolean istransidreqd = false; + if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) { + istransidreqd = true; + } + + return istransidreqd; + + } + + private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request, + final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) { + LogDetails logDetails = new LogDetails(); + logDetails.setTopicId(topicName); + logDetails.setMessageTimestamp(messageTimestamp); + logDetails.setPublisherId(Utils.getUserApiKey(request)); + logDetails.setPublisherIp(request.getRemoteHost()); + logDetails.setMessageBatchId(batchId); + logDetails.setMessageSequence(String.valueOf(messageSequence)); + logDetails.setTransactionEnabled(transactionEnabled); + logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date())); + logDetails.setServerIp(request.getLocalAddr()); + return logDetails; + } + + +}
\ No newline at end of file |