diff options
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java')
-rw-r--r-- | policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java | 75 |
1 files changed, 23 insertions, 52 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java index e94bdffa..9b724072 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java @@ -2,8 +2,10 @@ * ============LICENSE_START======================================================= * policy-endpoints * ================================================================================ - * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd. + * Modifications Copyright (C) 2020 Bell Canada. All rights reserved. + * Modifications Copyright (C) 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +24,8 @@ package org.onap.policy.common.endpoints.event.comm.bus.internal; import java.util.UUID; - +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; @@ -30,8 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink - * regardless if it is UEB or DMaaP. + * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink. * */ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink { @@ -44,7 +46,9 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi /** * The partition key to publish to. */ - protected String partitionId; + @Getter + @Setter + protected String partitionKey; /** * Message bus publisher. @@ -60,17 +64,18 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * apiSecret api secret * partitionId partition id * useHttps does connection use HTTPS? + * allowTracing is tracing allowed? * allowSelfSignedCerts are self-signed certificates allow * - * @throws IllegalArgumentException in invalid parameters are passed in + * @throws IllegalArgumentException if invalid parameters are passed in */ - public InlineBusTopicSink(BusTopicParams busTopicParams) { + protected InlineBusTopicSink(BusTopicParams busTopicParams) { super(busTopicParams); if (busTopicParams.isPartitionIdInvalid()) { - this.partitionId = UUID.randomUUID().toString(); + this.partitionKey = UUID.randomUUID().toString(); } else { - this.partitionId = busTopicParams.getPartitionId(); + this.partitionKey = busTopicParams.getPartitionId(); } } @@ -84,17 +89,14 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi logger.info("{}: starting", this); synchronized (this) { + if (!this.alive) { + if (locked) { + throw new IllegalStateException(this + " is locked."); + } - if (this.alive) { - return true; - } - - if (locked) { - throw new IllegalStateException(this + " is locked."); + this.init(); + this.alive = true; } - - this.init(); - this.alive = true; } return true; @@ -142,7 +144,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message); - publisher.send(this.partitionId, message); + publisher.send(this.partitionKey, message); broadcast(message); } catch (Exception e) { logger.warn("{}: cannot send because of {}", this, e.getMessage(), e); @@ -153,44 +155,13 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi } @Override - public void setPartitionKey(String partitionKey) { - this.partitionId = partitionKey; - } - - @Override - public String getPartitionKey() { - return this.partitionId; - } - - @Override public void shutdown() { this.stop(); } @Override - protected boolean anyNullOrEmpty(String... args) { - for (String arg : args) { - if (arg == null || arg.isEmpty()) { - return true; - } - } - - return false; - } - - @Override - protected boolean allNullOrEmpty(String... args) { - for (String arg : args) { - if (!(arg == null || arg.isEmpty())) { - return false; - } - } - - return true; - } - - @Override public String toString() { - return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]"; + return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher + + "]"; } } |