aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java30
-rw-r--r--bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java39
2 files changed, 26 insertions, 43 deletions
diff --git a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
index 1a253887dd..f215d496cf 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/main/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java
@@ -50,7 +50,8 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
private int topicListenerDelayInSeconds;
private volatile ScheduledThreadPoolExecutor executor;
private volatile boolean dmaapThreadListenerIsRunning;
- private String topicName;
+
+
@Autowired
public PnfEventReadyDmaapClient(Environment env) {
@@ -58,31 +59,19 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
executor = null;
- topicName = env.getProperty("pnf.dmaap.topicName");
- String[] topic = topicName.split("\\s");
- String pnf_ready = null;
- String pnf_update = null;
- for (String t : topic) {
- if (t.matches("(.*)PNF_READY(.*)")) {
- pnf_ready = t;
- } else if (t.matches("(.*)PNF_UPDATE(.*)")) {
- pnf_update = t;
- } else {
- return;
- }
- }
getRequestForpnfReady = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
.scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
- .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(pnf_ready)
- .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
- .build());
+ .port(env.getProperty("pnf.dmaap.port", Integer.class))
+ .path(env.getProperty("pnf.dmaap.pnfReadyTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup"))
+ .path(env.getProperty("pnf.dmaap.consumerId")).build());
getRequestForPnfUpdate = new HttpGet(UriBuilder.fromUri(env.getProperty("pnf.dmaap.uriPathPrefix"))
.scheme(env.getProperty("pnf.dmaap.protocol")).host(env.getProperty("pnf.dmaap.host"))
- .port(env.getProperty("pnf.dmaap.port", Integer.class)).path(pnf_update)
- .path(env.getProperty("pnf.dmaap.consumerGroup")).path(env.getProperty("pnf.dmaap.consumerId"))
- .build());
+ .port(env.getProperty("pnf.dmaap.port", Integer.class))
+ .path(env.getProperty("pnf.dmaap.pnfUpdateTopicName")).path(env.getProperty("pnf.dmaap.consumerGroup"))
+ .path(env.getProperty("pnf.dmaap.consumerIdUpdate")).build());
}
+
@Override
public synchronized void registerForUpdate(String pnfCorrelationId, Runnable informConsumer) {
logger.debug("registering for pnf ready dmaap event for pnf correlation id: {}", pnfCorrelationId);
@@ -162,3 +151,4 @@ public class PnfEventReadyDmaapClient implements DmaapClient {
}
}
}
+
diff --git a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
index 5d98464be4..bbb6aad49b 100644
--- a/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
+++ b/bpmn/so-bpmn-infrastructure-common/src/test/java/org/onap/so/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClientTest.java
@@ -67,9 +67,11 @@ public class PnfEventReadyDmaapClientTest {
private static final int PORT = 1234;
private static final String PROTOCOL = "http";
private static final String URI_PATH_PREFIX = "eventsForTesting";
- private static final String TOPIC_NAME = "PNF_READY_Test PNF_UPDATE_Test";
- private static final String CONSUMER_ID = "consumerTestId";
- private static final String CONSUMER_GROUP = "consumerGroupTest";
+ private static final String TOPIC_NAME = "unauthenticated.PNF_READY";
+ private static final String TOPIC_NAME_UPDATE = "unauthenticated.PNF_UPDATE";
+ private static final String CONSUMER_ID = "so-bpmn-infra-pnfready";
+ private static final String CONSUMER_ID_UPDATE = "so-bpmn-infra-pnfupdate";
+ private static final String CONSUMER_GROUP = "so-consumer";
private static final int TOPIC_LISTENER_DELAY_IN_SECONDS = 5;
@Mock
@@ -87,8 +89,10 @@ public class PnfEventReadyDmaapClientTest {
when(env.getProperty(eq("pnf.dmaap.host"))).thenReturn(HOST);
when(env.getProperty(eq("pnf.dmaap.protocol"))).thenReturn(PROTOCOL);
when(env.getProperty(eq("pnf.dmaap.uriPathPrefix"))).thenReturn(URI_PATH_PREFIX);
- when(env.getProperty(eq("pnf.dmaap.topicName"))).thenReturn(TOPIC_NAME);
+ when(env.getProperty(eq("pnf.dmaap.pnfReadyTopicName"))).thenReturn(TOPIC_NAME);
+ when(env.getProperty(eq("pnf.dmaap.pnfUpdateTopicName"))).thenReturn(TOPIC_NAME_UPDATE);
when(env.getProperty(eq("pnf.dmaap.consumerId"))).thenReturn(CONSUMER_ID);
+ when(env.getProperty(eq("pnf.dmaap.consumerIdUpdate"))).thenReturn(CONSUMER_ID_UPDATE);
when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
.thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
@@ -120,15 +124,9 @@ public class PnfEventReadyDmaapClientTest {
assertEquals(captor1.getValue().getURI().getHost(), HOST);
assertEquals(captor1.getValue().getURI().getPort(), PORT);
assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
- String[] topic = TOPIC_NAME.split("\\s");
- String pnf_update = null;
- for (String t : topic) {
- if (t.matches("(.*)PNF_UPDATE(.*)")) {
- pnf_update = t;
- assertEquals(captor1.getValue().getURI().getPath(),
- "/" + URI_PATH_PREFIX + "/" + pnf_update + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
- }
- }
+ assertEquals(captor1.getValue().getURI().getPath(),
+ "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME_UPDATE + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID_UPDATE + "");
+
verify(threadMockToNotifyCamundaFlow).run();
verify(executorMock).shutdown();
}
@@ -146,15 +144,11 @@ public class PnfEventReadyDmaapClientTest {
assertEquals(captor1.getValue().getURI().getHost(), HOST);
assertEquals(captor1.getValue().getURI().getPort(), PORT);
assertEquals(captor1.getValue().getURI().getScheme(), PROTOCOL);
- String[] topic = TOPIC_NAME.split("\\s");
- String pnf_ready = null;
- for (String t : topic) {
- if (t.matches("(.*)PNF_READY(.*)")) {
- pnf_ready = t;
- assertEquals(captor1.getValue().getURI().getPath(),
- "/" + URI_PATH_PREFIX + "/" + pnf_ready + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
- }
- }
+
+ assertEquals(captor1.getValue().getURI().getPath(),
+ "/" + URI_PATH_PREFIX + "/" + TOPIC_NAME + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
+
+
verify(threadMockToNotifyCamundaFlow).run();
verify(executorMock).shutdown();
}
@@ -234,4 +228,3 @@ public class PnfEventReadyDmaapClientTest {
}
}
-