diff options
Diffstat (limited to 'dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/config/DmaapMrConfig.java')
-rw-r--r-- | dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/config/DmaapMrConfig.java | 28 |
1 files changed, 16 insertions, 12 deletions
diff --git a/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/config/DmaapMrConfig.java b/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/config/DmaapMrConfig.java index 1fe9e51..705a724 100644 --- a/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/config/DmaapMrConfig.java +++ b/dcae-analytics/dcae-analytics-web/src/main/java/org/onap/dcae/analytics/web/config/DmaapMrConfig.java @@ -1,6 +1,8 @@ /* * ================================================================================ * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (c) 2021 China Mobile Property. All rights reserved. + * Copyright (c) 2021 Nokia Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,10 +21,6 @@ package org.onap.dcae.analytics.web.config; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.util.Map; - import org.onap.dcae.analytics.model.AnalyticsHttpConstants; import org.onap.dcae.analytics.model.AnalyticsProfile; import org.onap.dcae.analytics.model.DmaapMrConstants; @@ -44,16 +42,20 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; -import org.springframework.integration.dsl.channel.MessageChannels; +import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.endpoint.MethodInvokingMessageSource; +import org.springframework.integration.handler.GenericHandler; import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; import org.springframework.integration.http.dsl.Http; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.store.BasicMessageGroupStore; import org.springframework.integration.store.MessageGroupQueue; import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.MessageHeaders; import org.springframework.web.client.RestTemplate; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * @author Rajiv Singla */ @@ -139,21 +141,23 @@ public class DmaapMrConfig { final RestTemplate mrPublisherRestTemplate, final DirectChannel mrPublisherInputChannel, final RequestHandlerRetryAdvice requestHandlerRetryAdvice) { - return IntegrationFlows.from(mrPublisherInputChannel) .handle(Http.outboundGateway(mrPublisherPreferences.getRequestURL(), mrPublisherRestTemplate) .mappedRequestHeaders(DMAAP_MAPPED_REQUEST_HEADERS) .httpMethod(HttpMethod.POST) .extractPayload(true) .expectedResponseType(String.class), c -> c.advice(requestHandlerRetryAdvice)) - // add end timestamp - .handle((String p, Map<String, Object> headers) -> - MessageBuilder.withPayload(p).copyHeaders(headers) - .setHeader(AnalyticsHttpConstants.REQUEST_END_TS_HEADER_KEY, - AnalyticsWebUtils.CREATION_TIMESTAMP_SUPPLIER.get()).build() - ) + .handle(new GenericHandler<String>() { + @Override + public Object handle(String payload, MessageHeaders headers) { + return MessageBuilder.withPayload(payload).copyHeaders(headers) + .setHeader(AnalyticsHttpConstants.REQUEST_END_TS_HEADER_KEY, + AnalyticsWebUtils.CREATION_TIMESTAMP_SUPPLIER.get()).build(); + } + }) .channel(DmaapMrConstants.DMAAP_MR_PUBLISHER_OUTPUT_CHANNEL) .get(); } } + |