aboutsummaryrefslogtreecommitdiffstats
path: root/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
diff options
context:
space:
mode:
authorMichal Banka <michal.banka@nokia.com>2020-12-02 10:41:26 +0100
committerMicha? Ba?ka <michal.banka@nokia.com>2020-12-03 10:26:42 +0000
commitda39a4cd6d73278a8be6e48602f2acb5af1dc1c3 (patch)
tree35af112a1c722a9118f4bd5b5a14c5d6b867103c /sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
parentf9ff1605045ad31e52fffac2da25f7195d5c7455 (diff)
Improve code quality
- Extracted NotificationSender class from DistributionClientImpl - Fixed list formatting in README - Other small refactors - +2% code coverage Change-Id: I753502d13504057804fcb3557c808dae2520cc74 Signed-off-by: Michal Banka <michal.banka@nokia.com> Issue-ID: SDC-3388
Diffstat (limited to 'sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java')
-rw-r--r--sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java181
1 files changed, 73 insertions, 108 deletions
diff --git a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
index 136d43e..8e3aa9b 100644
--- a/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
+++ b/sdc-distribution-client/src/main/java/org/onap/sdc/impl/DistributionClientImpl.java
@@ -55,6 +55,7 @@ import org.onap.sdc.http.TopicRegistrationResponse;
import org.onap.sdc.utils.DistributionActionResultEnum;
import org.onap.sdc.utils.DistributionClientConstants;
import org.onap.sdc.utils.GeneralUtils;
+import org.onap.sdc.utils.NotificationSender;
import org.onap.sdc.utils.Pair;
import org.onap.sdc.utils.Wrapper;
import org.onap.sdc.api.notification.IVfModuleMetadata;
@@ -71,7 +72,6 @@ import com.att.nsa.cambria.client.CambriaClientBuilders.IdentityManagerBuilder;
import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaIdentityManager;
-import com.att.nsa.cambria.client.CambriaPublisher.message;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -80,25 +80,26 @@ import fj.data.Either;
public class DistributionClientImpl implements IDistributionClient {
- public static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
- public static final long SLEEPING_THREAD_TIME = 1000L;
- public static final long PUBLISHER_TIMEOUT = 10L;
- public static final int TERMINATION_TIMEOUT = 60;
- private static Logger log = LoggerFactory.getLogger(DistributionClientImpl.class.getName());
+ private static final int POLLING_TIMEOUT_MULTIPLIER = 1000;
+ private static final int TERMINATION_TIMEOUT = 60;
+ private static final Logger log = LoggerFactory.getLogger(DistributionClientImpl.class);
private SdcConnectorClient asdcConnector;
private ScheduledExecutorService executorPool = null;
protected CambriaIdentityManager cambriaIdentityManager = null;
private List<String> brokerServers;
- protected ApiCredential credential;
+ private ApiCredential credential;
protected Configuration configuration;
private INotificationCallback callback;
private IStatusCallback statusCallback;
private String notificationTopic;
private String statusTopic;
private boolean isConsumerGroupGenerated = false;
+ private NotificationSender notificationSender;
- private boolean isInitialized, isStarted, isTerminated;
+ private boolean isInitialized;
+ private boolean isStarted;
+ private boolean isTerminated;
@Override
public IConfiguration getConfiguration() {
@@ -292,7 +293,10 @@ public class DistributionClientImpl implements IDistributionClient {
}
// 1. get ueb server list from configuration
if (errorWrapper.isEmpty()) {
- initUebServerList(errorWrapper);
+ List<String> servers = initUebServerList(errorWrapper);
+ if (servers != null) {
+ this.brokerServers = servers;
+ }
}
// 2.validate artifact types against asdc server
if (errorWrapper.isEmpty()) {
@@ -301,11 +305,19 @@ public class DistributionClientImpl implements IDistributionClient {
// 3. create keys
if (errorWrapper.isEmpty()) {
this.callback = callback;
- createUebKeys(errorWrapper);
+ ApiCredential apiCredential = createUebKeys(errorWrapper);
+ if (apiCredential != null) {
+ this.credential = apiCredential;
+ }
}
// 4. register for topics
if (errorWrapper.isEmpty()) {
- registerForTopics(errorWrapper);
+ TopicRegistrationResponse topics = registerForTopics(errorWrapper, this.credential);
+ if (topics != null) {
+ this.notificationTopic = topics.getDistrNotificationTopicName();
+ this.statusTopic = topics.getDistrStatusTopicName();
+ this.notificationSender = createNotificationSender();
+ }
}
IDistributionClientResult result;
@@ -323,7 +335,11 @@ public class DistributionClientImpl implements IDistributionClient {
return new SdcConnectorClient(configuration, new HttpAsdcClient(configuration));
}
- private void registerForTopics(Wrapper<IDistributionClientResult> errorWrapper) {
+ private NotificationSender createNotificationSender() {
+ return new NotificationSender(brokerServers);
+ }
+
+ private TopicRegistrationResponse registerForTopics(Wrapper<IDistributionClientResult> errorWrapper, ApiCredential credential) {
Either<TopicRegistrationResponse, DistributionClientResultImpl> registerAsdcTopics = asdcConnector.registerAsdcTopics(credential);
if (registerAsdcTopics.isRight()) {
@@ -334,22 +350,25 @@ public class DistributionClientImpl implements IDistributionClient {
}
errorWrapper.setInnerElement(registerAsdcTopics.right().value());
} else {
- TopicRegistrationResponse topics = registerAsdcTopics.left().value();
- notificationTopic = topics.getDistrNotificationTopicName();
- statusTopic = topics.getDistrStatusTopicName();
+ return registerAsdcTopics.left().value();
}
-
+ return null;
}
- private void createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
+ private ApiCredential createUebKeys(Wrapper<IDistributionClientResult> errorWrapper) {
+ ApiCredential apiCredential = null;
+
initCambriaClient(errorWrapper);
if (errorWrapper.isEmpty()) {
log.debug("create keys");
- DistributionClientResultImpl createKeysResponse = createUebKeys();
+ Pair<DistributionClientResultImpl, ApiCredential> uebKeys = createUebKeys();
+ DistributionClientResultImpl createKeysResponse = uebKeys.getFirst();
+ apiCredential = uebKeys.getSecond();
if (createKeysResponse.getDistributionActionResult() != DistributionActionResultEnum.SUCCESS) {
errorWrapper.setInnerElement(createKeysResponse);
}
}
+ return apiCredential;
}
private void validateArtifactTypesWithAsdcServer(IConfiguration conf, Wrapper<IDistributionClientResult> errorWrapper) {
@@ -376,17 +395,18 @@ public class DistributionClientImpl implements IDistributionClient {
}
}
- private void initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
+ private List<String> initUebServerList(Wrapper<IDistributionClientResult> errorWrapper) {
+ List<String> brokerServers = null;
log.debug("get ueb cluster server list from component(configuration file)");
Either<List<String>, IDistributionClientResult> serverListResponse = getUEBServerList();
if (serverListResponse.isRight()) {
errorWrapper.setInnerElement(serverListResponse.right().value());
} else {
-
brokerServers = serverListResponse.left().value();
}
+ return brokerServers;
}
private void validateNotInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
@@ -400,58 +420,28 @@ public class DistributionClientImpl implements IDistributionClient {
@Override
public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage) {
log.info("DistributionClient - sendDownloadStatus");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
-
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
- private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder builder) {
- DistributionClientResultImpl statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.GENERAL_ERROR, "Failed to send status");
- log.info("DistributionClient - sendStatus");
- Either<CambriaBatchingPublisher, IDistributionClientResult> eitherPublisher = getCambriaPublisher();
- if (eitherPublisher.isRight()) {
- return eitherPublisher.right().value();
- }
- CambriaBatchingPublisher pub = eitherPublisher.left().value();
-
- log.debug("after create publisher server list " + brokerServers.toString());
- String jsonRequest = builder.build();
-
- log.debug("try to send status " + jsonRequest);
-
- try {
- pub.send("MyPartitionKey", jsonRequest);
- Thread.sleep(SLEEPING_THREAD_TIME);
- } catch (IOException e) {
- log.debug("DistributionClient - sendDownloadStatus. Failed to send download status");
- } catch (InterruptedException e) {
- log.debug("DistributionClient - sendDownloadStatus. thread was interrupted");
- } finally {
-
- try {
- List<message> stuck = pub.close(PUBLISHER_TIMEOUT, TimeUnit.SECONDS);
-
- if (!stuck.isEmpty()) {
- log.debug("DistributionClient - sendDownloadStatus. " + stuck.size() + " messages unsent");
- } else {
- statusResult = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "messages successfully sent");
- }
- } catch (IOException | InterruptedException e) {
- log.debug("DistributionClient - sendDownloadStatus. failed to send messages and close publisher ");
- }
+ private IDistributionClientResult sendStatus(IDistributionStatusMessageJsonBuilder statusBuilder) {
+ IDistributionClientResult distributionResult;
+ Either<CambriaBatchingPublisher, IDistributionClientResult> cambriaPublisher = getCambriaPublisher(statusTopic, configuration, brokerServers, credential);
+ if (cambriaPublisher.isRight()) {
+ distributionResult = cambriaPublisher.right().value();
+ } else {
+ String statusMessage = statusBuilder.build();
+ distributionResult = notificationSender.send(cambriaPublisher.left().value(), statusMessage);
}
- return statusResult;
+
+ return distributionResult;
}
- private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher() {
+ private Either<CambriaBatchingPublisher, IDistributionClientResult> getCambriaPublisher(String statusTopic, Configuration configuration, List<String> brokerServers, ApiCredential credential) {
CambriaBatchingPublisher cambriaPublisher = null;
try {
- cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap()).usingHosts(brokerServers).build();
+ cambriaPublisher = new PublisherBuilder().onTopic(statusTopic).usingHttps(configuration.isUseHttpsWithDmaap())
+ .usingHosts(brokerServers).build();
cambriaPublisher.setApiCredentials(credential.getApiKey(), credential.getApiSecret());
} catch (MalformedURLException | GeneralSecurityException e) {
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
@@ -464,12 +454,7 @@ public class DistributionClientImpl implements IDistributionClient {
@Override
public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage) {
log.info("DistributionClient - sendDeploymentStatus");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
IDistributionClientResult sendNotificationStatus(long currentTimeMillis, String distributionId, ArtifactInfoImpl artifactInfo, boolean isNotified) {
@@ -479,13 +464,15 @@ public class DistributionClientImpl implements IDistributionClient {
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified));
+ IDistributionStatusMessageJsonBuilder builder = DistributionStatusMessageJsonBuilderFactory.prepareBuilderForNotificationStatus(getConfiguration().getConsumerID(), currentTimeMillis, distributionId, artifactInfo, isNotified);
+ return sendStatus(builder);
}
/* *************************** Private Methods *************************************************** */
- protected DistributionClientResultImpl createUebKeys() {
+ protected Pair<DistributionClientResultImpl, ApiCredential> createUebKeys() {
DistributionClientResultImpl response = new DistributionClientResultImpl(DistributionActionResultEnum.SUCCESS, "keys created successfuly");
+ ApiCredential credential = null;
try {
String description = String.format(DistributionClientConstants.CLIENT_DESCRIPTION, configuration.getConsumerID());
credential = cambriaIdentityManager.createApiKey(DistributionClientConstants.EMAIL, description);
@@ -495,7 +482,7 @@ public class DistributionClientImpl implements IDistributionClient {
response = new DistributionClientResultImpl(DistributionActionResultEnum.UEB_KEYS_CREATION_FAILED, "failed to create keys: " + e.getMessage());
log.error(response.toString());
}
- return response;
+ return new Pair<>(response, credential);
}
private IDistributionClientResult restartConsumer() {
@@ -619,7 +606,7 @@ public class DistributionClientImpl implements IDistributionClient {
private void validateRunReady(Wrapper<IDistributionClientResult> errorWrapper) {
if (errorWrapper.isEmpty()) {
- validateInitilized(errorWrapper);
+ validateInitialized(errorWrapper);
}
if (errorWrapper.isEmpty()) {
validateNotTerminated(errorWrapper);
@@ -627,7 +614,7 @@ public class DistributionClientImpl implements IDistributionClient {
}
- private void validateInitilized(Wrapper<IDistributionClientResult> errorWrapper) {
+ private void validateInitialized(Wrapper<IDistributionClientResult> errorWrapper) {
if (!isInitialized) {
log.debug("client was not initialized");
IDistributionClientResult result = new DistributionClientResultImpl(DistributionActionResultEnum.DISTRIBUTION_CLIENT_NOT_INITIALIZED, "distribution client was not initialized");
@@ -694,37 +681,30 @@ public class DistributionClientImpl implements IDistributionClient {
@Override
public IDistributionClientResult sendDownloadStatus(IDistributionStatusMessage statusMessage, String errorReason) {
log.info("DistributionClient - sendDownloadStatus with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
-
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}
@Override
public IDistributionClientResult sendDeploymentStatus(IDistributionStatusMessage statusMessage, String errorReason) {
log.info("DistributionClient - sendDeploymentStatus with errorReason");
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+
+ }
+
+ private IDistributionClientResult sendErrorStatus(IDistributionStatusMessageJsonBuilder builder) {
Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
validateRunReady(errorWrapper);
if (!errorWrapper.isEmpty()) {
return errorWrapper.getInnerElement();
}
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
-
+ return sendStatus(builder);
}
@Override
public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage) {
log.info("DistributionClient - sendComponentDone status");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
@@ -732,12 +712,7 @@ public class DistributionClientImpl implements IDistributionClient {
public IDistributionClientResult sendComponentDoneStatus(IComponentDoneStatusMessage statusMessage,
String errorReason) {
log.info("DistributionClient - sendComponentDone status with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}
@@ -754,12 +729,7 @@ public class DistributionClientImpl implements IDistributionClient {
public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage) {
log.info("DistributionClient - sendFinalDistributionStatus status");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getSimpleBuilder(statusMessage));
}
@@ -768,12 +738,7 @@ public class DistributionClientImpl implements IDistributionClient {
public IDistributionClientResult sendFinalDistrStatus(IFinalDistrStatusMessage statusMessage,
String errorReason) {
log.info("DistributionClient - sendFinalDistributionStatus status with errorReason");
- Wrapper<IDistributionClientResult> errorWrapper = new Wrapper<>();
- validateRunReady(errorWrapper);
- if (!errorWrapper.isEmpty()) {
- return errorWrapper.getInnerElement();
- }
- return sendStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
+ return sendErrorStatus(DistributionStatusMessageJsonBuilderFactory.getErrorReasonBuilder(statusMessage, errorReason));
}