diff options
7 files changed, 305 insertions, 152 deletions
@@ -23,18 +23,18 @@ Every client that wants to use the JAR, need to implement IConfiguration interfa Configuration parameters: -------------------------- -AsdcAddress : ASDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name). -User : User Name for ASDC distribution consumer authentication. -Password : User Password for ASDC distribution consumer authentication. -PollingInterval : Distribution Client Polling Interval towards UEB in seconds. Can Be reconfigured in runtime. -PollingTimeout : Distribution Client Timeout in seconds waiting to UEB server response in each fetch interval. Can Be reconfigured in runtime. -RelevantArtifactTypes : List of artifact types. If the service contains any of the artifacts in the list, the callback will be activated. Can Be reconfigured in runtime. -ConsumerGroup : Returns the consumer group defined for this ONAP component, if no consumer group is defined return null. -EnvironmentName : Returns the environment name (testing, production etc... Can Be reconfigured in runtime. -ConsumerID : Unique ID of ONAP component instance (e.x INSTAR name). -KeyStorePath : Return full path to Client's Key Store that contains either CA certificate or the ASDC's public key (e.g /etc/keystore/asdc-client.jks). file will be deployed with asdc-distribution jar -KeyStorePassword : Return client's Key Store password. -activateServerTLSAuth : Sets whether ASDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set. +- AsdcAddress : ASDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name). +- User : User Name for ASDC distribution consumer authentication. +- Password : User Password for ASDC distribution consumer authentication. +- PollingInterval : Distribution Client Polling Interval towards UEB in seconds. Can Be reconfigured in runtime. +- PollingTimeout : Distribution Client Timeout in seconds waiting to UEB server response in each fetch interval. Can Be reconfigured in runtime. +- RelevantArtifactTypes : List of artifact types. If the service contains any of the artifacts in the list, the callback will be activated. Can Be reconfigured in runtime. +- ConsumerGroup : Returns the consumer group defined for this ONAP component, if no consumer group is defined return null. +- EnvironmentName : Returns the environment name (testing, production etc... Can Be reconfigured in runtime. +- ConsumerID : Unique ID of ONAP component instance (e.x INSTAR name). +- KeyStorePath : Return full path to Client's Key Store that contains either CA certificate or the ASDC's public key (e.g /etc/keystore/asdc-client.jks). file will be deployed with asdc-distribution jar +- KeyStorePassword : Return client's Key Store password. +- activateServerTLSAuth : Sets whether ASDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set. Example of configuration file implementing IConfiguration interface: -------------------------------------------------------------------- diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java index 2999ebe..36044c7 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java @@ -190,12 +190,11 @@ public class SdcConnectorClient { HttpAsdcResponse downloadResponse = downloadPair.getFirst(); int status = downloadResponse.getStatus(); - if (status == HttpStatus.SC_OK) { + if (status == HttpStatus.SC_OK) { response = parseDownloadArtifactResponse(artifactInfo, downloadResponse); } else { response = handleAsdcDownloadArtifactError(downloadResponse); - } handeAsdcConnectionClose(downloadPair); return response; diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java index 136d43e..8e3aa9b 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java @@ -55,6 +55,7 @@ import org.onap.sdc.http.TopicRegistrationResponse; import org.onap.sdc.utils.DistributionActionResultEnum; import org.onap.sdc.utils.DistributionClientConstants; import org.onap.sdc.utils.GeneralUtils; +import org.onap.sdc.utils.NotificationSender; import org.onap.sdc.utils.Pair; import org.onap.sdc.utils.Wrapper; import org.onap.sdc.api.notification.IVfModuleMetadata; @@ -71,7 +72,6 @@ import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder; import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; import com.att.nsa.cambria.client.CambriaConsumer; import com.att.nsa.cambria.client.CambriaIdentityManager; -import com.att.nsa.cambria.client.CambriaPublisher.message; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; @@ -80,25 +80,26 @@ import fj.data.Either; public class DistributionClientImpl implements IDistributionClient { - public static final int POLLING_TIMEOUT_MULTIPLIER = 1000; - public static final long SLEEPING_THREAD_TIME = 1000L; - public static final long PUBLISHER_TIMEOUT = 10L; - public static final int TERMINATION_TIMEOUT = 60; - private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName()); + private static final int POLLING_TIMEOUT_MULTIPLIER = 1000; + private static final int TERMINATION_TIMEOUT = 60; + private static final Logger log = LoggerFactory.getLogger(DistributionClientImpl.class); private SdcConnectorClient asdcConnector; private ScheduledExecutorService executorPool = null; protected CambriaIdentityManager cambriaIdentityManager = null; private List<String> brokerServers; - protected ApiCredential credential; + private ApiCredential credential; protected Configuration configuration; private INotificationCallback callback; private IStatusCallback statusCallback; private String notificationTopic; private String statusTopic; private boolean isConsumerGroupGenerated = false; + private NotificationSender notificationSender; - private boolean isInitialized, isStarted, isTerminated; + private boolean isInitialized; + private boolean isStarted; + private boolean isTerminated; @Override public IConfiguration getConfiguration() { @@ -292,7 +293,10 @@ public class DistributionClientImpl implements IDistributionClient { } // 1. get ueb server list from configuration if (errorWrapper.isEmpty()) { - initUebServerList(errorWrapper); + List<String> servers = initUebServerList(errorWrapper); + if (servers != null) { + this.brokerServers = servers; + } } // 2.validate artifact types against asdc server if (errorWrapper.isEmpty()) { @@ -301,11 +305,19 @@ public class DistributionClientImpl implements IDistributionClient { // 3. create keys if (errorWrapper.isEmpty()) { this.callback = callback; - createUebKeys(errorWrapper); + ApiCredential apiCredential = createUebKeys(errorWrapper); + if (apiCredential != null) { + this.credential = apiCredential; + } } // 4. register for topics if (errorWrapper.isEmpty()) { - registerForTopics(errorWrapper); + TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential); + if (topics != null) { + this.notificationTopic = topics.getDistrNotificationTopicName(); + this.statusTopic = topics.getDistrStatusTopicName(); + this.notificationSender = createNotificationSender(); + } } IDistributionClientResult result; @@ -323,7 +335,11 @@ public class DistributionClientImpl implements IDistributionClient { return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration)); } - private void registerForTopics(Wrapper<IDistributionClientResult> errorWrapper) { + private NotificationSender createNotificationSender() { + return new NotificationSender(brokerServers); + } + + private TopicRegistrationResponse registerForTopics(Wrapper<IDistributionClientResult> errorWrapper, ApiCredential credential) { Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential); if (registerAsdcTopics.isRight()) { @@ -334,22 +350,25 @@ public class DistributionClientImpl implements IDistributionClient { } errorWrapper.setInnerElement(registerAsdcTopics.right().value()); } else { - TopicRegistrationResponse topics = registerAsdcTopics.left().value(); - notificationTopic = topics.getDistrNotificationTopicName(); - statusTopic = topics.getDistrStatusTopicName(); + return registerAsdcTopics.left().value(); } - + return null; } - private void createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) { + private ApiCredential createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) { + ApiCredential apiCredential = null; + initCambriaClient(errorWrapper); if (errorWrapper.isEmpty()) { log.debug("create keys"); - DistributionClientResultImpl createKeysResponse = createUebKeys(); + Pair<DistributionClientResultImpl, ApiCredential> uebKeys = createUebKeys(); + DistributionClientResultImpl createKeysResponse = uebKeys.getFirst(); + apiCredential = uebKeys.getSecond(); if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) { errorWrapper.setInnerElement(createKeysResponse); } } + return apiCredential; } private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) { @@ -376,17 +395,18 @@ public class DistributionClientImpl implements IDistributionClient { } } - private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) { + private List<String> initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) { + List<String> brokerServers = null; log.debug("get ueb cluster server list from component(configuration file)"); Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList(); if (serverListResponse.isRight()) { errorWrapper.setInnerElement(serverListResponse.right().value()); } else { - brokerServers = serverListResponse.left().value(); } + return brokerServers; } private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) { @@ -400,58 +420,28 @@ public class DistributionClientImpl implements IDistributionClient { @Override public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage) { log.info("DistributionClient - sendDownloadStatus"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); } - private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder builder) { - DistributionClientResultImpl statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); - log.info("DistributionClient - sendStatus"); - Either<CambriaBatchingPublisher, IDistributionClientResult> eitherPublisher = getCambriaPublisher(); - if (eitherPublisher.isRight()) { - return eitherPublisher.right().value(); - } - CambriaBatchingPublisher pub = eitherPublisher.left().value(); - - log.debug("after create publisher server list " + brokerServers.toString()); - String jsonRequest = builder.build(); - - log.debug("try to send status " + jsonRequest); - - try { - pub.send("MyPartitionKey", jsonRequest); - Thread.sleep(SLEEPING_THREAD_TIME); - } catch (IOException e) { - log.debug("DistributionClient - sendDownloadStatus. Failed to send download status"); - } catch (InterruptedException e) { - log.debug("DistributionClient - sendDownloadStatus. thread was interrupted"); - } finally { - - try { - List<message> stuck = pub.close(PUBLISHER_TIMEOUT, TimeUnit.SECONDS); - - if (!stuck.isEmpty()) { - log.debug("DistributionClient - sendDownloadStatus. " + stuck.size() + " messages unsent"); - } else { - statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "messages successfully sent"); - } - } catch (IOException | InterruptedException e) { - log.debug("DistributionClient - sendDownloadStatus. failed to send messages and close publisher "); - } + private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) { + IDistributionClientResult distributionResult; + Either<CambriaBatchingPublisher, IDistributionClientResult> cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential); + if (cambriaPublisher.isRight()) { + distributionResult = cambriaPublisher.right().value(); + } else { + String statusMessage = statusBuilder.build(); + distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage); } - return statusResult; + + return distributionResult; } - private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher() { + private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher(String statusTopic, Configuration configuration, List<String> brokerServers, ApiCredential credential) { CambriaBatchingPublisher cambriaPublisher = null; try { - cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers).build(); + cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()) + .usingHosts(brokerServers).build(); cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret()); } catch (MalformedURLException | GeneralSecurityException e) { Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); @@ -464,12 +454,7 @@ public class DistributionClientImpl implements IDistributionClient { @Override public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage) { log.info("DistributionClient - sendDeploymentStatus"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); } IDistributionClientResult sendNotificationStatus(long currentTimeMillis, String distributionId, ArtifactInfoImpl artifactInfo, boolean isNotified) { @@ -479,13 +464,15 @@ public class DistributionClientImpl implements IDistributionClient { if (!errorWrapper.isEmpty()) { return errorWrapper.getInnerElement(); } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified)); + IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified); + return sendStatus(builder); } /* *************************** Private Methods *************************************************** */ - protected DistributionClientResultImpl createUebKeys() { + protected Pair<DistributionClientResultImpl, ApiCredential> createUebKeys() { DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly"); + ApiCredential credential = null; try { String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID()); credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description); @@ -495,7 +482,7 @@ public class DistributionClientImpl implements IDistributionClient { response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage()); log.error(response.toString()); } - return response; + return new Pair<>(response, credential); } private IDistributionClientResult restartConsumer() { @@ -619,7 +606,7 @@ public class DistributionClientImpl implements IDistributionClient { private void validateRunReady(Wrapper<IDistributionClientResult> errorWrapper) { if (errorWrapper.isEmpty()) { - validateInitilized(errorWrapper); + validateInitialized(errorWrapper); } if (errorWrapper.isEmpty()) { validateNotTerminated(errorWrapper); @@ -627,7 +614,7 @@ public class DistributionClientImpl implements IDistributionClient { } - private void validateInitilized(Wrapper<IDistributionClientResult> errorWrapper) { + private void validateInitialized(Wrapper<IDistributionClientResult> errorWrapper) { if (!isInitialized) { log.debug("client was not initialized"); IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized"); @@ -694,37 +681,30 @@ public class DistributionClientImpl implements IDistributionClient { @Override public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage, String errorReason) { log.info("DistributionClient - sendDownloadStatus with errorReason"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); } @Override public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage, String errorReason) { log.info("DistributionClient - sendDeploymentStatus with errorReason"); + return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + + } + + private IDistributionClientResult sendErrorStatus(IDistributionStatusMessageJsonBuilder builder) { Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); validateRunReady(errorWrapper); if (!errorWrapper.isEmpty()) { return errorWrapper.getInnerElement(); } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); - + return sendStatus(builder); } @Override public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) { log.info("DistributionClient - sendComponentDone status"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); } @@ -732,12 +712,7 @@ public class DistributionClientImpl implements IDistributionClient { public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage, String errorReason) { log.info("DistributionClient - sendComponentDone status with errorReason"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); } @@ -754,12 +729,7 @@ public class DistributionClientImpl implements IDistributionClient { public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) { log.info("DistributionClient - sendFinalDistributionStatus status"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); + return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage)); } @@ -768,12 +738,7 @@ public class DistributionClientImpl implements IDistributionClient { public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage, String errorReason) { log.info("DistributionClient - sendFinalDistributionStatus status with errorReason"); - Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>(); - validateRunReady(errorWrapper); - if (!errorWrapper.isEmpty()) { - return errorWrapper.getInnerElement(); - } - return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); + return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason)); } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java index 9792467..62be395 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java @@ -77,19 +77,10 @@ public class DistributionStatusMessageJsonBuilderFactory { static IDistributionStatusMessageJsonBuilder prepareBuilderForNotificationStatus(final String consumerId, final long currentTimeMillis, final String distributionId, final ArtifactInfoImpl artifactInfo, boolean isNotified) { - final DistributionStatusEnum fakeStatusToReplace = DistributionStatusEnum.DOWNLOAD_OK; - final String jsonRequest = buildDistributionStatusJson(consumerId, currentTimeMillis, distributionId, artifactInfo, fakeStatusToReplace); - - DistributionStatusNotificationEnum notificationStatus = isNotified ? DistributionStatusNotificationEnum.NOTIFIED : DistributionStatusNotificationEnum.NOT_NOTIFIED; - final String changedRequest = jsonRequest.replace(fakeStatusToReplace.name(), notificationStatus.name()); - IDistributionStatusMessageJsonBuilder builder = new IDistributionStatusMessageJsonBuilder() { - @Override - public String build() { - return changedRequest; - } - }; - return builder; + final DistributionStatusEnum distributionStatus = isNotified ? DistributionStatusEnum.NOTIFIED : DistributionStatusEnum.NOT_NOTIFIED; + final String jsonRequest = buildDistributionStatusJson(consumerId, currentTimeMillis, distributionId, artifactInfo, distributionStatus); + return () -> jsonRequest; } private static String buildDistributionStatusJson(final String consumerId, @@ -125,24 +116,10 @@ public class DistributionStatusMessageJsonBuilderFactory { }; DistributionStatusMessageImpl message = new DistributionStatusMessageImpl(statusMessage); - final String jsonRequest = gson.toJson(message); - return jsonRequest; + return gson.toJson(message); } private static IDistributionStatusMessageJsonBuilder prepareBuilderFromImpl(DistributionStatusMessageImpl message) { - final String jsonRequest = gson.toJson(message); - IDistributionStatusMessageJsonBuilder builder = new IDistributionStatusMessageJsonBuilder() { - @Override - public String build() { - return jsonRequest; - } - }; - return builder; - } - - private enum DistributionStatusNotificationEnum { - NOTIFIED, NOT_NOTIFIED + return () -> gson.toJson(message); } - - } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java index 3e7f061..d77ac60 100644 --- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java @@ -54,7 +54,6 @@ public enum DistributionStatusEnum { * ONAP component is requested to publish this status once component successfully complete downloading and storing all the data it needs from the service. */ COMPONENT_DONE_OK, - /** * ONAP component is requested to publish this status when component failed to download or failed to store one or more of the mandatory information it requires from the service model. * <p> @@ -66,5 +65,10 @@ public enum DistributionStatusEnum { */ DISTRIBUTION_COMPLETE_OK, - DISTRIBUTION_COMPLETE_ERROR + DISTRIBUTION_COMPLETE_ERROR, + + NOTIFIED, + + NOT_NOTIFIED + } diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java new file mode 100644 index 0000000..1fb71a6 --- /dev/null +++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaPublisher; +import org.onap.sdc.api.results.IDistributionClientResult; +import org.onap.sdc.impl.DistributionClientResultImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class NotificationSender { + + private static final Logger log = LoggerFactory.getLogger(NotificationSender.class); + private static final long PUBLISHER_CLOSING_TIMEOUT = 10L; + private static final long SLEEP_TIME = 1; + + private final List<String> brokerServers; + + public NotificationSender(List<String> brokerServers) { + this.brokerServers = brokerServers; + } + + public IDistributionClientResult send(CambriaBatchingPublisher publisher, String status) { + log.info("DistributionClient - sendStatus"); + DistributionClientResultImpl distributionResult; + try { + log.debug("Publisher server list: {}", brokerServers); + log.debug("Trying to send status: {}", status); + publisher.send("MyPartitionKey", status); + TimeUnit.SECONDS.sleep(SLEEP_TIME); + } catch (IOException | InterruptedException e) { + log.error("DistributionClient - sendDownloadStatus. Failed to send download status", e); + } finally { + distributionResult = closePublisher(publisher); + } + return distributionResult; + } + + private DistributionClientResultImpl closePublisher(CambriaBatchingPublisher publisher) { + DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); + try { + List<CambriaPublisher.message> notSentMessages = publisher.close(PUBLISHER_CLOSING_TIMEOUT, TimeUnit.SECONDS); + if (notSentMessages.isEmpty()) { + distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); + } else { + log.debug("DistributionClient - sendDownloadStatus. {} messages were not sent", notSentMessages.size()); + } + } catch (IOException | InterruptedException e) { + log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e); + } + return distributionResult; + } +} diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java new file mode 100644 index 0000000..0be7793 --- /dev/null +++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java @@ -0,0 +1,132 @@ +/*- + * ============LICENSE_START======================================================= + * sdc-distribution-client + * ================================================================================ + * Copyright (C) 2020 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.sdc.utils; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaPublisher; +import fj.data.Either; +import org.junit.Test; +import org.onap.sdc.api.results.IDistributionClientResult; +import org.onap.sdc.impl.DistributionClientResultImpl; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class NotificationSenderTest { + + private final String status = "status"; + private final CambriaPublisher.message message = new CambriaPublisher.message("sample-partition", "sample-message"); + private final List<CambriaPublisher.message> notEmptySendingFailedMessages = Collections.singletonList(message); + private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent"); + private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status"); + + private final CambriaBatchingPublisher publisher = mock(CambriaBatchingPublisher.class); + private final List<String> emptyServers = Collections.emptyList(); + private final NotificationSender validNotificationSender = new NotificationSender(emptyServers);; + + + @Test + public void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() throws IOException, InterruptedException { + //given + when(publisher.send(anyString(), anyString())).thenReturn(0); + when(publisher.close(anyLong(), any())).thenReturn(Collections.emptyList()); + + //when + IDistributionClientResult result = validNotificationSender.send(publisher, status); + + //then + assertEquals(successResponse.getDistributionActionResult(), result.getDistributionActionResult()); + } + + @Test + public void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { + //given + when(publisher.send(anyString(), anyString())).thenReturn(0); + when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages); + + //when + IDistributionClientResult result = validNotificationSender.send(publisher, status); + + //then + assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); + } + + @Test + public void whenSendingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { + //given + when(publisher.send(anyString(), anyString())).thenThrow(new IOException()); + when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages); + + //when + IDistributionClientResult result = validNotificationSender.send(publisher, status); + + //then + assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); + } + + @Test + public void whenSendingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { + //given + when(publisher.send(anyString(), anyString())).thenAnswer(invocationOnMock -> {throw new InterruptedException();}); + when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages); + + //when + IDistributionClientResult result = validNotificationSender.send(publisher, status); + + //then + assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); + } + + @Test + public void whenClosingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { + //given + when(publisher.send(anyString(), anyString())).thenReturn(0); + when(publisher.close(anyLong(), any())).thenThrow(new IOException()); + + //when + IDistributionClientResult result = validNotificationSender.send(publisher, status); + + //then + assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); + } + + @Test + public void whenClosingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException { + //given + when(publisher.send(anyString(), anyString())).thenReturn(0); + when(publisher.close(anyLong(), any())).thenAnswer(invocationOnMock -> {throw new InterruptedException();}); + + //when + IDistributionClientResult result = validNotificationSender.send(publisher, status); + + //then + assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult()); + } +}
\ No newline at end of file |