aboutsummaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main
diff options
context:
space:
mode:
authorefiacor <fiachra.corcoran@est.tech>2022-08-12 13:14:17 +0100
committerMichael Morris <michael.morris@est.tech>2022-10-04 08:17:31 +0000
commit425ebb15f5177f99103198aedc158b5691763fd9 (patch)
treec645e774ed65c900c9d723da626c4ddd23d150a6 /catalog-be/src/main
parentef496e13e2ad9bea5975df7b671442404fbf38b6 (diff)
[KAFKA] Allow kafka params to be passed as config
Allow topic names to be passed Add new api endpoint to retrieve the kafka and topic info Signed-off-by: efiacor <fiachra.corcoran@est.tech> Issue-ID: DMAAP-1744 Change-Id: Id7bdcf54c6191a5953bc94092218595bf608a733
Diffstat (limited to 'catalog-be/src/main')
-rw-r--r--catalog-be/src/main/docker/backend/chef-repo/cookbooks/sdc-catalog-be/templates/default/BE-distribution-engine-configuration.yaml.erb11
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java21
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/distribution/api/client/KafkaDataResponse.java35
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java53
-rw-r--r--catalog-be/src/main/resources/config/distribution-engine-configuration.yaml2
5 files changed, 122 insertions, 0 deletions
diff --git a/catalog-be/src/main/docker/backend/chef-repo/cookbooks/sdc-catalog-be/templates/default/BE-distribution-engine-configuration.yaml.erb b/catalog-be/src/main/docker/backend/chef-repo/cookbooks/sdc-catalog-be/templates/default/BE-distribution-engine-configuration.yaml.erb
index 7eeb7a8ff8..0989def1f9 100644
--- a/catalog-be/src/main/docker/backend/chef-repo/cookbooks/sdc-catalog-be/templates/default/BE-distribution-engine-configuration.yaml.erb
+++ b/catalog-be/src/main/docker/backend/chef-repo/cookbooks/sdc-catalog-be/templates/default/BE-distribution-engine-configuration.yaml.erb
@@ -6,8 +6,19 @@ uebServers:
uebPublicKey: <%= node['UEB']['PublicKey'] %>
uebSecretKey: <%= node['UEB']['SecretKey'] %>
+<% if node.exist?('DistributionTopics','notificationTopicName') -%>
+distributionNotifTopicName: <%= node['DistributionTopics']['notificationTopicName'] %>
+<% else %>
distributionNotifTopicName: SDC-DISTR-NOTIF-TOPIC
+<% end -%>
+
+<% if node.exist?('DistributionTopics','statusTopicName') -%>
+distributionStatusTopicName: <%= node['DistributionTopics']['statusTopicName'] %>
+<% else %>
distributionStatusTopicName: SDC-DISTR-STATUS-TOPIC
+<% end -%>
+
+kafkaBootStrapServers: <%= node['Kafka']['bootstrap'] %>
initRetryIntervalSec: 5
initMaxIntervalSec: 60
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java
index d293e9b615..7d9c7357c2 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/DistributionBusinessLogic.java
@@ -42,6 +42,7 @@ import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.dao.api.ActionStatus;
import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.be.distribution.api.client.KafkaDataResponse;
import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
@@ -102,6 +103,26 @@ public class DistributionBusinessLogic {
}
}
+ public Either<KafkaDataResponse, ResponseFormat> getKafkaData() {
+ DistributionEngineConfiguration distributionEngineConfiguration = ConfigurationManager.getConfigurationManager()
+ .getDistributionEngineConfiguration();
+ String bootStrapServers = distributionEngineConfiguration.getKafkaBootStrapServers();
+ if (bootStrapServers != null) {
+ String statusTopicName = DistributionEngineInitTask
+ .buildTopicName(distributionEngineConfiguration.getDistributionStatusTopicName(), distributionEngineConfiguration.getEnvironments().get(0));
+ String notificationTopicName = DistributionEngineInitTask
+ .buildTopicName(distributionEngineConfiguration.getDistributionNotifTopicName(), distributionEngineConfiguration.getEnvironments().get(0));
+ KafkaDataResponse kafkaDataResponse = new KafkaDataResponse();
+ kafkaDataResponse.setKafkaBootStrapServer(bootStrapServers);
+ kafkaDataResponse.setDistrStatusTopicName(statusTopicName);
+ kafkaDataResponse.setDistrNotificationTopicName(notificationTopicName);
+ return Either.left(kafkaDataResponse);
+ } else {
+ ResponseFormat errorResponseWrapper = getResponseFormatManager().getResponseFormat(ActionStatus.GENERAL_ERROR);
+ return Either.right(errorResponseWrapper);
+ }
+ }
+
public void handleRegistration(Wrapper<Response> responseWrapper, RegistrationRequest registrationRequest, AuditHandler auditHandler) {
CambriaErrorResponse registerResponse = null;
try {
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/api/client/KafkaDataResponse.java b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/api/client/KafkaDataResponse.java
new file mode 100644
index 0000000000..db61e884c5
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/api/client/KafkaDataResponse.java
@@ -0,0 +1,35 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.sdc.be.distribution.api.client;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+public class KafkaDataResponse {
+
+ private String kafkaBootStrapServer;
+ private String distrNotificationTopicName;
+ private String distrStatusTopicName;
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java
index ee28d948e0..fb8a34830d 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/distribution/servlet/DistributionServlet.java
@@ -49,6 +49,7 @@ import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.dao.api.ActionStatus;
import org.openecomp.sdc.be.distribution.AuditHandler;
import org.openecomp.sdc.be.distribution.DistributionBusinessLogic;
+import org.openecomp.sdc.be.distribution.api.client.KafkaDataResponse;
import org.openecomp.sdc.be.distribution.api.client.RegistrationRequest;
import org.openecomp.sdc.be.distribution.api.client.ServerListResponse;
import org.openecomp.sdc.be.distribution.api.client.TopicRegistrationResponse;
@@ -150,6 +151,58 @@ public class DistributionServlet extends BeGenericServlet {
}
/**
+ * @param requestId UUID to track the incoming request
+ * @param instanceId UUID to identify the requesting instance
+ * @param accept Determines the format of the body of the response
+ * @param authorization Username and password auth towards SDC
+ * @return KafkaDataResponse (Kafka bootstrap server and topic list to be used by clients)
+ */
+ @GET
+ @Path("/distributionKafkaData")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Operation(description = "Kafka data", method = "GET", summary = "return the kafka cluster and topic list", responses = {
+ @ApiResponse(responseCode = "200", description = "ECOMP component is authenticated and kafka endpoint and topic list is returned", content = @Content(array = @ArraySchema(schema = @Schema(implementation = KafkaDataResponse.class)))),
+ @ApiResponse(responseCode = "400", description = "Missing 'X-ECOMP-InstanceID' HTTP header - POL5001"),
+ @ApiResponse(responseCode = "401", description = "ECOMP component should authenticate itself and to re-send again HTTP request with its credentials for Basic Authentication - POL5002"),
+ @ApiResponse(responseCode = "403", description = "ECOMP component is not authorized - POL5003"),
+ @ApiResponse(responseCode = "405", description = "Method Not Allowed: Invalid HTTP method type used ( PUT,DELETE,POST will be rejected) - POL4050"),
+ @ApiResponse(responseCode = "500", description = "The GET request failed either due to internal SDC problem or Cambria Service failure. ECOMP Component should continue the attempts to get the needed information - POL5000")})
+ public Response getKafkaData(
+ @Parameter(description = "X-ECOMP-RequestID header", required = false) @HeaderParam(value = Constants.X_ECOMP_REQUEST_ID_HEADER) String requestId,
+ @Parameter(description = "X-ECOMP-InstanceID header", required = true) @HeaderParam(value = Constants.X_ECOMP_INSTANCE_ID_HEADER) String instanceId,
+ @Parameter(description = "Determines the format of the body of the response", required = false) @HeaderParam(value = Constants.ACCEPT_HEADER) String accept,
+ @Parameter(description = "The username and password", required = true) @HeaderParam(value = Constants.AUTHORIZATION_HEADER) String authorization) {
+ String url = request.getMethod() + " " + request.getRequestURI();
+ log.debug(START_HANDLE_REQUEST_OF, url);
+ ResponseFormat responseFormat;
+ if (instanceId == null) {
+ responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.MISSING_X_ECOMP_INSTANCE_ID);
+ getComponentsUtils().auditGetUebCluster(null, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage());
+ return buildErrorResponse(responseFormat);
+ }
+ try {
+ Response response;
+ Either<KafkaDataResponse, ResponseFormat> actionResponse = distributionLogic.getKafkaData();
+ if (actionResponse.isRight()) {
+ responseFormat = actionResponse.right().value();
+ response = buildErrorResponse(responseFormat);
+ } else {
+ responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.OK);
+ response = buildOkResponse(responseFormat, actionResponse.left().value());
+ }
+ getComponentsUtils().auditGetUebCluster(instanceId, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage());
+ return response;
+ } catch (Exception e) {
+ BeEcompErrorManager.getInstance().logBeRestApiGeneralError("failed to get kafka cluster and topic list from configuration");
+ log.debug("failed to get kafka cluster and topic list from configuration", e);
+ responseFormat = getComponentsUtils().getResponseFormat(ActionStatus.GENERAL_ERROR);
+ getComponentsUtils().auditGetUebCluster(instanceId, responseFormat.getStatus().toString(), responseFormat.getFormattedMessage());
+ return buildErrorResponse(responseFormat);
+ }
+ }
+
+ /**
* @param requestId
* @param instanceId
* @param accept
diff --git a/catalog-be/src/main/resources/config/distribution-engine-configuration.yaml b/catalog-be/src/main/resources/config/distribution-engine-configuration.yaml
index 1f8d59bfcb..cc7a38d498 100644
--- a/catalog-be/src/main/resources/config/distribution-engine-configuration.yaml
+++ b/catalog-be/src/main/resources/config/distribution-engine-configuration.yaml
@@ -12,6 +12,8 @@ uebSecretKey: 4ZRPzNJfEUK0sSNBvccd2m7X
distributionNotifTopicName: ASDC-DISTR-NOTIF-TOPIC
distributionStatusTopicName: ASDC-DISTR-STATUS-TOPIC
+kafkaBootStrapServers: kafka-bootstrap:9092
+
initRetryIntervalSec: 5
initMaxIntervalSec: 60