diff options
author | Stavros Kanarakis <stavros.kanarakis@nokia.com> | 2020-02-28 21:53:04 +0200 |
---|---|---|
committer | Stavros Kanarakis <stavros.kanarakis@nokia.com> | 2020-03-03 17:30:22 +0200 |
commit | 22025d4ddfcccd86a2f93be7dadea9735e4b4528 (patch) | |
tree | 6b12e0770f65cb6a7389c0300fe95454ac1717fb /components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities | |
parent | 82a39f7da3177a9b9b700c7291ed5ea47c90e478 (diff) |
Upgrade of BBS-ep service
Upgraded service to use latest DCAE-SDK
Upgraded many of the dependencies to latest versions
Introduced Java 11
Change-Id: I29d265d2a75aa80749f567cfb10920b2c45c2cec
Issue-ID: DCAEGEN2-2105
Signed-off-by: Stavros Kanarakis <stavros.kanarakis@nokia.com>
Diffstat (limited to 'components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities')
8 files changed, 166 insertions, 42 deletions
diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java index 84fc9f7d..c0e9b1c4 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/AaiReactiveClient.java @@ -20,6 +20,8 @@ package org.onap.bbs.event.processor.utilities; +import static org.onap.bbs.event.processor.utilities.GenericUtils.keyStoreFromResource; +import static org.onap.dcaegen2.services.sdk.security.ssl.Passwords.fromPath; import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication; import com.google.gson.Gson; @@ -27,9 +29,10 @@ import com.google.gson.JsonSyntaxException; import io.netty.handler.ssl.SslContext; +import java.nio.file.Paths; + import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.net.ssl.SSLException; import org.onap.bbs.event.processor.config.AaiClientConfiguration; import org.onap.bbs.event.processor.config.ApplicationConfiguration; @@ -37,7 +40,9 @@ import org.onap.bbs.event.processor.config.ConfigurationChangeObserver; import org.onap.bbs.event.processor.exceptions.AaiTaskException; import org.onap.bbs.event.processor.model.PnfAaiObject; import org.onap.bbs.event.processor.model.ServiceInstanceAaiObject; -import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -64,7 +69,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { private AaiClientConfiguration aaiClientConfiguration; @Autowired - AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) throws SSLException { + AaiReactiveClient(ApplicationConfiguration configuration, Gson gson) { this.configuration = configuration; this.gson = gson; this.sslFactory = new SslFactory(); @@ -85,25 +90,20 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { @Override public void updateConfiguration() { - AaiClientConfiguration newConfiguration = configuration.getAaiClientConfiguration(); + var newConfiguration = configuration.getAaiClientConfiguration(); if (aaiClientConfiguration.equals(newConfiguration)) { LOGGER.info("No Configuration changes necessary for AAI Reactive client"); } else { synchronized (this) { LOGGER.info("AAI Reactive client must be re-configured"); aaiClientConfiguration = newConfiguration; - try { - setupWebClient(); - } catch (SSLException e) { - LOGGER.error("AAI Reactive client SSL error while re-configuring WebClient"); - LOGGER.debug("SSL Exception\n", e); - } + setupWebClient(); } } } - private void setupWebClient() throws SSLException { - SslContext sslContext = createSslContext(); + private void setupWebClient() { + var sslContext = createSslContext(); ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector( HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext))); @@ -132,7 +132,7 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { private <T> Mono<T> performReactiveHttpGet(String url, Class<T> responseType) { LOGGER.debug("Will issue Reactive GET request to URL ({}) for object ({})", url, responseType.getName()); - WebClient webClient = getWebClient(); + var webClient = getWebClient(); return webClient .get() .uri(url) @@ -179,17 +179,18 @@ public class AaiReactiveClient implements ConfigurationChangeObserver { }); } - private SslContext createSslContext() throws SSLException { + private SslContext createSslContext() { if (aaiClientConfiguration.enableAaiCertAuth()) { LOGGER.info("Creating secure context with:\n {}", aaiClientConfiguration); - return sslFactory.createSecureContext( - aaiClientConfiguration.keyStorePath(), - aaiClientConfiguration.keyStorePasswordPath(), - aaiClientConfiguration.trustStorePath(), - aaiClientConfiguration.trustStorePasswordPath() - ); + final SecurityKeys securityKeys = ImmutableSecurityKeys.builder() + .keyStore(keyStoreFromResource(aaiClientConfiguration.keyStorePath())) + .keyStorePassword(fromPath(Paths.get(aaiClientConfiguration.keyStorePasswordPath()))) + .trustStore(keyStoreFromResource(aaiClientConfiguration.trustStorePath())) + .trustStorePassword(fromPath(Paths.get(aaiClientConfiguration.trustStorePasswordPath()))) + .build(); + return sslFactory.createSecureClientContext(securityKeys); } - return sslFactory.createInsecureContext(); + return sslFactory.createInsecureClientContext(); } private synchronized WebClient getWebClient() { diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java index c2b67348..b3fa09db 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CommonEventFields.java @@ -20,7 +20,7 @@ package org.onap.bbs.event.processor.utilities; -public class CommonEventFields { +class CommonEventFields { static final String COMMON_FORMAT = "\": \"%s\""; static final String EVENT = "event"; diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java index 03ea0936..f27cca5e 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ControlLoopJsonBodyBuilder.java @@ -21,24 +21,23 @@ package org.onap.bbs.event.processor.utilities; import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; import com.google.gson.TypeAdapterFactory; import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.ControlLoopPublisherDmaapModel; import org.onap.bbs.event.processor.model.ImmutableControlLoopPublisherDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPublisherDmaapModel> { +public class ControlLoopJsonBodyBuilder { /** * Serialize the Control Loop DMaaP model with GSON. * @param publisherDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(ControlLoopPublisherDmaapModel publisherDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder().disableHtmlEscaping(); + var gsonBuilder = new GsonBuilder().disableHtmlEscaping(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableControlLoopPublisherDmaapModel.builder() .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient()) @@ -56,4 +55,30 @@ public class ControlLoopJsonBodyBuilder implements JsonBodyBuilder<ControlLoopPu .originator(publisherDmaapModel.getOriginator()) .build()); } + + /** + * Serialize the Control Loop DMaaP model with GSON. + * @param publisherDmaapModel object to be serialized + * @return String output of serialization + */ + public static JsonElement createAsJsonElement(ControlLoopPublisherDmaapModel publisherDmaapModel) { + var gsonBuilder = new GsonBuilder().disableHtmlEscaping(); + ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); + return gsonBuilder.create().toJsonTree(ImmutableControlLoopPublisherDmaapModel.builder() + .closedLoopEventClient(publisherDmaapModel.getClosedLoopEventClient()) + .policyVersion(publisherDmaapModel.getPolicyVersion()) + .policyName(publisherDmaapModel.getPolicyName()) + .policyScope(publisherDmaapModel.getPolicyScope()) + .targetType(publisherDmaapModel.getTargetType()) + .aaiEnrichmentData(publisherDmaapModel.getAaiEnrichmentData()) + .closedLoopAlarmStart(publisherDmaapModel.getClosedLoopAlarmStart()) + .closedLoopEventStatus(publisherDmaapModel.getClosedLoopEventStatus()) + .closedLoopControlName(publisherDmaapModel.getClosedLoopControlName()) + .version(publisherDmaapModel.getVersion()) + .target(publisherDmaapModel.getTarget()) + .requestId(publisherDmaapModel.getRequestId()) + .originator(publisherDmaapModel.getOriginator()) + .build()); + } + } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java index 3cff4e65..5a0466ac 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationDmaapConsumerJsonParser.java @@ -46,11 +46,13 @@ import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +@Component public class CpeAuthenticationDmaapConsumerJsonParser { private static final Logger LOGGER = LoggerFactory.getLogger(CpeAuthenticationDmaapConsumerJsonParser.class); @@ -108,9 +110,9 @@ public class CpeAuthenticationDmaapConsumerJsonParser { return Mono.empty(); } - JsonObject commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT) + var commonEventHeader = dmaapResponseJsonObject.getAsJsonObject(EVENT) .getAsJsonObject(COMMON_EVENT_HEADER); - JsonObject stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT) + var stateChangeFields = dmaapResponseJsonObject.getAsJsonObject(EVENT) .getAsJsonObject(STATE_CHANGE_FIELDS); pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); @@ -120,7 +122,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { stateInterface = getValueFromJson(stateChangeFields, STATE_INTERFACE); if (stateChangeFields.has(ADDITIONAL_FIELDS)) { - JsonObject additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS); + var additionalFields = stateChangeFields.getAsJsonObject(ADDITIONAL_FIELDS); rgwMacAddress = getValueFromJson(additionalFields, RGW_MAC_ADDRESS); swVersion = getValueFromJson(additionalFields, SW_VERSION); } @@ -128,7 +130,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { if (StringUtils.isEmpty(pnfSourceName) || authenticationStatusMissing(oldAuthenticationStatus) || authenticationStatusMissing(newAuthenticationStatus)) { - String incorrectEvent = dumpJsonData(); + var incorrectEvent = dumpJsonData(); LOGGER.warn("Incorrect CPE Authentication JSON event: {}", incorrectEvent); return Mono.empty(); } @@ -164,7 +166,7 @@ public class CpeAuthenticationDmaapConsumerJsonParser { } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); + var jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java index 09aa7303..df8eaa40 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/CpeAuthenticationJsonBodyBuilder.java @@ -27,18 +27,16 @@ import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.CpeAuthenticationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ImmutableCpeAuthenticationConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class CpeAuthenticationJsonBodyBuilder implements JsonBodyBuilder<CpeAuthenticationConsumerDmaapModel> { +public class CpeAuthenticationJsonBodyBuilder { /** * Serialize the CPE authentication DMaaP model with GSON. * @param cpeAuthenticationConsumerDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(CpeAuthenticationConsumerDmaapModel cpeAuthenticationConsumerDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableCpeAuthenticationConsumerDmaapModel.builder() .correlationId(cpeAuthenticationConsumerDmaapModel.getCorrelationId()) diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java new file mode 100644 index 00000000..be8bea09 --- /dev/null +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/GenericUtils.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * BBS-RELOCATION-CPE-AUTHENTICATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.bbs.event.processor.utilities; + +import java.nio.file.Paths; + +import org.jetbrains.annotations.NotNull; +import org.onap.bbs.event.processor.exceptions.ApplicationEnvironmentException; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeysStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.StringUtils; + +public class GenericUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(GenericUtils.class); + + private GenericUtils() {} + + /** + * Creates Message Router subscription request. + * @param topicUrl URL of topic to use + * @param consumerGroup Consumer Group for subscription + * @param consumerId Consumer ID for subscription + * @return request based on provided input + */ + public static MessageRouterSubscribeRequest createSubscribeRequest(String topicUrl, + String consumerGroup, String consumerId) { + var sourceDefinition = ImmutableMessageRouterSource.builder() + .name("Subscriber source") + .topicUrl(topicUrl) + .build(); + + return ImmutableMessageRouterSubscribeRequest + .builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(consumerGroup) + .consumerId(consumerId) + .build(); + } + + /** + * Creates Message Router publish request. + * @param topicUrl URL of topic to use + * @return request based on provided input + */ + public static MessageRouterPublishRequest createPublishRequest(String topicUrl) { + MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder() + .topicUrl(topicUrl) + .name("Producer sink") + .build(); + + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition) + .contentType(ContentType.APPLICATION_JSON) + .build(); + } + + /** + * Creates a security key store for HTTPS. + * @param resource identifying the resource from which we will read security information + * @return store that will be used for HTTPS + */ + @NotNull public static SecurityKeysStore keyStoreFromResource(String resource) { + if (StringUtils.isEmpty(resource)) { + throw new ApplicationEnvironmentException("Resource for security key store is empty"); + } + var path = Paths.get(resource); + LOGGER.info("Reading keys from {}", path.toString()); + return SecurityKeysStore.fromPath(path); + } +} diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java index 9fe0c277..5e249e57 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationDmaapConsumerJsonParser.java @@ -41,11 +41,13 @@ import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapMo import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +@Component public class ReRegistrationDmaapConsumerJsonParser { private static final Logger LOGGER = LoggerFactory.getLogger(ReRegistrationDmaapConsumerJsonParser.class); @@ -101,7 +103,7 @@ public class ReRegistrationDmaapConsumerJsonParser { return Mono.empty(); } - JsonObject pnfReRegistrationFields = + var pnfReRegistrationFields = dmaapResponseJsonObject.getAsJsonObject(ADDITIONAL_FIELDS); pnfCorrelationId = getValueFromJson(dmaapResponseJsonObject, CORRELATION_ID); @@ -112,7 +114,7 @@ public class ReRegistrationDmaapConsumerJsonParser { svlan = getValueFromJson(pnfReRegistrationFields, SVLAN); if (StringUtils.isEmpty(pnfCorrelationId) || anyImportantPropertyMissing()) { - String incorrectEvent = dumpJsonData(); + var incorrectEvent = dumpJsonData(); LOGGER.warn("Incorrect Re-Registration JSON event: {}", incorrectEvent); return Mono.empty(); } @@ -148,7 +150,7 @@ public class ReRegistrationDmaapConsumerJsonParser { } Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { - JsonParser jsonParser = new JsonParser(); + var jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); } diff --git a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java index 867cfda7..bf90a4c5 100644 --- a/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java +++ b/components/bbs-event-processor/src/main/java/org/onap/bbs/event/processor/utilities/ReRegistrationJsonBodyBuilder.java @@ -27,18 +27,16 @@ import java.util.ServiceLoader; import org.onap.bbs.event.processor.model.ImmutableReRegistrationConsumerDmaapModel; import org.onap.bbs.event.processor.model.ReRegistrationConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; -public class ReRegistrationJsonBodyBuilder implements JsonBodyBuilder<ReRegistrationConsumerDmaapModel> { +public class ReRegistrationJsonBodyBuilder { /** * Serialize the Re-Registration DMaaP model with GSON. * @param reRegistrationConsumerDmaapModel object to be serialized * @return String output of serialization */ - @Override public String createJsonBody(ReRegistrationConsumerDmaapModel reRegistrationConsumerDmaapModel) { - GsonBuilder gsonBuilder = new GsonBuilder(); + var gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); return gsonBuilder.create().toJson(ImmutableReRegistrationConsumerDmaapModel.builder() .correlationId(reRegistrationConsumerDmaapModel.getCorrelationId()) |