From 425ebb15f5177f99103198aedc158b5691763fd9 Mon Sep 17 00:00:00 2001 From: efiacor Date: Fri, 12 Aug 2022 13:14:17 +0100 Subject: [KAFKA] Allow kafka params to be passed as config Allow topic names to be passed Add new api endpoint to retrieve the kafka and topic info Signed-off-by: efiacor Issue-ID: DMAAP-1744 Change-Id: Id7bdcf54c6191a5953bc94092218595bf608a733 --- .../be/distribution/DistributionBusinessLogic.java | 21 +++++++++ .../distribution/api/client/KafkaDataResponse.java | 35 ++++++++++++++ .../distribution/servlet/DistributionServlet.java | 53 ++++++++++++++++++++++ 3 files changed, 109 insertions(+) create mode 100644 catalog-be/src/main/java/org/openecomp/sdc/be/distribution/api/client/KafkaDataResponse.java (limited to 'catalog-be/src/main/java/org') diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java index d293e9b615..7d9c7357c2 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java @@ -42,6 +42,7 @@ import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.dao.api.ActionStatus; import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.openecomp.sdc.be.distribution.api.client.KafkaDataResponse; import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest; import org.openecomp.sdc.be.distribution.api.client.ServerListResponse; import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse; @@ -102,6 +103,26 @@ public class DistributionBusinessLogic { } } + public Either getKafkaData() { + DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager() + .getDistributionEngineConfiguration(); + String bootStrapServers = distributionEngineConfiguration.getKafkaBootStrapServers(); + if (bootStrapServers != null) { + String statusTopicName = DistributionEngineInitTask + .buildTopicName(distributionEngineConfiguration.getDistributionStatusTopicName(), distributionEngineConfiguration.getEnvironments().get(0)); + String notificationTopicName = DistributionEngineInitTask + .buildTopicName(distributionEngineConfiguration.getDistributionNotifTopicName(), distributionEngineConfiguration.getEnvironments().get(0)); + KafkaDataResponse kafkaDataResponse = new KafkaDataResponse(); + kafkaDataResponse.setKafkaBootStrapServer(bootStrapServers); + kafkaDataResponse.setDistrStatusTopicName(statusTopicName); + kafkaDataResponse.setDistrNotificationTopicName(notificationTopicName); + return Either.left(kafkaDataResponse); + } else { + ResponseFormat errorResponseWrapper = getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR); + return Either.right(errorResponseWrapper); + } + } + public void handleRegistration(Wrapper responseWrapper, RegistrationRequest registrationRequest, AuditHandler auditHandler) { CambriaErrorResponse registerResponse = null; try { diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/api/client/KafkaDataResponse.java b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/api/client/KafkaDataResponse.java new file mode 100644 index 0000000000..db61e884c5 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/api/client/KafkaDataResponse.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdc.be.distribution.api.client; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class KafkaDataResponse { + + private String kafkaBootStrapServer; + private String distrNotificationTopicName; + private String distrStatusTopicName; +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java index ee28d948e0..fb8a34830d 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java @@ -49,6 +49,7 @@ import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.dao.api.ActionStatus; import org.openecomp.sdc.be.distribution.AuditHandler; import org.openecomp.sdc.be.distribution.DistributionBusinessLogic; +import org.openecomp.sdc.be.distribution.api.client.KafkaDataResponse; import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest; import org.openecomp.sdc.be.distribution.api.client.ServerListResponse; import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse; @@ -149,6 +150,58 @@ public class DistributionServlet extends BeGenericServlet { } } + /** + * @param requestId UUID to track the incoming request + * @param instanceId UUID to identify the requesting instance + * @param accept Determines the format of the body of the response + * @param authorization Username and password auth towards SDC + * @return KafkaDataResponse (Kafka bootstrap server and topic list to be used by clients) + */ + @GET + @Path("/distributionKafkaData") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + @Operation(description = "Kafka data", method = "GET", summary = "return the kafka cluster and topic list", responses = { + @ApiResponse(responseCode = "200", description = "ECOMP component is authenticated and kafka endpoint and topic list is returned", content = @Content(array = @ArraySchema(schema = @Schema(implementation = KafkaDataResponse.class)))), + @ApiResponse(responseCode = "400", description = "Missing 'X-ECOMP-InstanceID' HTTP header - POL5001"), + @ApiResponse(responseCode = "401", description = "ECOMP component should authenticate itself and to re-send again HTTP request with its credentials for Basic Authentication - POL5002"), + @ApiResponse(responseCode = "403", description = "ECOMP component is not authorized - POL5003"), + @ApiResponse(responseCode = "405", description = "Method Not Allowed: Invalid HTTP method type used ( PUT,DELETE,POST will be rejected) - POL4050"), + @ApiResponse(responseCode = "500", description = "The GET request failed either due to internal SDC problem or Cambria Service failure. ECOMP Component should continue the attempts to get the needed information - POL5000")}) + public Response getKafkaData( + @Parameter(description = "X-ECOMP-RequestID header", required = false) @HeaderParam(value = Constants.X_ECOMP_REQUEST_ID_HEADER) String requestId, + @Parameter(description = "X-ECOMP-InstanceID header", required = true) @HeaderParam(value = Constants.X_ECOMP_INSTANCE_ID_HEADER) String instanceId, + @Parameter(description = "Determines the format of the body of the response", required = false) @HeaderParam(value = Constants.ACCEPT_HEADER) String accept, + @Parameter(description = "The username and password", required = true) @HeaderParam(value = Constants.AUTHORIZATION_HEADER) String authorization) { + String url = request.getMethod() + " " + request.getRequestURI(); + log.debug(START_HANDLE_REQUEST_OF, url); + ResponseFormat responseFormat; + if (instanceId == null) { + responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.MISSING_X_ECOMP_INSTANCE_ID); + getComponentsUtils().auditGetUebCluster(null, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage()); + return buildErrorResponse(responseFormat); + } + try { + Response response; + Either actionResponse = distributionLogic.getKafkaData(); + if (actionResponse.isRight()) { + responseFormat = actionResponse.right().value(); + response = buildErrorResponse(responseFormat); + } else { + responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.OK); + response = buildOkResponse(responseFormat, actionResponse.left().value()); + } + getComponentsUtils().auditGetUebCluster(instanceId, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage()); + return response; + } catch (Exception e) { + BeEcompErrorManager.getInstance().logBeRestApiGeneralError("failed to get kafka cluster and topic list from configuration"); + log.debug("failed to get kafka cluster and topic list from configuration", e); + responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.GENERAL_ERROR); + getComponentsUtils().auditGetUebCluster(instanceId, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage()); + return buildErrorResponse(responseFormat); + } + } + /** * @param requestId * @param instanceId -- cgit 1.2.3-korg