summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorsunil unnava <sunil.unnava@att.com>2018-12-11 16:25:26 -0500
committersunil unnava <su622b@att.com>2018-12-11 21:37:38 +0000
commite157241cb7dfe023f57532f829e381644bdf18be (patch)
tree82116874a9ef0beac29bafe92866ffb82f5cbc60 /src
parent83746dbc42bad55e52d4bed2617d0d0ca8634cb5 (diff)
add configurable default partitions and replicas1.1.15
Issue-ID: DMAAP-903 Change-Id: Iabb3da85c3e42ddf68d049e6a8164449f7be8296 Signed-off-by: sunil unnava <sunil.unnava@att.com>
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java3
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java15
-rw-r--r--src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java18
3 files changed, 34 insertions, 2 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
index 2ec323e..93374fb 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -34,6 +34,7 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -85,7 +86,7 @@ public class Kafka011Consumer implements Consumer {
String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"consumer.timeout");
- if (null != consumerTimeOut) {
+ if (StringUtils.isNotEmpty(consumerTimeOut)) {
consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
}
synchronized (kConsumer) {
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
index f2ba222..626828b 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
@@ -26,6 +26,7 @@ package org.onap.dmaap.dmf.mr.service.impl;
import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpStatus;
import org.json.JSONArray;
import org.json.JSONException;
@@ -281,14 +282,28 @@ public class TopicServiceImpl implements TopicService {
final String desc = topicBean.getTopicDescription();
int partition = topicBean.getPartitionCount();
// int replica = topicBean.getReplicationCount();
+ String defaultPartitions = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "default.partitions");
+ String defaultReplicas = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "default.replicas");
if (partition == 0) {
+ if(StringUtils.isNotEmpty(defaultPartitions)){
+ partition=Integer.parseInt(defaultPartitions);
+ }
+ else{
partition = 1;
+ }
}
final int partitions = partition;
int replica = topicBean.getReplicationCount();
if (replica == 0) {
+ if(StringUtils.isNotEmpty(defaultReplicas)){
+ replica=Integer.parseInt(defaultReplicas);
+ }
+ else{
replica = 1;
+ }
}
final int replicas = replica;
boolean transactionEnabled = topicBean.isTransactionEnabled();
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java
index e5d3233..7cbdf79 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java
@@ -141,6 +141,10 @@ public class TopicServiceImplTest {
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
when(dmaapContext.getRequest()).thenReturn(httpServReq);
@@ -165,7 +169,11 @@ public class TopicServiceImplTest {
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
-
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
+
when(dmaapContext.getRequest()).thenReturn(httpServReq);
when(dmaapContext.getResponse()).thenReturn(httpServRes);
@@ -208,6 +216,10 @@ public class TopicServiceImplTest {
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("Authorization")).thenReturn(null);
when(dmaapContext.getRequest()).thenReturn(httpServReq);
@@ -232,6 +244,10 @@ public class TopicServiceImplTest {
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("Authorization")).thenReturn("Authorization");
when(dmaapContext.getRequest()).thenReturn(httpServReq);