summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java3
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java15
-rw-r--r--src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java18
-rw-r--r--version.properties2
5 files changed, 36 insertions, 4 deletions
diff --git a/pom.xml b/pom.xml
index c213101..cb47037 100644
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.onap.dmaap.messagerouter.msgrtr</groupId>
<artifactId>msgrtr</artifactId>
- <version>1.1.14-SNAPSHOT</version>
+ <version>1.1.15-SNAPSHOT</version>
<packaging>jar</packaging>
<name>dmaap-messagerouter-msgrtr</name>
<description>Message Router - Restful interface built for kafka</description>
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
index 2ec323e..93374fb 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/Kafka011Consumer.java
@@ -34,6 +34,7 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -85,7 +86,7 @@ public class Kafka011Consumer implements Consumer {
String consumerTimeOut = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
"consumer.timeout");
- if (null != consumerTimeOut) {
+ if (StringUtils.isNotEmpty(consumerTimeOut)) {
consumerPollTimeOut = Integer.parseInt(consumerTimeOut);
}
synchronized (kConsumer) {
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
index f2ba222..626828b 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
@@ -26,6 +26,7 @@ package org.onap.dmaap.dmf.mr.service.impl;
import java.io.IOException;
+import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpStatus;
import org.json.JSONArray;
import org.json.JSONException;
@@ -281,14 +282,28 @@ public class TopicServiceImpl implements TopicService {
final String desc = topicBean.getTopicDescription();
int partition = topicBean.getPartitionCount();
// int replica = topicBean.getReplicationCount();
+ String defaultPartitions = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "default.partitions");
+ String defaultReplicas = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "default.replicas");
if (partition == 0) {
+ if(StringUtils.isNotEmpty(defaultPartitions)){
+ partition=Integer.parseInt(defaultPartitions);
+ }
+ else{
partition = 1;
+ }
}
final int partitions = partition;
int replica = topicBean.getReplicationCount();
if (replica == 0) {
+ if(StringUtils.isNotEmpty(defaultReplicas)){
+ replica=Integer.parseInt(defaultReplicas);
+ }
+ else{
replica = 1;
+ }
}
final int replicas = replica;
boolean transactionEnabled = topicBean.isTransactionEnabled();
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java
index e5d3233..7cbdf79 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java
@@ -141,6 +141,10 @@ public class TopicServiceImplTest {
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
when(dmaapContext.getRequest()).thenReturn(httpServReq);
@@ -165,7 +169,11 @@ public class TopicServiceImplTest {
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
-
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
+
when(dmaapContext.getRequest()).thenReturn(httpServReq);
when(dmaapContext.getResponse()).thenReturn(httpServRes);
@@ -208,6 +216,10 @@ public class TopicServiceImplTest {
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("Authorization")).thenReturn(null);
when(dmaapContext.getRequest()).thenReturn(httpServReq);
@@ -232,6 +244,10 @@ public class TopicServiceImplTest {
when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
.thenReturn("enfTopicName");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
+ .thenReturn("1");
+ when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
+ .thenReturn("1");
when(httpServReq.getHeader("Authorization")).thenReturn("Authorization");
when(dmaapContext.getRequest()).thenReturn(httpServReq);
diff --git a/version.properties b/version.properties
index c9b51c7..aabaad9 100644
--- a/version.properties
+++ b/version.properties
@@ -27,7 +27,7 @@
major=1
minor=1
-patch=14
+patch=15
base_version=${major}.${minor}.${patch}