summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md24
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java3
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java181
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java33
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java8
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java76
-rw-r--r--sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java132
7 files changed, 305 insertions, 152 deletions
diff --git a/README.md b/README.md
index 1f455f7..17c519c 100644
--- a/README.md
+++ b/README.md
@@ -23,18 +23,18 @@ Every client that wants to use the JAR, need to implement IConfiguration interfa
Configuration parameters:
--------------------------
-AsdcAddress : ASDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name).
-User : User Name for ASDC distribution consumer authentication.
-Password : User Password for ASDC distribution consumer authentication.
-PollingInterval : Distribution Client Polling Interval towards UEB in seconds. Can Be reconfigured in runtime.
-PollingTimeout : Distribution Client Timeout in seconds waiting to UEB server response in each fetch interval. Can Be reconfigured in runtime.
-RelevantArtifactTypes : List of artifact types. If the service contains any of the artifacts in the list, the callback will be activated. Can Be reconfigured in runtime.
-ConsumerGroup : Returns the consumer group defined for this ONAP component, if no consumer group is defined return null.
-EnvironmentName : Returns the environment name (testing, production etc... Can Be reconfigured in runtime.
-ConsumerID : Unique ID of ONAP component instance (e.x INSTAR name).
-KeyStorePath : Return full path to Client's Key Store that contains either CA certificate or the ASDC's public key (e.g /etc/keystore/asdc-client.jks). file will be deployed with asdc-distribution jar
-KeyStorePassword : Return client's Key Store password.
-activateServerTLSAuth : Sets whether ASDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set.
+- AsdcAddress : ASDC Distribution Engine address. Value can be either hostname (with or without port), IP:port or FQDN (Fully Qualified Domain Name).
+- User : User Name for ASDC distribution consumer authentication.
+- Password : User Password for ASDC distribution consumer authentication.
+- PollingInterval : Distribution Client Polling Interval towards UEB in seconds. Can Be reconfigured in runtime.
+- PollingTimeout : Distribution Client Timeout in seconds waiting to UEB server response in each fetch interval. Can Be reconfigured in runtime.
+- RelevantArtifactTypes : List of artifact types. If the service contains any of the artifacts in the list, the callback will be activated. Can Be reconfigured in runtime.
+- ConsumerGroup : Returns the consumer group defined for this ONAP component, if no consumer group is defined return null.
+- EnvironmentName : Returns the environment name (testing, production etc... Can Be reconfigured in runtime.
+- ConsumerID : Unique ID of ONAP component instance (e.x INSTAR name).
+- KeyStorePath : Return full path to Client's Key Store that contains either CA certificate or the ASDC's public key (e.g /etc/keystore/asdc-client.jks). file will be deployed with asdc-distribution jar
+- KeyStorePassword : Return client's Key Store password.
+- activateServerTLSAuth : Sets whether ASDC server TLS authentication is activated. If set to false, Key Store path and password are not needed to be set.
Example of configuration file implementing IConfiguration interface:
--------------------------------------------------------------------
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java
index 2999ebe..36044c7 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/http/SdcConnectorClient.java
@@ -190,12 +190,11 @@ public class SdcConnectorClient {
HttpAsdcResponse downloadResponse = downloadPair.getFirst();
int status = downloadResponse.getStatus();
- if (status == HttpStatus.SC_OK) {
+ if (status == HttpStatus.SC_OK) {
response = parseDownloadArtifactResponse(artifactInfo, downloadResponse);
} else {
response = handleAsdcDownloadArtifactError(downloadResponse);
-
}
handeAsdcConnectionClose(downloadPair);
return response;
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
index 136d43e..8e3aa9b 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
@@ -55,6 +55,7 @@ import org.onap.sdc.http.TopicRegistrationResponse;
import org.onap.sdc.utils.DistributionActionResultEnum;
import org.onap.sdc.utils.DistributionClientConstants;
import org.onap.sdc.utils.GeneralUtils;
+import org.onap.sdc.utils.NotificationSender;
import org.onap.sdc.utils.Pair;
import org.onap.sdc.utils.Wrapper;
import org.onap.sdc.api.notification.IVfModuleMetadata;
@@ -71,7 +72,6 @@ import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaIdentityManager;
-import com.att.nsa.cambria.client.CambriaPublisher.message;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -80,25 +80,26 @@ import fj.data.Either;
public class DistributionClientImpl implements IDistributionClient {
- public static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
- public static final long SLEEPING_THREAD_TIME = 1000L;
- public static final long PUBLISHER_TIMEOUT = 10L;
- public static final int TERMINATION_TIMEOUT = 60;
- private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName());
+ private static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
+ private static final int TERMINATION_TIMEOUT = 60;
+ private static final Logger log = LoggerFactory.getLogger(DistributionClientImpl.class);
private SdcConnectorClient asdcConnector;
private ScheduledExecutorService executorPool = null;
protected CambriaIdentityManager cambriaIdentityManager = null;
private List<String> brokerServers;
- protected ApiCredential credential;
+ private ApiCredential credential;
protected Configuration configuration;
private INotificationCallback callback;
private IStatusCallback statusCallback;
private String notificationTopic;
private String statusTopic;
private boolean isConsumerGroupGenerated = false;
+ private NotificationSender notificationSender;
- private boolean isInitialized, isStarted, isTerminated;
+ private boolean isInitialized;
+ private boolean isStarted;
+ private boolean isTerminated;
@Override
public IConfiguration getConfiguration() {
@@ -292,7 +293,10 @@ public class DistributionClientImpl implements IDistributionClient {
}
// 1. get ueb server list from configuration
if (errorWrapper.isEmpty()) {
- initUebServerList(errorWrapper);
+ List<String> servers = initUebServerList(errorWrapper);
+ if (servers != null) {
+ this.brokerServers = servers;
+ }
}
// 2.validate artifact types against asdc server
if (errorWrapper.isEmpty()) {
@@ -301,11 +305,19 @@ public class DistributionClientImpl implements IDistributionClient {
// 3. create keys
if (errorWrapper.isEmpty()) {
this.callback = callback;
- createUebKeys(errorWrapper);
+ ApiCredential apiCredential = createUebKeys(errorWrapper);
+ if (apiCredential != null) {
+ this.credential = apiCredential;
+ }
}
// 4. register for topics
if (errorWrapper.isEmpty()) {
- registerForTopics(errorWrapper);
+ TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential);
+ if (topics != null) {
+ this.notificationTopic = topics.getDistrNotificationTopicName();
+ this.statusTopic = topics.getDistrStatusTopicName();
+ this.notificationSender = createNotificationSender();
+ }
}
IDistributionClientResult result;
@@ -323,7 +335,11 @@ public class DistributionClientImpl implements IDistributionClient {
return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration));
}
- private void registerForTopics(Wrapper<IDistributionClientResult> errorWrapper) {
+ private NotificationSender createNotificationSender() {
+ return new NotificationSender(brokerServers);
+ }
+
+ private TopicRegistrationResponse registerForTopics(Wrapper<IDistributionClientResult> errorWrapper, ApiCredential credential) {
Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential);
if (registerAsdcTopics.isRight()) {
@@ -334,22 +350,25 @@ public class DistributionClientImpl implements IDistributionClient {
}
errorWrapper.setInnerElement(registerAsdcTopics.right().value());
} else {
- TopicRegistrationResponse topics = registerAsdcTopics.left().value();
- notificationTopic = topics.getDistrNotificationTopicName();
- statusTopic = topics.getDistrStatusTopicName();
+ return registerAsdcTopics.left().value();
}
-
+ return null;
}
- private void createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
+ private ApiCredential createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
+ ApiCredential apiCredential = null;
+
initCambriaClient(errorWrapper);
if (errorWrapper.isEmpty()) {
log.debug("create keys");
- DistributionClientResultImpl createKeysResponse = createUebKeys();
+ Pair<DistributionClientResultImpl, ApiCredential> uebKeys = createUebKeys();
+ DistributionClientResultImpl createKeysResponse = uebKeys.getFirst();
+ apiCredential = uebKeys.getSecond();
if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
errorWrapper.setInnerElement(createKeysResponse);
}
}
+ return apiCredential;
}
private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
@@ -376,17 +395,18 @@ public class DistributionClientImpl implements IDistributionClient {
}
}
- private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
+ private List<String> initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
+ List<String> brokerServers = null;
log.debug("get ueb cluster server list from component(configuration file)");
Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList();
if (serverListResponse.isRight()) {
errorWrapper.setInnerElement(serverListResponse.right().value());
} else {
-
brokerServers = serverListResponse.left().value();
}
+ return brokerServers;
}
private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
@@ -400,58 +420,28 @@ public class DistributionClientImpl implements IDistributionClient {
@Override
public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage) {
log.info("DistributionClient - sendDownloadStatus");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
-
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
- private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder builder) {
- DistributionClientResultImpl statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
- log.info("DistributionClient - sendStatus");
- Either<CambriaBatchingPublisher, IDistributionClientResult> eitherPublisher = getCambriaPublisher();
- if (eitherPublisher.isRight()) {
- return eitherPublisher.right().value();
- }
- CambriaBatchingPublisher pub = eitherPublisher.left().value();
-
- log.debug("after create publisher server list " + brokerServers.toString());
- String jsonRequest = builder.build();
-
- log.debug("try to send status " + jsonRequest);
-
- try {
- pub.send("MyPartitionKey", jsonRequest);
- Thread.sleep(SLEEPING_THREAD_TIME);
- } catch (IOException e) {
- log.debug("DistributionClient - sendDownloadStatus. Failed to send download status");
- } catch (InterruptedException e) {
- log.debug("DistributionClient - sendDownloadStatus. thread was interrupted");
- } finally {
-
- try {
- List<message> stuck = pub.close(PUBLISHER_TIMEOUT, TimeUnit.SECONDS);
-
- if (!stuck.isEmpty()) {
- log.debug("DistributionClient - sendDownloadStatus. " + stuck.size() + " messages unsent");
- } else {
- statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "messages successfully sent");
- }
- } catch (IOException | InterruptedException e) {
- log.debug("DistributionClient - sendDownloadStatus. failed to send messages and close publisher ");
- }
+ private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) {
+ IDistributionClientResult distributionResult;
+ Either<CambriaBatchingPublisher, IDistributionClientResult> cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential);
+ if (cambriaPublisher.isRight()) {
+ distributionResult = cambriaPublisher.right().value();
+ } else {
+ String statusMessage = statusBuilder.build();
+ distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage);
}
- return statusResult;
+
+ return distributionResult;
}
- private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher() {
+ private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher(String statusTopic, Configuration configuration, List<String> brokerServers, ApiCredential credential) {
CambriaBatchingPublisher cambriaPublisher = null;
try {
- cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers).build();
+ cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap())
+ .usingHosts(brokerServers).build();
cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
} catch (MalformedURLException | GeneralSecurityException e) {
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
@@ -464,12 +454,7 @@ public class DistributionClientImpl implements IDistributionClient {
@Override
public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage) {
log.info("DistributionClient - sendDeploymentStatus");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
IDistributionClientResult sendNotificationStatus(long currentTimeMillis, String distributionId, ArtifactInfoImpl artifactInfo, boolean isNotified) {
@@ -479,13 +464,15 @@ public class DistributionClientImpl implements IDistributionClient {
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified));
+ IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified);
+ return sendStatus(builder);
}
/* *************************** Private Methods *************************************************** */
- protected DistributionClientResultImpl createUebKeys() {
+ protected Pair<DistributionClientResultImpl, ApiCredential> createUebKeys() {
DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly");
+ ApiCredential credential = null;
try {
String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID());
credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description);
@@ -495,7 +482,7 @@ public class DistributionClientImpl implements IDistributionClient {
response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage());
log.error(response.toString());
}
- return response;
+ return new Pair<>(response, credential);
}
private IDistributionClientResult restartConsumer() {
@@ -619,7 +606,7 @@ public class DistributionClientImpl implements IDistributionClient {
private void validateRunReady(Wrapper<IDistributionClientResult> errorWrapper) {
if (errorWrapper.isEmpty()) {
- validateInitilized(errorWrapper);
+ validateInitialized(errorWrapper);
}
if (errorWrapper.isEmpty()) {
validateNotTerminated(errorWrapper);
@@ -627,7 +614,7 @@ public class DistributionClientImpl implements IDistributionClient {
}
- private void validateInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
+ private void validateInitialized(Wrapper<IDistributionClientResult> errorWrapper) {
if (!isInitialized) {
log.debug("client was not initialized");
IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized");
@@ -694,37 +681,30 @@ public class DistributionClientImpl implements IDistributionClient {
@Override
public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage, String errorReason) {
log.info("DistributionClient - sendDownloadStatus with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
-
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}
@Override
public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage, String errorReason) {
log.info("DistributionClient - sendDeploymentStatus with errorReason");
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+
+ }
+
+ private IDistributionClientResult sendErrorStatus(IDistributionStatusMessageJsonBuilder builder) {
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
validateRunReady(errorWrapper);
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
-
+ return sendStatus(builder);
}
@Override
public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) {
log.info("DistributionClient - sendComponentDone status");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
@@ -732,12 +712,7 @@ public class DistributionClientImpl implements IDistributionClient {
public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage,
String errorReason) {
log.info("DistributionClient - sendComponentDone status with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}
@@ -754,12 +729,7 @@ public class DistributionClientImpl implements IDistributionClient {
public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) {
log.info("DistributionClient - sendFinalDistributionStatus status");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
@@ -768,12 +738,7 @@ public class DistributionClientImpl implements IDistributionClient {
public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage,
String errorReason) {
log.info("DistributionClient - sendFinalDistributionStatus status with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java
index 9792467..62be395 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionStatusMessageJsonBuilderFactory.java
@@ -77,19 +77,10 @@ public class DistributionStatusMessageJsonBuilderFactory {
static IDistributionStatusMessageJsonBuilder prepareBuilderForNotificationStatus(final String consumerId, final long currentTimeMillis, final String distributionId,
final ArtifactInfoImpl artifactInfo, boolean isNotified) {
- final DistributionStatusEnum fakeStatusToReplace = DistributionStatusEnum.DOWNLOAD_OK;
- final String jsonRequest = buildDistributionStatusJson(consumerId, currentTimeMillis, distributionId, artifactInfo, fakeStatusToReplace);
-
- DistributionStatusNotificationEnum notificationStatus = isNotified ? DistributionStatusNotificationEnum.NOTIFIED : DistributionStatusNotificationEnum.NOT_NOTIFIED;
- final String changedRequest = jsonRequest.replace(fakeStatusToReplace.name(), notificationStatus.name());
- IDistributionStatusMessageJsonBuilder builder = new IDistributionStatusMessageJsonBuilder() {
- @Override
- public String build() {
- return changedRequest;
- }
- };
- return builder;
+ final DistributionStatusEnum distributionStatus = isNotified ? DistributionStatusEnum.NOTIFIED : DistributionStatusEnum.NOT_NOTIFIED;
+ final String jsonRequest = buildDistributionStatusJson(consumerId, currentTimeMillis, distributionId, artifactInfo, distributionStatus);
+ return () -> jsonRequest;
}
private static String buildDistributionStatusJson(final String consumerId,
@@ -125,24 +116,10 @@ public class DistributionStatusMessageJsonBuilderFactory {
};
DistributionStatusMessageImpl message = new DistributionStatusMessageImpl(statusMessage);
- final String jsonRequest = gson.toJson(message);
- return jsonRequest;
+ return gson.toJson(message);
}
private static IDistributionStatusMessageJsonBuilder prepareBuilderFromImpl(DistributionStatusMessageImpl message) {
- final String jsonRequest = gson.toJson(message);
- IDistributionStatusMessageJsonBuilder builder = new IDistributionStatusMessageJsonBuilder() {
- @Override
- public String build() {
- return jsonRequest;
- }
- };
- return builder;
- }
-
- private enum DistributionStatusNotificationEnum {
- NOTIFIED, NOT_NOTIFIED
+ return () -> gson.toJson(message);
}
-
-
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java
index 3e7f061..d77ac60 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/DistributionStatusEnum.java
@@ -54,7 +54,6 @@ public enum DistributionStatusEnum {
* ONAP component is requested to publish this status once component successfully complete downloading and storing all the data it needs from the service.
*/
COMPONENT_DONE_OK,
-
/**
* ONAP component is requested to publish this status when component failed to download or failed to store one or more of the mandatory information it requires from the service model.
* <p>
@@ -66,5 +65,10 @@ public enum DistributionStatusEnum {
*/
DISTRIBUTION_COMPLETE_OK,
- DISTRIBUTION_COMPLETE_ERROR
+ DISTRIBUTION_COMPLETE_ERROR,
+
+ NOTIFIED,
+
+ NOT_NOTIFIED
+
}
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java
new file mode 100644
index 0000000..1fb71a6
--- /dev/null
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/utils/NotificationSender.java
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaPublisher;
+import org.onap.sdc.api.results.IDistributionClientResult;
+import org.onap.sdc.impl.DistributionClientResultImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class NotificationSender {
+
+ private static final Logger log = LoggerFactory.getLogger(NotificationSender.class);
+ private static final long PUBLISHER_CLOSING_TIMEOUT = 10L;
+ private static final long SLEEP_TIME = 1;
+
+ private final List<String> brokerServers;
+
+ public NotificationSender(List<String> brokerServers) {
+ this.brokerServers = brokerServers;
+ }
+
+ public IDistributionClientResult send(CambriaBatchingPublisher publisher, String status) {
+ log.info("DistributionClient - sendStatus");
+ DistributionClientResultImpl distributionResult;
+ try {
+ log.debug("Publisher server list: {}", brokerServers);
+ log.debug("Trying to send status: {}", status);
+ publisher.send("MyPartitionKey", status);
+ TimeUnit.SECONDS.sleep(SLEEP_TIME);
+ } catch (IOException | InterruptedException e) {
+ log.error("DistributionClient - sendDownloadStatus. Failed to send download status", e);
+ } finally {
+ distributionResult = closePublisher(publisher);
+ }
+ return distributionResult;
+ }
+
+ private DistributionClientResultImpl closePublisher(CambriaBatchingPublisher publisher) {
+ DistributionClientResultImpl distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
+ try {
+ List<CambriaPublisher.message> notSentMessages = publisher.close(PUBLISHER_CLOSING_TIMEOUT, TimeUnit.SECONDS);
+ if (notSentMessages.isEmpty()) {
+ distributionResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+ } else {
+ log.debug("DistributionClient - sendDownloadStatus. {} messages were not sent", notSentMessages.size());
+ }
+ } catch (IOException | InterruptedException e) {
+ log.error("DistributionClient - sendDownloadStatus. Failed to send messages and close publisher.", e);
+ }
+ return distributionResult;
+ }
+}
diff --git a/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java
new file mode 100644
index 0000000..0be7793
--- /dev/null
+++ b/sdc-distribution-client/src/test/java/org/onap/sdc/utils/NotificationSenderTest.java
@@ -0,0 +1,132 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * sdc-distribution-client
+ * ================================================================================
+ * Copyright (C) 2020 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.sdc.utils;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaPublisher;
+import fj.data.Either;
+import org.junit.Test;
+import org.onap.sdc.api.results.IDistributionClientResult;
+import org.onap.sdc.impl.DistributionClientResultImpl;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class NotificationSenderTest {
+
+ private final String status = "status";
+ private final CambriaPublisher.message message = new CambriaPublisher.message("sample-partition", "sample-message");
+ private final List<CambriaPublisher.message> notEmptySendingFailedMessages = Collections.singletonList(message);
+ private final DistributionClientResultImpl successResponse = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "Messages successfully sent");
+ private final DistributionClientResultImpl generalErrorResponse = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
+
+ private final CambriaBatchingPublisher publisher = mock(CambriaBatchingPublisher.class);
+ private final List<String> emptyServers = Collections.emptyList();
+ private final NotificationSender validNotificationSender = new NotificationSender(emptyServers);;
+
+
+ @Test
+ public void whenPublisherIsValidAndNoExceptionsAreThrownShouldReturnSuccessStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenReturn(0);
+ when(publisher.close(anyLong(), any())).thenReturn(Collections.emptyList());
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(successResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenPublisherCouldNotSendShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenReturn(0);
+ when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenSendingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenThrow(new IOException());
+ when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenSendingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenAnswer(invocationOnMock -> {throw new InterruptedException();});
+ when(publisher.close(anyLong(), any())).thenReturn(notEmptySendingFailedMessages);
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenClosingThrowsIOExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenReturn(0);
+ when(publisher.close(anyLong(), any())).thenThrow(new IOException());
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+
+ @Test
+ public void whenClosingThrowsInterruptedExceptionShouldReturnGeneralErrorStatus() throws IOException, InterruptedException {
+ //given
+ when(publisher.send(anyString(), anyString())).thenReturn(0);
+ when(publisher.close(anyLong(), any())).thenAnswer(invocationOnMock -> {throw new InterruptedException();});
+
+ //when
+ IDistributionClientResult result = validNotificationSender.send(publisher, status);
+
+ //then
+ assertEquals(generalErrorResponse.getDistributionActionResult(), result.getDistributionActionResult());
+ }
+} \ No newline at end of file