aboutsummaryrefslogtreecommitdiffstats
path: root/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
diff options
context:
space:
mode:
Diffstat (limited to 'policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java')
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java75
1 files changed, 23 insertions, 52 deletions
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
index e94bdffa..9b724072 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/InlineBusTopicSink.java
@@ -2,8 +2,10 @@
* ============LICENSE_START=======================================================
* policy-endpoints
* ================================================================================
- * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
* Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
+ * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
+ * Modifications Copyright (C) 2023-2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +24,8 @@
package org.onap.policy.common.endpoints.event.comm.bus.internal;
import java.util.UUID;
-
+import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSink;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
@@ -30,8 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink
- * regardless if it is UEB or DMaaP.
+ * Transport Agnostic Bus Topic Sink to carry out the core functionality to interact with a sink.
*
*/
public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
@@ -44,7 +46,9 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
/**
* The partition key to publish to.
*/
- protected String partitionId;
+ @Getter
+ @Setter
+ protected String partitionKey;
/**
* Message bus publisher.
@@ -60,17 +64,18 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
* apiSecret api secret
* partitionId partition id
* useHttps does connection use HTTPS?
+ * allowTracing is tracing allowed?
* allowSelfSignedCerts are self-signed certificates allow *
- * @throws IllegalArgumentException in invalid parameters are passed in
+ * @throws IllegalArgumentException if invalid parameters are passed in
*/
- public InlineBusTopicSink(BusTopicParams busTopicParams) {
+ protected InlineBusTopicSink(BusTopicParams busTopicParams) {
super(busTopicParams);
if (busTopicParams.isPartitionIdInvalid()) {
- this.partitionId = UUID.randomUUID().toString();
+ this.partitionKey = UUID.randomUUID().toString();
} else {
- this.partitionId = busTopicParams.getPartitionId();
+ this.partitionKey = busTopicParams.getPartitionId();
}
}
@@ -84,17 +89,14 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
logger.info("{}: starting", this);
synchronized (this) {
+ if (!this.alive) {
+ if (locked) {
+ throw new IllegalStateException(this + " is locked.");
+ }
- if (this.alive) {
- return true;
- }
-
- if (locked) {
- throw new IllegalStateException(this + " is locked.");
+ this.init();
+ this.alive = true;
}
-
- this.init();
- this.alive = true;
}
return true;
@@ -142,7 +144,7 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
NetLoggerUtil.log(EventType.OUT, this.getTopicCommInfrastructure(), this.topic, message);
- publisher.send(this.partitionId, message);
+ publisher.send(this.partitionKey, message);
broadcast(message);
} catch (Exception e) {
logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
@@ -153,44 +155,13 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi
}
@Override
- public void setPartitionKey(String partitionKey) {
- this.partitionId = partitionKey;
- }
-
- @Override
- public String getPartitionKey() {
- return this.partitionId;
- }
-
- @Override
public void shutdown() {
this.stop();
}
@Override
- protected boolean anyNullOrEmpty(String... args) {
- for (String arg : args) {
- if (arg == null || arg.isEmpty()) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- protected boolean allNullOrEmpty(String... args) {
- for (String arg : args) {
- if (!(arg == null || arg.isEmpty())) {
- return false;
- }
- }
-
- return true;
- }
-
- @Override
public String toString() {
- return "InlineBusTopicSink [partitionId=" + partitionId + ", alive=" + alive + ", publisher=" + publisher + "]";
+ return "InlineBusTopicSink [partitionId=" + partitionKey + ", alive=" + alive + ", publisher=" + publisher
+ + "]";
}
}