diff options
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/utils')
7 files changed, 260 insertions, 60 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java index 514630f..834751a 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionActionResultEnum.java @@ -36,23 +36,19 @@ public enum DistributionActionResultEnum { CONFIGURATION_IS_MISSING, CONF_MISSING_USERNAME, CONF_MISSING_PASSWORD, - CONF_MISSING_ASDC_FQDN, + CONF_MISSING_SDC_FQDN, CONF_MISSING_ARTIFACT_TYPES, CONF_CONTAINS_INVALID_ARTIFACT_TYPES, CONF_MISSING_CONSUMER_ID, CONF_MISSING_ENVIRONMENT_NAME, - CONF_MISSING_CONSUMER_GROUP, - CONF_INVALID_ASDC_FQDN, + CONF_INVALID_SDC_FQDN, CONF_INVALID_CONSUME_PRODUCE_STATUS_TOPIC_FALG, - CONF_MISSING_MSG_BUS_ADDRESS, - CONF_INVALID_MSG_BUS_ADDRESS, - ASDC_AUTHENTICATION_FAILED, - ASDC_AUTHORIZATION_FAILED, - ASDC_NOT_FOUND, - ASDC_SERVER_PROBLEM, - ASDC_CONNECTION_FAILED, - ASDC_SERVER_TIMEOUT, + SDC_AUTHENTICATION_FAILED, + SDC_AUTHORIZATION_FAILED, + SDC_NOT_FOUND, + SDC_SERVER_PROBLEM, + SDC_CONNECTION_FAILED, + SDC_SERVER_TIMEOUT, - CAMBRIA_INIT_FAILED, - UEB_KEYS_CREATION_FAILED + MESSAGING_CLIENT_INIT_FAILED } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java index 8432611..f4ebbcc 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionClientConstants.java @@ -28,7 +28,7 @@ import java.util.regex.Pattern; * @author mshitrit */ public final class DistributionClientConstants { - public static final String CLIENT_DESCRIPTION = "ASDC Distribution Client Key for %s"; + public static final String CLIENT_DESCRIPTION = "SDC Distribution Client Key for %s"; public static final Pattern FQDN_PATTERN = Pattern.compile( "^" + "([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9])(\\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]{0,61}[a-zA-Z0-9]))*(:[0-9]{2,5})*$", Pattern.CASE_INSENSITIVE); diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java index ff5d201..6a69d8b 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/GeneralUtils.java @@ -70,24 +70,4 @@ public class GeneralUtils { } return isEncoded; } - - - public static Either<List<String>, IDistributionClientResult> convertToValidHostName(List<String> msgBusAddresses) { - List<String> uebLocalHostsNames = new ArrayList<>(); - for (String name : msgBusAddresses) { - try { - uebLocalHostsNames.add(InetAddress.getByName(name).getHostName()); - } catch (UnknownHostException e) { - LOGGER.debug("UnknownHost: {}", e.getMessage(), e); - } - } - Either<List<String>, IDistributionClientResult> response; - if (uebLocalHostsNames.isEmpty()) { - response = Either.right(new DistributionClientResultImpl(DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS, "configuration is invalid: " + DistributionActionResultEnum.CONF_INVALID_MSG_BUS_ADDRESS.name())); - - } else { - response = Either.left(uebLocalHostsNames); - } - return response; - } } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java index 1fb71a6..44a9ddb 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java @@ -20,55 +20,46 @@ package org.onap.sdc.utils; -import com.att.nsa.cambria.client.CambriaBatchingPublisher; -import com.att.nsa.cambria.client.CambriaPublisher; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.common.KafkaException; import org.onap.sdc.api.results.IDistributionClientResult; import org.onap.sdc.impl.DistributionClientResultImpl; +import org.onap.sdc.utils.kafka.SdcKafkaProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; - public class NotificationSender { private static final Logger log = LoggerFactory.getLogger(NotificationSender.class); - private static final long PUBLISHER_CLOSING_TIMEOUT = 10L; private static final long SLEEP_TIME = 1; + private final SdcKafkaProducer producer; - private final List<String> brokerServers; - - public NotificationSender(List<String> brokerServers) { - this.brokerServers = brokerServers; + public NotificationSender(SdcKafkaProducer producer) { + this.producer = producer; } - public IDistributionClientResult send(CambriaBatchingPublisher publisher, String status) { + public IDistributionClientResult send(String topic, String status) { log.info("DistributionClient - sendStatus"); DistributionClientResultImpl distributionResult; try { - log.debug("Publisher server list: {}", brokerServers); - log.debug("Trying to send status: {}", status); - publisher.send("MyPartitionKey", status); + log.debug("Publisher server list: {}", producer.getMsgBusAddresses()); + log.info("Trying to send status: {} \n to topic {}", status, producer.getTopicName()); + producer.send(topic, "MyPartitionKey", status); TimeUnit.SECONDS.sleep(SLEEP_TIME); - } catch (IOException | InterruptedException e) { - log.error("DistributionClient - sendDownloadStatus. Failed to send download status", e); + } catch (KafkaException | InterruptedException e) { + log.error("DistributionClient - sendStatus. Failed to send status", e); } finally { - distributionResult = closePublisher(publisher); + distributionResult = closeProducer(); } return distributionResult; } - private DistributionClientResultImpl closePublisher(CambriaBatchingPublisher publisher) { + private DistributionClientResultImpl closeProducer() { DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); try { - List<CambriaPublisher.message> notSentMessages = publisher.close(PUBLISHER_CLOSING_TIMEOUT, TimeUnit.SECONDS); - if (notSentMessages.isEmpty()) { - distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); - } else { - log.debug("DistributionClient - sendDownloadStatus. {} messages were not sent", notSentMessages.size()); - } - } catch (IOException | InterruptedException e) { + producer.flush(); + distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); + } catch (KafkaException | IllegalArgumentException e) { log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e); } return distributionResult; diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java new file mode 100644 index 0000000..ac1d2ea --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/KafkaDataResponse.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils.kafka; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class KafkaDataResponse { + + private String kafkaBootStrapServer; + private String distrNotificationTopicName; + private String distrStatusTopicName; +}
\ No newline at end of file diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java new file mode 100644 index 0000000..71f793d --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaConsumer.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils.kafka; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.onap.sdc.impl.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that provides a KafkaConsumer to communicate with a kafka cluster + */ +public class SdcKafkaConsumer { + + private static final Logger log = LoggerFactory.getLogger(SdcKafkaConsumer.class); + final KafkaConsumer<String, String> consumer; + private final int pollTimeout; + private String topicName; + + /** + * + * @param configuration The config provided to the client + */ + public SdcKafkaConsumer(Configuration configuration) { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getMsgBusAddress()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig()); + props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumerGroup()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-consumer-" + UUID.randomUUID()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumer = new KafkaConsumer<>(props); + pollTimeout = configuration.getPollingTimeout(); + } + + /** + * + * @param topic The kafka topic to subscribe to + */ + public void subscribe(String topic) { + try { + consumer.subscribe(Collections.singleton(topic)); + this.topicName = topic; + } + catch (InvalidGroupIdException e) { + log.error("Invalid Group {}", e.getMessage()); + } + } + + /** + * + * @return The list of records returned from the poll + */ + public List<String> poll() { + List<String> msgs = new ArrayList<>(); + ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(pollTimeout)); + for (ConsumerRecord<String, String> rec : records) { + msgs.add(rec.value()); + } + return msgs; + } + + public String getTopicName() { + return topicName; + } +}
\ No newline at end of file diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java new file mode 100644 index 0000000..9826f8b --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/kafka/SdcKafkaProducer.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils.kafka; + +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.Future; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.onap.sdc.impl.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that provides a KafkaProducer to communicate with a kafka cluster + */ +public class SdcKafkaProducer { + + private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class); + final KafkaProducer<String, String> producer; + private final List<String> msgBusAddresses; + private final String topicName; + + public SdcKafkaProducer(Configuration configuration) { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, configuration.getMsgBusAddress()); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, configuration.getKafkaSecurityProtocolConfig()); + props.put(SaslConfigs.SASL_MECHANISM, configuration.getKafkaSaslMechanism()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, configuration.getKafkaSaslJaasConfig()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, configuration.getConsumerID() + "-producer-" + UUID.randomUUID()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer<>(props); + msgBusAddresses = configuration.getMsgBusAddress(); + topicName = configuration.getStatusTopicName(); + } + + /** + * + * @param topicName The name of the topic to publish to + * @param key The key value of the ProducerRecord + * @param value The value of the ProducerRecord + * @return The RecordMetedata of the request + */ + public Future<RecordMetadata> send(String topicName, String key, String value) { + Future<RecordMetadata> data; + try { + data = producer.send(new ProducerRecord<>(topicName, key, value)); + } catch (KafkaException e) { + log.error("Failed the send data: exc {}", e.getMessage()); + throw e; + } + return data; + } + /** + * + */ + public void flush() { + try { + producer.flush(); + } + catch (KafkaException e) { + log.error("Failed to send data: exc {}", e.getMessage()); + } + } + + public List<String> getMsgBusAddresses() { + return msgBusAddresses; + } + + public String getTopicName() { + return topicName; + } +}
\ No newline at end of file |