aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordglFromAtt <dgl@research.att.com>2018-05-24 04:29:30 -0400
committerdglFromAtt <dgl@research.att.com>2018-05-24 04:29:43 -0400
commitad29261e05ff057134d48b7d6a99da1cd07849e0 (patch)
tree9d10eb2c81d84382106664bcef7fcd791c56e770
parent9308e9cbaa6c20a0b171b6524a834ef36feaa9f0 (diff)
Changes for configurable kafka ports
Change-Id: I6b646785584aea75809aa2dae4a36e701e313058 Signed-off-by: dglFromAtt <dgl@research.att.com> Issue-ID: DMAAP-506
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java117
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java6
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java5
-rw-r--r--src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java4
-rw-r--r--version.properties2
6 files changed, 121 insertions, 15 deletions
diff --git a/pom.xml b/pom.xml
index 34cce77..a22096d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -344,7 +344,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jettyVersion>9.3.9.v20160517</jettyVersion>
<eelf.version>0.0.1</eelf.version>
- <artifact.version>1.0.7-SNAPSHOT</artifact.version>
+ <artifact.version>1.0.8-SNAPSHOT</artifact.version>
<!-- SONAR -->
<jacoco.version>0.7.7.201606060606</jacoco.version>
<sonar-jacoco-listeners.version>3.2</sonar-jacoco-listeners.version>
diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java b/src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java
index a6827a9..166fc21 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/model/MR_Cluster.java
@@ -22,6 +22,7 @@ package org.onap.dmaap.dbcapi.model;
import javax.xml.bind.annotation.XmlRootElement;
+import org.onap.dmaap.dbcapi.util.DmaapConfig;
import org.onap.dmaap.dbcapi.util.DmaapTimestamp;
@@ -34,32 +35,53 @@ public class MR_Cluster extends DmaapObject {
private DmaapTimestamp lastMod;
private String topicProtocol;
private String topicPort;
+ private String replicationGroup;
+ private String sourceReplicationPort;
+ private String targetReplicationPort;
// TODO: make this a system property
- private static String defaultTopicProtocol = "https";
- private static String defaultTopicPort = "3905";
+ private static String defaultTopicProtocol;
+ private static String defaultTopicPort;
+ private static String defaultReplicationGroup;
+ private static String defaultSourceReplicationPort;
+ private static String defaultTargetReplicationPort;
-
+ private static void setDefaults() {
+ boolean been_here = false;
+ if ( been_here ) {
+ return;
+ }
+ DmaapConfig dc = (DmaapConfig)DmaapConfig.getConfig();
+ defaultTopicProtocol = dc.getProperty("MR.TopicProtocol", "https");
+ defaultTopicPort = dc.getProperty( "MR.TopicPort", "3905");
+ defaultReplicationGroup = dc.getProperty( "MR.ReplicationGroup", "" );
+ defaultSourceReplicationPort = dc.getProperty( "MR.SourceReplicationPort", "2181");
+ defaultTargetReplicationPort = dc.getProperty( "MR.TargetReplicationPort", "9092");
+ been_here = true;
+ }
public MR_Cluster() {
+ setDefaults();
this.topicProtocol = defaultTopicProtocol;
this.topicPort = defaultTopicPort;
+ this.replicationGroup = null;
+ this.sourceReplicationPort = defaultSourceReplicationPort;
+ this.targetReplicationPort = defaultTargetReplicationPort;
this.lastMod = new DmaapTimestamp();
this.lastMod.mark();
debugLogger.debug( "MR_Cluster constructor " + this.lastMod );
}
-
-
// new style constructor
public MR_Cluster( String dLN,
String f,
String prot,
- String port ) {
+ String port) {
+ setDefaults();
this.dcaeLocationName = dLN;
this.fqdn = f;
@@ -73,13 +95,59 @@ public class MR_Cluster extends DmaapObject {
} else {
this.topicPort = port;
}
+
+ this.replicationGroup = defaultReplicationGroup;
+ this.sourceReplicationPort = defaultSourceReplicationPort;
+ this.targetReplicationPort = defaultTargetReplicationPort;
+
+ this.lastMod = new DmaapTimestamp();
+ this.lastMod.mark();
-
+ debugLogger.debug( "MR_Cluster constructor w initialization complete" + this.lastMod.getVal() );
+ }
+
+ public MR_Cluster( String dLN,
+ String f,
+ String prot,
+ String port,
+ String repGroup,
+ String sourceRepPort,
+ String targetRepPort ) {
+ setDefaults();
+ this.dcaeLocationName = dLN;
+ this.fqdn = f;
+
+ if ( prot == null || prot.isEmpty() ) {
+ this.topicProtocol = defaultTopicProtocol;
+ } else {
+ this.topicProtocol = prot;
+ }
+ if ( port == null || port.isEmpty() ) {
+ this.topicPort = defaultTopicPort;
+ } else {
+ this.topicPort = port;
+ }
+ if ( repGroup == null || repGroup.isEmpty() ) {
+ this.replicationGroup = defaultReplicationGroup;
+ } else {
+ this.replicationGroup = repGroup;
+ }
+ if ( sourceRepPort == null || sourceRepPort.isEmpty()) {
+ this.sourceReplicationPort = defaultSourceReplicationPort;
+ } else {
+ this.sourceReplicationPort = sourceRepPort;
+ }
+ if ( targetRepPort == null || targetRepPort.isEmpty()) {
+ this.targetReplicationPort = defaultTargetReplicationPort;
+ } else {
+ this.targetReplicationPort = targetRepPort;
+ }
+
this.lastMod = new DmaapTimestamp();
this.lastMod.mark();
debugLogger.debug( "MR_Cluster constructor w initialization complete" + this.lastMod.getVal() );
-}
+ }
public String getDcaeLocationName() {
return dcaeLocationName;
}
@@ -113,6 +181,39 @@ public class MR_Cluster extends DmaapObject {
this.topicPort = topicPort;
}
+ public String getReplicationGroup() {
+ return replicationGroup;
+ }
+
+ public void setReplicationGroup(String replicationGroup) {
+ this.replicationGroup = replicationGroup;
+ }
+
+
+
+
+ public String getSourceReplicationPort() {
+ return sourceReplicationPort;
+ }
+
+
+
+ public void setSourceReplicationPort(String sourceReplicationPort) {
+ this.sourceReplicationPort = sourceReplicationPort;
+ }
+
+
+
+ public String getTargetReplicationPort() {
+ return targetReplicationPort;
+ }
+
+
+
+ public void setTargetReplicationPort(String targetReplicationPort) {
+ this.targetReplicationPort = targetReplicationPort;
+ }
+
public String genTopicURL(String overideFqdn, String topic) {
diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
index b1a2d3c..1e381b8 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
@@ -137,11 +137,11 @@ public class MirrorMaker extends DmaapObject {
}
}
*/
- public String createMirrorMaker() {
+ public String createMirrorMaker( String consumerPort, String producerPort ) {
StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"createMirrorMaker\": {" );
str.append( " \"name\": \"" + this.getMmName() + "\", " );
- str.append( " \"consumer\": \"" + this.sourceCluster + ":2181\", " );
- str.append( " \"producer\": \"" + this.targetCluster + ":9092\" ");
+ str.append( " \"consumer\": \"" + this.sourceCluster + ":" + consumerPort + "\", " );
+ str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\" ");
str.append( " } }" );
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
index 29010b6..a73d981 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
@@ -64,6 +64,9 @@ public class MirrorMakerService extends BaseLoggingClass {
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
String provUser = p.getProperty("MM.ProvUserMechId");
String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+ String defaultProducerPort = p.getProperty( "MM.KafkaProducerPort", "9092");
+ String defaultConsumerPort = p.getProperty( "MM.KafkaConsumerPort", "2181");
+
prov = new MrTopicConnection( provUser, provUserPwd );
String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
@@ -76,7 +79,7 @@ public class MirrorMakerService extends BaseLoggingClass {
// but only send 1 message so MM Agents can read it relying on kafka delivery
for( MR_Cluster central: clusters.getCentralClusters() ) {
prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- ApiError resp = prov.doPostMessage(mm.createMirrorMaker());
+ ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
if ( ! resp.is2xx() ) {
errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
diff --git a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
index ba4b028..8b632ed 100644
--- a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
+++ b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
@@ -79,6 +79,8 @@ public class MirrorMakerTest {
String f = "org.onap.interestingTopic";
String c1 = "cluster1.onap.org";
String c2 = "cluster2.onap.org";
+ String p1 = "9092";
+ String p2 = "2081";
MirrorMaker t = new MirrorMaker( c1, c2 );
String m = t.getMmName();
@@ -94,7 +96,7 @@ public class MirrorMakerTest {
s = t.updateWhiteList();
- s = t.createMirrorMaker();
+ s = t.createMirrorMaker(p1, p2);
t.delVector( f, c1, c2 );
diff --git a/version.properties b/version.properties
index 618e64c..f4cdd52 100644
--- a/version.properties
+++ b/version.properties
@@ -27,7 +27,7 @@
major=1
minor=0
-patch=7
+patch=8
base_version=${major}.${minor}.${patch}
# Release must be completed with git revision # in Jenkins