aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordglFromAtt <dgl@research.att.com>2018-11-10 13:29:08 -0500
committerdglFromAtt <dgl@research.att.com>2018-11-10 13:29:13 -0500
commit1360b9df89a422d51ef40644ea5f9cf52cb84c6f (patch)
treee808d83fd9b9eb925367a1cdc05f5c109c9394a1
parent0e39c2d9a88a26693de1cd522766df5894917b3f (diff)
Limit number of topics per mmagent whitelist
Change-Id: Id91106072aa2cc843414db78d568ccd1ecd69657 Signed-off-by: dglFromAtt <dgl@research.att.com> Issue-ID: DMAAP-880
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java4
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java9
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java22
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java158
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java60
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java8
-rw-r--r--src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java2
7 files changed, 181 insertions, 82 deletions
diff --git a/src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java b/src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java
index 59d610c..5ec55d4 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/database/DBFieldHandler.java
@@ -24,6 +24,8 @@ import java.lang.reflect.*;
import java.sql.*;
import java.util.*;
+import org.onap.dmaap.dbcapi.model.*;
+
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
@@ -47,7 +49,7 @@ public class DBFieldHandler {
try {
objget = c.getMethod("is" + camelcase);
} catch (Exception e) {
- errorLogger.error("Error", e);
+ errorLogger.warn("No 'is' method for " + c.getName() + " so trying 'get' method");
objget = c.getMethod("get" + camelcase);
}
objset = c.getMethod("set" + camelcase, objget.getReturnType());
diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java b/src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java
index a7041d0..7de42f1 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/model/BrTopic.java
@@ -30,6 +30,7 @@ public class BrTopic {
private String brSource;
private String brTarget;
+ private String mmAgentName;
private int topicCount;
// no-op constructor used by framework
@@ -60,6 +61,14 @@ public class BrTopic {
this.topicCount = topicCount;
}
+ public String getMmAgentName() {
+ return mmAgentName;
+ }
+
+ public void setMmAgentName(String mmAgentName) {
+ this.mmAgentName = mmAgentName;
+ }
+
}
diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
index 6447123..e693afe 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
@@ -29,7 +29,7 @@ import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.service.MirrorMakerService;
public class MirrorMaker extends DmaapObject {
- static final Logger logger = Logger.getLogger(MirrorMaker.class);
+
private String sourceCluster;
private String targetCluster;
@@ -41,8 +41,20 @@ public class MirrorMaker extends DmaapObject {
public MirrorMaker(){
}
-
+ public MirrorMaker(String source, String target, int i) {
+ initMM( source, target );
+ // original mm names did not have any index, so leave off index 0 for
+ // backwards compatibility
+ if ( i != 0 ) {
+ String n = this.getMmName() + "_" + i;
+ this.setMmName(n);
+ }
+ }
public MirrorMaker(String source, String target) {
+ initMM( source, target );
+ }
+
+ private void initMM(String source, String target) {
sourceCluster = source;
targetCluster = target;
mmName = genKey(source, target);
@@ -72,7 +84,7 @@ public class MirrorMaker extends DmaapObject {
}
}
*/
- public String updateWhiteList() {
+ public String getWhitelistUpdateJSON() {
StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"updateWhiteList\": {" );
str.append( " \"name\": \"" + this.getMmName() + "\", \"whitelist\": \"" );
int numTargets = 0;
@@ -109,9 +121,9 @@ public class MirrorMaker extends DmaapObject {
StringBuilder str = new StringBuilder( "{ \"messageID\": \"" + MirrorMakerService.genTransactionId() + "\", \"createMirrorMaker\": {" );
str.append( " \"name\": \"" + this.getMmName() + "\", " );
str.append( " \"consumer\": \"" + this.sourceCluster + ":" + consumerPort + "\", " );
- str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\" ");
+ str.append( " \"producer\": \"" + this.targetCluster + ":" + producerPort + "\", ");
- str.append( " } }" );
+ str.append( " \"numStreams\": \"10\" } }" );
return str.toString();
}
diff --git a/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java b/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java
index aab1cac..f1466ee 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java
@@ -56,108 +56,126 @@ public class BridgeResource extends BaseLoggingClass {
@GET
@ApiOperation( value = "return BrTopic details",
- notes = "Returns array of `BrTopic` objects. If source and target query params are specified, only report on that bridge. If detail param is true, list topics names, else just a count is returned",
+ notes = "Returns array of `BrTopic` objects. If source and target query params are specified, only report on that bridge. "
+ + "If detail param is true, list topics names, else just a count is returned.",
response = BrTopic.class)
@ApiResponses( value = {
@ApiResponse( code = 200, message = "Success", response = Dmaap.class),
@ApiResponse( code = 400, message = "Error", response = ApiError.class )
})
- public Response getBridgedTopics(@QueryParam("source") String source,
- @QueryParam("target") String target,
+ public Response getBridgedTopics(@QueryParam("mmagent") String mmagent,
@QueryParam("detail") Boolean detailFlag ){
ApiService check = new ApiService();
+
+ if ( mmagent == null ) {
+ return check.success(getMMcounts(Boolean.TRUE.equals(detailFlag)));
+
+ }
+ logger.info( "getBridgeTopics():" + " mmagent=" + mmagent);
if ( ! Boolean.TRUE.equals(detailFlag)) {
BrTopic brTopic = new BrTopic();
- logger.info( "getBridgeTopics():" + " source=" + source + ", target=" + target);
-
- if (source != null && target != null) { // get topics between 2 bridged locations
- brTopic.setBrSource(source);
- brTopic.setBrTarget(target);
- MirrorMaker mm = mmService.getMirrorMaker(source, target);
- if ( mm != null ) {
- brTopic.setTopicCount( mm.getTopicCount() );
- }
-
- logger.info( "topicCount [2 locations]: " + brTopic.getTopicCount() );
- }
- else if (source == null && target == null ) {
- List<String> mmList = mmService.getAllMirrorMakers();
- brTopic.setBrSource("all");
- brTopic.setBrTarget("all");
- int totCnt = 0;
- for( String key: mmList ) {
- int mCnt = 0;
- MirrorMaker mm = mmService.getMirrorMaker(key);
- if ( mm != null ) {
- mCnt = mm.getTopicCount();
- }
- logger.info( "Count for "+ key + ": " + mCnt);
- totCnt += mCnt;
- }
-
- logger.info( "topicCount [all locations]: " + totCnt );
- brTopic.setTopicCount(totCnt);
-
- }
- else {
-
- logger.error( "source or target is missing");
- check.setCode(Status.BAD_REQUEST.getStatusCode());
- check.setMessage("Either both source and target or neither must be provided");
- return check.error();
- }
- return check.success(brTopic);
- } else {
-
+ // get topics between 2 bridged locations
+
+ MirrorMaker mm = mmService.getMirrorMaker(mmagent);
+ if ( mm == null ) {
+ return check.notFound();
+ }
+
+ brTopic.setTopicCount( mm.getTopicCount() );
+ brTopic.setBrSource( mm.getSourceCluster());
+ brTopic.setBrTarget( mm.getTargetCluster());
+ brTopic.setMmAgentName(mm.getMmName());
- logger.info( "getBridgeTopics() detail:" + " source=" + source + ", target=" + target);
-
- if (source != null && target != null) { // get topics between 2 bridged locations
-
- MirrorMaker mm = mmService.getMirrorMaker(source, target);
- if ( mm == null ) {
- return check.notFound();
- }
-
- return check.success(mm);
- }
+ logger.info( "topicCount [2 locations]: " + brTopic.getTopicCount() );
+
+ return check.success(brTopic);
+ } else {
+ logger.info( "getBridgeTopics() detail:" + " mmagent=" + mmagent);
+ // get topics between 2 bridged locations
+ MirrorMaker mm = mmService.getMirrorMaker(mmagent);
+ if ( mm == null ) {
+ return check.notFound();
+ }
- else {
+ return check.success(mm);
+ }
+ }
- logger.error( "source and target are required when detail=true");
- check.setCode(Status.BAD_REQUEST.getStatusCode());
- check.setMessage("source and target are required when detail=true");
- return check.error();
+ private BrTopic[] getMMcounts( Boolean showDetail ) {
+
+ List<String> mmList = mmService.getAllMirrorMakers();
+ int s = 1;
+ if ( showDetail ) {
+ s = mmList.size() + 1;
+ }
+ BrTopic[] brTopic = new BrTopic[s];
+
+ int totCnt = 0;
+ s = 0;
+ for( String key: mmList ) {
+ int mCnt = 0;
+ MirrorMaker mm = mmService.getMirrorMaker(key);
+ if ( mm != null ) {
+ mCnt = mm.getTopicCount();
+ }
+ logger.info( "Count for "+ key + ": " + mCnt);
+ totCnt += mCnt;
+ if (showDetail) {
+ brTopic[s] = new BrTopic();
+ brTopic[s].setBrSource( mm.getSourceCluster());
+ brTopic[s].setBrTarget(mm.getTargetCluster());
+ brTopic[s].setMmAgentName(mm.getMmName());
+ brTopic[s].setTopicCount(mm.getTopicCount());
+ s++;
}
}
+
+ logger.info( "topicCount [all locations]: " + totCnt );
+ brTopic[s] = new BrTopic();
+ brTopic[s].setBrSource("all");
+ brTopic[s].setBrTarget("all");
+ brTopic[s].setMmAgentName("n/a");
+ brTopic[s].setTopicCount(totCnt);
+ return brTopic;
}
@PUT
@ApiOperation( value = "update MirrorMaker details",
- notes = "replace the topic list for a specific Bridge. Use JSON Body for value to replace whitelist, but if refreshFlag param is true, simply refresh using existing whitelist",
+ notes = "replace the topic list for a specific Bridge. Use JSON Body for value to replace whitelist, "
+ + "but if refreshFlag param is true, simply refresh using existing whitelist."
+ + "If split param is true, spread whitelist over smaller mmagents.",
response = MirrorMaker.class)
@ApiResponses( value = {
@ApiResponse( code = 200, message = "Success", response = Dmaap.class),
@ApiResponse( code = 400, message = "Error", response = ApiError.class )
})
- public Response putBridgedTopics(@QueryParam("source") String source,
- @QueryParam("target") String target,
+ public Response putBridgedTopics(@QueryParam("mmagent") String mmagent,
@QueryParam("refresh") Boolean refreshFlag,
+ @QueryParam("split") Boolean splitFlag,
MirrorMaker newBridge ){
ApiService check = new ApiService();
- logger.info( "putBridgeTopics() detail:" + " source=" + source + ", target=" + target);
+ logger.info( "putBridgeTopics() mmagent:" + mmagent );
- if (source != null && target != null) { // get topics between 2 bridged locations
+ if ( mmagent != null ) { // put topics between 2 bridged locations
- MirrorMaker mm = mmService.getMirrorMaker(source, target);
+ MirrorMaker mm = mmService.getMirrorMaker(mmagent);
if ( mm == null ) {
return check.notFound();
}
- if ( refreshFlag != null && refreshFlag == false ) {
- logger.info( "setting whitelist from message body");
+
+ if ( splitFlag != null && splitFlag == true ) {
+ mm = mmService.splitMM( mm );
+ } else if ( refreshFlag == null || refreshFlag == false ) {
+ logger.info( "setting whitelist from message body containing mmName=" + newBridge.getMmName());
+ if ( ! mmagent.equals(newBridge.getMmName()) ){
+ logger.error( "mmagent query param does not match mmName in body");
+ check.setCode(Status.BAD_REQUEST.getStatusCode());
+ check.setMessage("mmagent query param does not match mmName in body");
+ return check.error();
+ }
mm.setTopics( newBridge.getTopics() );
} else {
logger.info( "refreshing whitelist from memory");
@@ -168,9 +186,9 @@ public class BridgeResource extends BaseLoggingClass {
else {
- logger.error( "source and target are required when detail=true");
+ logger.error( "mmagent is required for PUT");
check.setCode(Status.BAD_REQUEST.getStatusCode());
- check.setMessage("source and target are required when detail=true");
+ check.setMessage("mmagent is required for PUT");
return check.error();
}
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
index 413590f..8acc4f3 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/MirrorMakerService.java
@@ -56,6 +56,7 @@ public class MirrorMakerService extends BaseLoggingClass {
private static String defaultProducerPort;
private static String defaultConsumerPort;
private static String centralFqdn;
+ private int maxTopicsPerMM;
public MirrorMakerService() {
super();
@@ -66,6 +67,7 @@ public class MirrorMakerService extends BaseLoggingClass {
defaultProducerPort = p.getProperty( "MR.SourceReplicationPort", "9092");
defaultConsumerPort = p.getProperty( "MR.TargetReplicationPort", "2181");
centralFqdn = p.getProperty("MR.CentralCname", "notSet");
+ maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
}
// will create a MM on MMagent if needed
@@ -90,7 +92,7 @@ public class MirrorMakerService extends BaseLoggingClass {
mm.setStatus(DmaapObject_Status.INVALID);
} else {
prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
- resp = prov.doPostMessage(mm.updateWhiteList());
+ resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
if ( ! resp.is2xx()) {
errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
mm.setStatus(DmaapObject_Status.INVALID);
@@ -109,6 +111,19 @@ public class MirrorMakerService extends BaseLoggingClass {
mm.setLastMod();
return mirrors.put( mm.getMmName(), mm);
}
+ public MirrorMaker getMirrorMaker( String part1, String part2, int index ) {
+ String targetPart;
+
+ // original mm names did not have any index, so leave off index 0 for
+ // backwards compatibility
+ if ( index == 0 ) {
+ targetPart = part2;
+ } else {
+ targetPart = part2 + "_" + index;
+ }
+ logger.info( "getMirrorMaker using " + part1 + " and " + targetPart );
+ return mirrors.get(MirrorMaker.genKey(part1, targetPart));
+ }
public MirrorMaker getMirrorMaker( String part1, String part2 ) {
logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
return mirrors.get(MirrorMaker.genKey(part1, part2));
@@ -139,5 +154,48 @@ public class MirrorMakerService extends BaseLoggingClass {
return ret;
}
+
+ public MirrorMaker getNextMM( String source, String target ) {
+ int i = 0;
+ MirrorMaker mm = null;
+ while( mm == null ) {
+
+ mm = this.getMirrorMaker( source, target, i);
+ if ( mm == null ) {
+ mm = new MirrorMaker(source, target, i);
+ }
+ if ( mm.getTopicCount() >= maxTopicsPerMM ) {
+ logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics. Moving to next MM");
+ i++;
+ mm = null;
+ }
+ }
+
+
+ return mm;
+ }
+
+ public MirrorMaker splitMM( MirrorMaker orig ) {
+
+ int index = 1;
+ String source = orig.getSourceCluster();
+ String target = orig.getTargetCluster();
+
+
+ ArrayList<String> whitelist = orig.getTopics();
+ while( whitelist.size() > maxTopicsPerMM ) {
+ MirrorMaker mm = this.getNextMM( source, target );
+ int last = whitelist.size() - 1;
+ String topic = whitelist.get(last);
+ whitelist.remove(last);
+ mm.addTopic(topic);
+ this.updateMirrorMaker(mm);
+ }
+
+ orig.setTopics(whitelist);
+
+ return orig;
+
+ }
}
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
index e4bc96c..a633982 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
@@ -66,10 +66,12 @@ public class TopicService extends BaseLoggingClass {
private static String centralCname;
+
public TopicService(){
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
centralCname = p.getProperty("MR.CentralCname");
+
logger.info( "TopicService properties: CentralCname=" + centralCname +
" defaultGlobarlMrHost=" + defaultGlobalMrHost );
@@ -431,10 +433,7 @@ public class TopicService extends BaseLoggingClass {
if ( source != null && target != null ) {
try {
logger.info( "Create a MM from " + source + " to " + target );
- MirrorMaker mm = bridge.getMirrorMaker( source, target);
- if ( mm == null ) {
- mm = new MirrorMaker(source, target);
- }
+ MirrorMaker mm = bridge.getNextMM( source, target);
mm.addTopic(topic.getFqtn());
bridge.updateMirrorMaker(mm);
} catch ( Exception ex ) {
@@ -452,6 +451,7 @@ public class TopicService extends BaseLoggingClass {
}
+
/*
* Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
* This was determined automatically based on presence of edge publishers and central subscribers.
diff --git a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
index 547bfc9..39de2be 100644
--- a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
+++ b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
@@ -92,7 +92,7 @@ public class MirrorMakerTest {
int i = t.getTopicCount();
- String s = t.updateWhiteList();
+ String s = t.getWhitelistUpdateJSON();
s = t.createMirrorMaker(p1, p2);