diff options
author | rameshiyer27 <ramesh.murugan.iyer@est.tech> | 2021-07-20 07:57:41 +0100 |
---|---|---|
committer | rameshiyer27 <ramesh.murugan.iyer@est.tech> | 2021-07-29 16:30:11 +0100 |
commit | 23e25e89619c732a687b44a5487afbb6badf613c (patch) | |
tree | 76641d98717f826bf1df96357a7db815cf899b25 /participant/participant-impl/participant-impl-http/src/main | |
parent | 931424fc681b1d41afbba98a190253283f9a6eba (diff) |
Implement http participant in CLAMP
Issue-ID: POLICY-3449
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Change-Id: Ibfe23a6e98fb760a930b6080dc2291113b3cb4fa
Diffstat (limited to 'participant/participant-impl/participant-impl-http/src/main')
9 files changed, 569 insertions, 0 deletions
diff --git a/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/Application.java b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/Application.java new file mode 100644 index 000000000..dc36392f7 --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/Application.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.http; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Starter. + * + */ +@SpringBootApplication +public class Application { + /** + * Main class. + * + * @param args args + */ + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} diff --git a/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/config/ParticipantConfig.java b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/config/ParticipantConfig.java new file mode 100644 index 000000000..ce013a765 --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/config/ParticipantConfig.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.http.config; + +import org.onap.policy.clamp.controlloop.participant.http.main.handler.ControlLoopElementHandler; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ParticipantConfig { + + /** + * Register ControlLoopElementListener. + * + * @param intermediaryApi the ParticipantIntermediaryApi + * @param clElementHandler the ControlLoop Element Handler + */ + @Autowired + public void registerControlLoopElementListener(ParticipantIntermediaryApi intermediaryApi, + ControlLoopElementHandler clElementHandler) { + intermediaryApi.registerControlLoopElementListener(clElementHandler); + clElementHandler.setIntermediaryApi(intermediaryApi); + } +} diff --git a/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/exception/HttpWebClientException.java b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/exception/HttpWebClientException.java new file mode 100644 index 000000000..b10af884c --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/exception/HttpWebClientException.java @@ -0,0 +1,31 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.http.main.exception; + + +import org.springframework.web.reactive.function.client.WebClientResponseException; + +public class HttpWebClientException extends WebClientResponseException { + + public HttpWebClientException(int statusCode, String statusText) { + super(statusCode, statusText, null, null, null); + } +} diff --git a/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/handler/ControlLoopElementHandler.java b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/handler/ControlLoopElementHandler.java new file mode 100644 index 000000000..67948a95c --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/handler/ControlLoopElementHandler.java @@ -0,0 +1,148 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.http.main.handler; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.validation.ConstraintViolation; +import javax.validation.Validation; +import javax.validation.ValidationException; +import lombok.Getter; +import lombok.Setter; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopElement; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopOrderedState; +import org.onap.policy.clamp.controlloop.models.controlloop.concepts.ControlLoopState; +import org.onap.policy.clamp.controlloop.participant.http.main.models.ConfigRequest; +import org.onap.policy.clamp.controlloop.participant.http.main.webclient.ClHttpClient; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ControlLoopElementListener; +import org.onap.policy.clamp.controlloop.participant.intermediary.api.ParticipantIntermediaryApi; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; +import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.onap.policy.models.tosca.authorative.concepts.ToscaNodeTemplate; +import org.onap.policy.models.tosca.authorative.concepts.ToscaServiceTemplate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * This class handles implementation of controlLoopElement updates. + */ +@Component +public class ControlLoopElementHandler implements ControlLoopElementListener, Closeable { + + private static final Coder CODER = new StandardCoder(); + + private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + + private Map<ToscaConceptIdentifier, Pair<Integer, String>> restResponseMap = new ConcurrentHashMap<>(); + + @Setter + private ParticipantIntermediaryApi intermediaryApi; + + /** + * Handle controlLoopElement statistics. + * + * @param controlLoopElementId controlloop element id + */ + @Override + public void handleStatistics(UUID controlLoopElementId) throws PfModelException { + // Implementation not needed for http participant + + } + + /** + * Handle a control loop element state change. + * + * @param controlLoopElementId the ID of the control loop element + * @param currentState the current state of the control loop element + * @param newState the state to which the control loop element is changing to + * @throws PfModelException in case of a model exception + */ + @Override + public void controlLoopElementStateChange(UUID controlLoopElementId, ControlLoopState currentState, + ControlLoopOrderedState newState) throws PfModelException { + // Implementation not needed for http participant + } + + /** + * Callback method to handle an update on a control loop element. + * + * @param element the information on the control loop element + * @param controlLoopDefinition toscaServiceTemplate + */ + @Override + public void controlLoopElementUpdate(ControlLoopElement element, ToscaServiceTemplate controlLoopDefinition) { + + for (Map.Entry<String, ToscaNodeTemplate> nodeTemplate : controlLoopDefinition.getToscaTopologyTemplate() + .getNodeTemplates().entrySet()) { + // Fetching the node template of corresponding CL element + if (element.getDefinition().getName().equals(nodeTemplate.getKey())) { + try { + var configRequest = CODER.convert(nodeTemplate.getValue().getProperties(), ConfigRequest.class); + Set<ConstraintViolation<ConfigRequest>> violations = Validation.buildDefaultValidatorFactory() + .getValidator().validate(configRequest); + if (violations.isEmpty()) { + invokeHttpClient(configRequest); + } else { + LOGGER.error("Violations found in the config request parameters: {}", violations); + throw new ValidationException("Constraint violations in the config request"); + } + } catch (CoderException | ValidationException e) { + LOGGER.error("Error invoking the http request for the config ", e); + } + } + } + } + + /** + * Invoke a runnable thread to execute http requests. + * @param configRequest ConfigRequest + */ + public void invokeHttpClient(ConfigRequest configRequest) { + // Invoke runnable thread to execute https requests of all config entities + executor.execute(new ClHttpClient(configRequest, restResponseMap)); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + executor.shutdown(); + } +} diff --git a/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/ConfigRequest.java b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/ConfigRequest.java new file mode 100644 index 000000000..87cab88a7 --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/ConfigRequest.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.http.main.models; + +import java.util.List; +import java.util.Map; +import javax.validation.Valid; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import javax.ws.rs.DefaultValue; +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class ConfigRequest { + + @NotBlank + private String baseUrl; + + @NotNull + private Map<String, String> httpHeaders; + + @NotNull + @Valid + private List<ConfigurationEntity> configurationEntities; + + @Min(value = 1) + @DefaultValue("20") + private int uninitializedToPassiveTimeout; + +} diff --git a/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/ConfigurationEntity.java b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/ConfigurationEntity.java new file mode 100644 index 000000000..8703a9d21 --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/ConfigurationEntity.java @@ -0,0 +1,40 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.http.main.models; + +import java.util.List; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; + +@Data +@AllArgsConstructor +public class ConfigurationEntity { + + @NotNull + private ToscaConceptIdentifier configurationEntityId; + + @NotNull + @Valid + private List<RestParams> restSequence; +} diff --git a/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/RestParams.java b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/RestParams.java new file mode 100644 index 000000000..fc4e02897 --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/models/RestParams.java @@ -0,0 +1,54 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.http.main.models; + +import java.util.Map; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; + +@Data +@AllArgsConstructor +public class RestParams { + + @NotNull + private ToscaConceptIdentifier restRequestId; + + @NotNull + private String httpMethod; + + @NotNull + private String path; + + @Min(100) + @Max(599) + private int expectedResponse; + + private Map<String, Object> pathParams; + + private Map<String, String> queryParams; + + private String body; + +} diff --git a/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/webclient/ClHttpClient.java b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/webclient/ClHttpClient.java new file mode 100644 index 000000000..0e0ea557e --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/java/org/onap/policy/clamp/controlloop/participant/http/main/webclient/ClHttpClient.java @@ -0,0 +1,135 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2021 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.clamp.controlloop.participant.http.main.webclient; + +import java.lang.invoke.MethodHandles; +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.onap.policy.clamp.controlloop.participant.http.main.exception.HttpWebClientException; +import org.onap.policy.clamp.controlloop.participant.http.main.models.ConfigRequest; +import org.onap.policy.clamp.controlloop.participant.http.main.models.ConfigurationEntity; +import org.onap.policy.clamp.controlloop.participant.http.main.models.RestParams; +import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.UriComponentsBuilder; +import reactor.core.publisher.Mono; + +public class ClHttpClient implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final ConfigRequest configRequest; + + private Map<ToscaConceptIdentifier, Pair<Integer, String>> responseMap; + + /** + * Constructor. + */ + public ClHttpClient(ConfigRequest configRequest, Map<ToscaConceptIdentifier, Pair<Integer, String>> responseMap) { + this.configRequest = configRequest; + this.responseMap = responseMap; + } + + /** + * Runnable to execute http requests. + */ + @Override + public void run() { + + var webClient = WebClient.builder() + .baseUrl(configRequest.getBaseUrl()) + .defaultHeaders(httpHeaders -> httpHeaders.addAll(createHeaders(configRequest))) + .build(); + + for (ConfigurationEntity configurationEntity : configRequest.getConfigurationEntities()) { + LOGGER.info("Executing http requests for the config entity {}", + configurationEntity.getConfigurationEntityId()); + + executeRequest(webClient, configurationEntity); + } + } + + private void executeRequest(WebClient client, ConfigurationEntity configurationEntity) { + + // Iterate the sequence of http requests + for (RestParams request: configurationEntity.getRestSequence()) { + String response = null; + try { + var httpMethod = Objects.requireNonNull(HttpMethod.resolve(request.getHttpMethod())); + var uri = createUriString(request); + LOGGER.info("Executing HTTP request: {} for the Rest request id: {}", httpMethod, + request.getRestRequestId()); + + response = client.method(httpMethod) + .uri(uri) + .body(request.getBody() == null ? BodyInserters.empty() + : BodyInserters.fromValue(request.getBody())) + .exchangeToMono(clientResponse -> + clientResponse.statusCode().value() == request.getExpectedResponse() + ? clientResponse.bodyToMono(String.class) + : Mono.error(new HttpWebClientException(clientResponse.statusCode().value(), + clientResponse.bodyToMono(String.class).toString()))) + .block(Duration.ofMillis(configRequest.getUninitializedToPassiveTimeout() * 1000L)); + + LOGGER.info("HTTP response for the {} request : {}", httpMethod, response); + responseMap.put(request.getRestRequestId(), new ImmutablePair<>(request.getExpectedResponse(), + response)); + + } catch (HttpWebClientException ex) { + LOGGER.error("Error occurred on the HTTP request ", ex); + responseMap.put(request.getRestRequestId(), new ImmutablePair<>(ex.getStatusCode().value(), + ex.getResponseBodyAsString())); + } + } + } + + private HttpHeaders createHeaders(ConfigRequest request) { + var headers = new HttpHeaders(); + for (Map.Entry<String, String> entry: request.getHttpHeaders().entrySet()) { + headers.add(entry.getKey(), entry.getValue()); + } + return headers; + } + + private String createUriString(RestParams restParams) { + var uriComponentsBuilder = UriComponentsBuilder.fromUriString(restParams.getPath()); + // Add path params if present + if (restParams.getPathParams() != null) { + uriComponentsBuilder.uriVariables(restParams.getPathParams()); + } + // Add query params if present + if (restParams.getQueryParams() != null) { + for (Map.Entry<String, String> entry : restParams.getQueryParams().entrySet()) { + uriComponentsBuilder.queryParam(entry.getKey(), entry.getValue()); + } + } + return uriComponentsBuilder.build().toUriString(); + } + +} diff --git a/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml b/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml new file mode 100644 index 000000000..1fd528458 --- /dev/null +++ b/participant/participant-impl/participant-impl-http/src/main/resources/config/application.yaml @@ -0,0 +1,27 @@ +participant: + intermediaryParameters: + reportingTimeInterval: 120000 + description: Participant Description + participantId: + name: HttpParticipant0 + version: 1.0.0 + participantType: + name: org.onap.k8s.controlloop.HttpControlLoopParticipant + version: 2.3.4 + clampControlLoopTopics: + topicSources: + - topic: POLICY-CLRUNTIME-PARTICIPANT + servers: + - ${topicServer:message-router} + topicCommInfrastructure: dmaap + fetchTimeout: 15000 + topicSinks: + - topic: POLICY-CLRUNTIME-PARTICIPANT + servers: + - ${topicServer:message-router} + topicCommInfrastructure: dmaap + + - topic: POLICY-NOTIFICATION + servers: + - ${topicServer:message-router} + topicCommInfrastructure: dmaap |