From 30f5f28cdd17eb94ca70db6dbf027cc330bb59e7 Mon Sep 17 00:00:00 2001 From: dglFromAtt Date: Wed, 26 Dec 2018 14:55:03 -0500 Subject: Support kafka attributes Change-Id: I22a3183aeb0ed2d7553c7321e04e1f708813e2ce Signed-off-by: dglFromAtt Issue-ID: DMAAP-920 --- .../java/org/onap/dmaap/dbcapi/model/Topic.java | 53 +++++++++++++++++----- .../onap/dmaap/dbcapi/resources/TopicResource.java | 15 +++++- .../onap/dmaap/dbcapi/service/DmaapService.java | 6 ++- .../onap/dmaap/dbcapi/service/TopicService.java | 3 +- 4 files changed, 63 insertions(+), 14 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/Topic.java b/src/main/java/org/onap/dmaap/dbcapi/model/Topic.java index 6364382..c2f278d 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/model/Topic.java +++ b/src/main/java/org/onap/dmaap/dbcapi/model/Topic.java @@ -31,6 +31,7 @@ import javax.xml.bind.annotation.XmlRootElement; import org.onap.dmaap.dbcapi.util.DmaapConfig; import org.onap.dmaap.dbcapi.service.DmaapService; +import org.onap.dmaap.dbcapi.service.TopicService; @XmlRootElement @@ -45,14 +46,20 @@ public class Topic extends DmaapObject { private ReplicationType replicationCase; private String globalMrURL; // optional: URL of global MR to replicate to/from private FqtnType fqtnStyle; - private String version; + private String version; + private String partitionCount; + private String replicationCount; + private ArrayList clients; private static Dmaap dmaap = new DmaapService().getDmaap(); - + + private static String defaultPartitionCount; + private static String defaultReplicationCount; + // during unit testing, discovered that presence of dots in some values // creates an unplanned topic namespace as we compose the FQTN. // this may create sensitivity (i.e. 403) for subsequent creation of AAF perms, so best to not allow it @@ -118,11 +125,24 @@ public class Topic extends DmaapObject { //this.dcaeLocationName = dcaeLocationName; this.tnxEnabled = tnxEnabled; this.owner = owner; + this.init(); this.setLastMod(); + logger.debug( "Topic constructor w args " + this.getLastMod() ); + } + + public Topic init() { + DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); + + defaultPartitionCount = p.getProperty( "MR.partitionCount", "2"); + defaultReplicationCount = p.getProperty( "MR.replicationCount", "1"); + this.setStatus( DmaapObject_Status.NEW ); this.replicationCase = ReplicationType.Validator("none"); this.fqtnStyle = FqtnType.Validator("none"); - logger.debug( "Topic constructor " + this.getLastMod() ); + this.setPartitionCount( defaultPartitionCount ); + this.setReplicationCount( defaultReplicationCount ); + + return this; } // expects a String in JSON format, with known fields to populate Topic object @@ -144,6 +164,7 @@ public class Topic extends DmaapObject { this.setStatus( (String) jsonObj.get( "status" ) ); this.setReplicationCase( ReplicationType.Validator( (String) jsonObj.get( "replicationCase" ) )); this.setFqtnStyle( FqtnType.Validator( (String) jsonObj.get( "fqtnStyle" ) ) ); + this.setPartitionCount( (String) jsonObj.get("partitionCount")); } public String getFqtn() { @@ -177,6 +198,18 @@ public class Topic extends DmaapObject { public void setOwner(String owner) { this.owner = owner; } + public String getPartitionCount() { + return partitionCount; + } + public void setPartitionCount(String partitions) { + this.partitionCount = partitions; + } + public String getReplicationCount() { + return replicationCount; + } + public void setReplicationCount(String replicationCount) { + this.replicationCount = replicationCount; + } public void setClients(ArrayList clients) { @@ -212,13 +245,6 @@ public class Topic extends DmaapObject { return replicationCase; } - - - /* - public void setReplicationCase(String val) { - this.replicationCase = ReplicationType.Validator(val); - } - */ public void setReplicationCase(ReplicationType t) { this.replicationCase = t; @@ -262,7 +288,12 @@ public class Topic extends DmaapObject { str.append( this.getFqtn() ); str.append( "\", \"topicDescription\": \""); str.append( this.getTopicDescription()); - str.append( "\", \"partitionCount\": \"2\", \"replicationCount\": \"1\" } "); + str.append( "\", \"partitionCount\": \""); + str.append( this.getPartitionCount()); + str.append( "\", \"replicationCount\": \""); + str.append( this.getReplicationCount()); + str.append( "\" } "); + logger.info( str.toString() ); return str.toString(); } diff --git a/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java b/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java index 8ed2558..dbeea63 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java +++ b/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java @@ -58,11 +58,16 @@ import org.onap.dmaap.dbcapi.util.DmaapConfig; @Authorization public class TopicResource extends BaseLoggingClass { private static FqtnType defaultTopicStyle; + private static String defaultPartitionCount; + private static String defaultReplicationCount; TopicService mr_topicService = new TopicService(); public TopicResource() { DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - defaultTopicStyle = FqtnType.Validator( p.getProperty("MR.topicStyle", "FQTN_LEGACY_FORMAT")); + defaultTopicStyle = FqtnType.Validator( p.getProperty("MR.topicStyle", "FQTN_LEGACY_FORMAT")); + defaultPartitionCount = p.getProperty( "MR.partitionCount", "2"); + defaultReplicationCount = p.getProperty( "MR.replicationCount", "1"); + logger.info( "Setting defaultTopicStyle=" + defaultTopicStyle ); } @@ -119,6 +124,14 @@ public class TopicResource extends BaseLoggingClass { logger.info( "setting defaultTopicStyle=" + defaultTopicStyle + " for topic " + topic.getTopicName() ); topic.setFqtnStyle( defaultTopicStyle ); } + String pc = topic.getPartitionCount(); + if ( pc == null ) { + topic.setPartitionCount(defaultPartitionCount); + } + String rc = topic.getReplicationCount(); + if ( rc == null ) { + topic.setReplicationCount(defaultReplicationCount); + } topic.setLastMod(); Boolean flag = false; if (useExisting != null) { diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/DmaapService.java b/src/main/java/org/onap/dmaap/dbcapi/service/DmaapService.java index 5aae1d4..5c2c8a3 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/DmaapService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/DmaapService.java @@ -43,6 +43,7 @@ import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum; import org.onap.dmaap.dbcapi.model.ApiError; import org.onap.dmaap.dbcapi.model.Dmaap; import org.onap.dmaap.dbcapi.model.MR_Client; +import org.onap.dmaap.dbcapi.model.ReplicationType; import org.onap.dmaap.dbcapi.model.Topic; import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; import org.onap.dmaap.dbcapi.util.DmaapConfig; @@ -136,6 +137,7 @@ public class DmaapService extends BaseLoggingClass { if ( ! dmaap.isStatusValid() || ! nd.getDmaapName().equals(dmaap.getDmaapName()) || dmaap.getVersion().equals( "0") ) { nd.setLastMod(); dmaapholder.update(nd); //need to set this so the following perms will pick up any new vals. + dcaeTopicNs = dmaapholder.get().getTopicNsRoot(); ApiPolicy apiPolicy = new ApiPolicy(); if ( apiPolicy.getUseAuthClass()) { ApiPerms p = new ApiPerms(); @@ -279,12 +281,14 @@ public class DmaapService extends BaseLoggingClass { clients.add( nClient ); // initialize Topic - Topic mmaTopic = new Topic(); + Topic mmaTopic = new Topic().init(); mmaTopic.setTopicName(dmaap.getBridgeAdminTopic()); mmaTopic.setClients(clients); mmaTopic.setOwner("BusController"); mmaTopic.setTopicDescription("topic reserved for MirrorMaker Administration"); mmaTopic.setTnxEnabled("false"); + mmaTopic.setPartitionCount("1"); // a single partition should guarantee message order + ApiError err = new ApiError(); TopicService svc = new TopicService(); diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index 56ed967..49966d7 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -251,10 +251,11 @@ public class TopicService extends BaseLoggingClass { String mmAgentRole = p.getProperty("MM.AgentRole"); String[] Roles = { mmProvRole, mmAgentRole }; String[] actions = { "view", "pub", "sub" }; - Topic bridgeAdminTopic = new Topic(); + Topic bridgeAdminTopic = new Topic().init(); bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() ); bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning"); bridgeAdminTopic.setOwner( "DBC" ); + ArrayList clients = new ArrayList(); for( String role: Roles ) { MR_Client client = new MR_Client(); -- cgit 1.2.3-korg