diff options
Diffstat (limited to 'src/main/java')
11 files changed, 260 insertions, 318 deletions
diff --git a/src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java b/src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java index 9b7c8ff..f0a5582 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java +++ b/src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java @@ -47,40 +47,8 @@ public class DatabaseClass extends BaseLoggingClass { private static long lastTime = 0L; - private static class MirrorVectorHandler implements DBFieldHandler.SqlOp { - public Object get(ResultSet rs, int index) throws Exception { - String val = rs.getString(index); - if (val == null) { - return(null); - } - Set<ReplicationVector> rv = new HashSet<ReplicationVector>(); - for (String s: val.split(",")) { - String[] f = s.split(";"); - if (f.length < 3) { - continue; - } - rv.add(new ReplicationVector(DBFieldHandler.funesc(f[0]), DBFieldHandler.funesc(f[1]), DBFieldHandler.funesc(f[2]))); - } - return(rv); - } - public void set(PreparedStatement ps, int index, Object val) throws Exception { - if (val == null) { - ps.setString(index, null); - return; - } - Set xv = (Set)val; - StringBuffer sb = new StringBuffer(); - String sep = ""; - for (Object o: xv) { - ReplicationVector rv = (ReplicationVector)o; - sb.append(sep).append(DBFieldHandler.fesc(rv.getFqtn())).append(';').append(DBFieldHandler.fesc(rv.getSourceCluster())).append(';').append(DBFieldHandler.fesc(rv.getTargetCluster())); - sep = ","; - } - ps.setString(index, sb.toString()); - } - } - // modified version of MirrorVectorHandler for Topics + private static class MirrorTopicsHandler implements DBFieldHandler.SqlOp { public Object get(ResultSet rs, int index) throws Exception { String val = rs.getString(index); @@ -192,8 +160,7 @@ public class DatabaseClass extends BaseLoggingClass { mr_clusters = new DBMap<MR_Cluster>(MR_Cluster.class, "mr_cluster", "dcae_location_name"); feeds = new DBMap<Feed>(Feed.class, "feed", "feed_id"); TableHandler.setSpecialCase("topic", "replication_case", new TopicReplicationTypeHandler()); - topics = new DBMap<Topic>(Topic.class, "topic", "fqtn"); - //TableHandler.setSpecialCase("mirror_maker", "vectors", new MirrorVectorHandler()); + topics = new DBMap<Topic>(Topic.class, "topic", "fqtn"); TableHandler.setSpecialCase("mirror_maker", "topics", new MirrorTopicsHandler()); mirrors = new DBMap<MirrorMaker>(MirrorMaker.class, "mirror_maker", "mm_name"); } catch (Exception e) { diff --git a/src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java b/src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java index 66539ed..c83b6be 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java +++ b/src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java @@ -20,9 +20,15 @@ package org.onap.dmaap.dbcapi.database; -import java.util.*; import java.lang.reflect.*; -import java.sql.*; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.Vector; class TableHandler<C> { protected ConnectionFactory cf; diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java index 9f6f402..6447123 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java +++ b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java @@ -36,7 +36,6 @@ public class MirrorMaker extends DmaapObject { private String mmName; private ArrayList<String> topics; //re-using this var name for backwards DB compatibility - private Set<ReplicationVector> vectors; public MirrorMaker(){ @@ -47,7 +46,6 @@ public class MirrorMaker extends DmaapObject { sourceCluster = source; targetCluster = target; mmName = genKey(source, target); - vectors = new HashSet<ReplicationVector>(); topics = new ArrayList<String>(); } @@ -61,36 +59,6 @@ public class MirrorMaker extends DmaapObject { } - public void addVector( String fqtn, String source, String target ) { - logger.info( "addVector: fqtn=" + fqtn + " source=" + source + " target=" + target ); - if ( ! sourceCluster.equals( source ) ){ - errorLogger.error( DmaapbcLogMessageEnum.MM_CIRCULAR_REF, source, sourceCluster ); - } - vectors.add(new ReplicationVector( fqtn, source, target )); - } - - public void delVector( String fqtn, String source, String target ) { - vectors.remove(new ReplicationVector( fqtn, source, target)); - } - - - - public String toJSON() { - StringBuilder str = new StringBuilder( "{ \"source\": " + sourceCluster + ",\"topics\": [" ); - int numTargets = 0; - for (ReplicationVector rv: vectors) { - if ( numTargets > 0 ) { - str.append( ","); - } - str.append( " \"target\": " + rv.getTargetCluster() + ", \"topic\": " + rv.getFqtn()); - numTargets++; - } - str.append( "] }" ); - - return str.toString(); - } - - // returns the JSON for MM message containing which Topics to replicate /* * example: @@ -166,18 +134,11 @@ public class MirrorMaker extends DmaapObject { } - public Set<ReplicationVector> getVectors() { - return vectors; - } - - public void setVectors(Set<ReplicationVector> vectors) { - this.vectors = vectors; - } public ArrayList<String> getTopics() { return topics; } - //public void setVectors(Set<ReplicationVector> vectors) { + public void setTopics(ArrayList<String> topics) { this.topics = topics; } diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java b/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java index 49b93a6..5d5b6c6 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java +++ b/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java @@ -34,7 +34,13 @@ public enum ReplicationType { REPLICATION_CENTRAL_TO_EDGE(20), REPLICATION_CENTRAL_TO_GLOBAL(21), REPLICATION_GLOBAL_TO_CENTRAL(30), - REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE(120); + REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE(120), + REPLICATION_EDGE_TO_FQDN(40), + REPLICATION_FQDN_TO_EDGE(41), + REPLICATION_FQDN_TO_GLOBAL(50), + REPLICATION_GLOBAL_TO_FQDN(51), + REPLICATION_EDGE_TO_FQDN_TO_GLOBAL(130), + REPLICATION_GLOBAL_TO_FQDN_TO_EDGE (140); private int value; private static Map map = new HashMap<>(); @@ -69,10 +75,30 @@ public enum ReplicationType { } public boolean involvesGlobal() { - if ( this.compareTo(REPLICATION_CENTRAL_TO_GLOBAL) == 0 || - this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL) == 0 || - this.compareTo(REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL) == 0 || - this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE) == 0) { + + + if ( ( this.compareTo(REPLICATION_CENTRAL_TO_GLOBAL) == 0 ) || + ( this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL) == 0 ) || + ( this.compareTo(REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL) == 0 ) || + ( this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE) == 0 ) || + ( this.compareTo(REPLICATION_EDGE_TO_FQDN_TO_GLOBAL) == 0 ) || + ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN_TO_EDGE) == 0 ) || + ( this.compareTo(REPLICATION_FQDN_TO_GLOBAL) == 0 ) || + ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN) == 0 ) ) { + return true; + } + return false; + } + + public boolean involvesFQDN() { + if ( + ( this.compareTo(REPLICATION_EDGE_TO_FQDN) == 0 ) || + ( this.compareTo(REPLICATION_EDGE_TO_FQDN_TO_GLOBAL) == 0 ) || + ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN_TO_EDGE) == 0 ) || + ( this.compareTo(REPLICATION_FQDN_TO_GLOBAL) == 0 ) || + ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN) == 0 ) || + ( this.compareTo(REPLICATION_FQDN_TO_EDGE) == 0 ) + ) { return true; } return false; diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java b/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java deleted file mode 100644 index add3998..0000000 --- a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java +++ /dev/null @@ -1,100 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dmaap.dbcapi.model; - -public class ReplicationVector { - - public enum ReplicationVector_Status { - EMPTY, - NEW, - STAGED, - VALID, - INVALID, - INVALID_DUP, - DELETED - } - - String fqtn; - String sourceCluster; - String targetCluster; - ReplicationVector_Status status; - - public ReplicationVector(){ - - } - - public ReplicationVector(String fqtn, String sourceCluster, - String targetCluster) { - super(); - this.fqtn = fqtn; - this.sourceCluster = sourceCluster; - this.targetCluster = targetCluster; - } - - public String getFqtn() { - return fqtn; - } - - public void setFqtn(String fqtn) { - this.fqtn = fqtn; - } - - public String getSourceCluster() { - return sourceCluster; - } - - public void setSourceCluster(String sourceCluster) { - this.sourceCluster = sourceCluster; - } - - public String getTargetCluster() { - return targetCluster; - } - - public void setTargetCluster(String targetCluster) { - this.targetCluster = targetCluster; - } - - public int hashCode() { - StringBuilder tmp = new StringBuilder( this.fqtn ); - tmp.append(this.sourceCluster); - tmp.append(this.targetCluster); - - return tmp.toString().hashCode(); - } - private static boolean xeq(String s1, String s2) { - if (s1 == null) { - return(s2 == null); - } else { - return(s1.equals(s2)); - } - } - public boolean equals(Object o) { - if (o == this) { - return(true); - } - if (!(o instanceof ReplicationVector)) { - return(false); - } - ReplicationVector x = (ReplicationVector)o; - return(xeq(fqtn, x.fqtn) && xeq(sourceCluster, x.sourceCluster) && xeq(targetCluster, x.targetCluster)); - } -} diff --git a/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java b/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java index 1a6310c..aab1cac 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java +++ b/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java @@ -20,37 +20,31 @@ package org.onap.dmaap.dbcapi.resources; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; - import java.util.List; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.PUT; -import javax.ws.rs.HeaderParam; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; import javax.ws.rs.core.Response.Status; -import org.onap.dmaap.dbcapi.authentication.AuthenticationErrorException; import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; -import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum; import org.onap.dmaap.dbcapi.model.ApiError; import org.onap.dmaap.dbcapi.model.BrTopic; -import org.onap.dmaap.dbcapi.model.DcaeLocation; import org.onap.dmaap.dbcapi.model.Dmaap; import org.onap.dmaap.dbcapi.model.MirrorMaker; import org.onap.dmaap.dbcapi.service.ApiService; import org.onap.dmaap.dbcapi.service.MirrorMakerService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; + @Path("/bridge") @Api( value= "bridge", description = "Endpoint for retreiving MR Bridge metrics" ) @Consumes(MediaType.APPLICATION_JSON) @@ -77,7 +71,7 @@ public class BridgeResource extends BaseLoggingClass { BrTopic brTopic = new BrTopic(); logger.info( "getBridgeTopics():" + " source=" + source + ", target=" + target); - // System.out.println("getBridgedTopics() " + "source=" + source + ", target=" + target ); + if (source != null && target != null) { // get topics between 2 bridged locations brTopic.setBrSource(source); brTopic.setBrTarget(target); diff --git a/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java b/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java index 8221813..95f9d33 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java +++ b/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java @@ -120,7 +120,7 @@ public class TopicResource extends BaseLoggingClass { topic.setLastMod(); Topic mrc = mr_topicService.addTopic(topic, check.getErr()); - if ( mrc != null && mrc.isStatusValid() ) { + if ( mrc != null && check.getErr().is2xx() ) { return check.success(Status.CREATED.getStatusCode(), mrc); } return check.error(); diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java index 40f86b6..5bd62cb 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java @@ -57,10 +57,12 @@ public class MR_ClientService extends BaseLoggingClass{ private Map<String, Topic> topics = DatabaseClass.getTopics(); private Map<String, DcaeLocation> locations = DatabaseClass.getDcaeLocations(); private DmaapService dmaap = new DmaapService(); + private String centralCname; public MR_ClientService() { DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - + + centralCname = p.getProperty("MR.CentralCname", "MRcname.not.set"); deleteLevel = Integer.valueOf(p.getProperty("MR.ClientDeleteLevel", "0" )); } @@ -132,12 +134,12 @@ public class MR_ClientService extends BaseLoggingClass{ } String centralFqdn = null; DcaeLocation candidate = locations.get(client.getDcaeLocationName()); - if ( candidate != null && candidate.isCentral() ) { - DmaapConfig p = ( DmaapConfig)DmaapConfig.getConfig(); - centralFqdn = p.getProperty("MR.CentralCname"); - } + MR_Cluster cluster = clusters.get( client.getDcaeLocationName()); - if ( cluster != null ) { + if ( cluster != null && candidate != null ) { + if ( candidate.isCentral() && ! topic.getReplicationCase().involvesFQDN() ) { + centralFqdn = centralCname; + } client.setTopicURL(cluster.genTopicURL(centralFqdn, client.getFqtn())); if ( centralFqdn == null ) { client.setStatus( addTopicToCluster( cluster, topic, err)); @@ -148,8 +150,8 @@ public class MR_ClientService extends BaseLoggingClass{ } else { MR_ClusterService clusters = new MR_ClusterService(); - // in 1610, MM should only exist for edge-to-central - // we use a cname for the central target + // MM should only exist for edge-to-central + // we use a cname for the central target (default resiliency with no replicationGroup set) // but still need to provision topics on all central MRs for( MR_Cluster central: clusters.getCentralClusters() ) { client.setStatus( addTopicToCluster( central, topic, err)); @@ -161,7 +163,7 @@ public class MR_ClientService extends BaseLoggingClass{ } } else { - logger.info( "Client references a dcaeLocation that doesn't exist:" + client.getDcaeLocationName()); + logger.warn( "Client references a dcaeLocation that doesn't exist:" + client.getDcaeLocationName()); client.setStatus( DmaapObject_Status.STAGED); //return null; } diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java index ed57279..e2c661b 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java @@ -21,8 +21,10 @@ package org.onap.dmaap.dbcapi.service; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.ws.rs.core.Response.Status; @@ -32,6 +34,7 @@ import org.onap.dmaap.dbcapi.model.ApiError; import org.onap.dmaap.dbcapi.model.DcaeLocation; import org.onap.dmaap.dbcapi.model.MR_Cluster; import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; +import org.onap.dmaap.dbcapi.service.DcaeLocationService; import org.onap.dmaap.dbcapi.util.DmaapConfig; public class MR_ClusterService extends BaseLoggingClass { @@ -73,6 +76,10 @@ public class MR_ClusterService extends BaseLoggingClass { return null; } + public MR_Cluster getMr_ClusterByLoc( String loc ) { + return mr_clusters.get( loc ); + } + public List<MR_Cluster> getCentralClusters() { DcaeLocationService locations = new DcaeLocationService(); List<MR_Cluster> result = new ArrayList<MR_Cluster>(); @@ -87,6 +94,20 @@ public class MR_ClusterService extends BaseLoggingClass { } return result; } + + // builds the set of unique cluster groups + public Set<String> getGroups() { + Set<String> result = new HashSet<String>(); + for( MR_Cluster c: mr_clusters.values() ) { + try { + result.add(c.getReplicationGroup()); + } catch ( NullPointerException npe ) { + logger.warn( "Failed to add Group for cluster:" + c.getDcaeLocationName() ); + } + } + return result; + } + public MR_Cluster addMr_Cluster( MR_Cluster cluster, ApiError apiError ) { logger.info( "Entry: addMr_Cluster"); diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index eed5022..3943419 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -25,23 +25,25 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import javax.ws.rs.core.Response.Status; import org.onap.dmaap.dbcapi.aaf.AafService; -import org.onap.dmaap.dbcapi.aaf.DmaapPerm; import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType; +import org.onap.dmaap.dbcapi.aaf.DmaapPerm; import org.onap.dmaap.dbcapi.database.DatabaseClass; import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum; import org.onap.dmaap.dbcapi.model.ApiError; +import org.onap.dmaap.dbcapi.model.DcaeLocation; import org.onap.dmaap.dbcapi.model.Dmaap; +import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; import org.onap.dmaap.dbcapi.model.MR_Client; import org.onap.dmaap.dbcapi.model.MR_Cluster; import org.onap.dmaap.dbcapi.model.MirrorMaker; import org.onap.dmaap.dbcapi.model.ReplicationType; import org.onap.dmaap.dbcapi.model.Topic; -import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; import org.onap.dmaap.dbcapi.util.DmaapConfig; import org.onap.dmaap.dbcapi.util.Fqdn; import org.onap.dmaap.dbcapi.util.Graph; @@ -54,16 +56,21 @@ public class TopicService extends BaseLoggingClass { private static String defaultGlobalMrHost; private Map<String, Topic> mr_topics = DatabaseClass.getTopics(); - private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters(); private static DmaapService dmaapSvc = new DmaapService(); private static Dmaap dmaap = new DmaapService().getDmaap(); private MR_ClientService clientService = new MR_ClientService(); + private MR_ClusterService clusters = new MR_ClusterService(); + private DcaeLocationService locations = new DcaeLocationService(); private MirrorMakerService bridge = new MirrorMakerService(); + + private static String centralCname; public TopicService(){ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); + centralCname = p.getProperty("MR.CentralCname"); + logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost ); } public Map<String, Topic> getTopics() { @@ -165,9 +172,12 @@ public class TopicService extends BaseLoggingClass { Topic ntopic = checkForBridge( topic, err ); if ( ntopic == null ) { topic.setStatus( DmaapObject_Status.INVALID); - return null; + if ( ! err.is2xx()) { + return null; + } } + mr_topics.put( nFqtn, ntopic ); err.setCode(Status.OK.getStatusCode()); @@ -183,7 +193,9 @@ public class TopicService extends BaseLoggingClass { Topic ntopic = checkForBridge( topic, err ); if ( ntopic == null ) { topic.setStatus( DmaapObject_Status.INVALID); - return null; + if ( ! err.is2xx() ) { + return null; + } } mr_topics.put( ntopic.getFqtn(), ntopic ); err.setCode(Status.OK.getStatusCode()); @@ -252,63 +264,147 @@ public class TopicService extends BaseLoggingClass { return topic; } - boolean anythingWrong = false; - String centralFqdn = new String(); - Graph graph = new Graph( topic.getClients(), true ); + boolean anythingWrong = false; + + Set<String> groups = clusters.getGroups(); + for ( String g : groups ) { + anythingWrong |= buildBridge( topic, err, g ); + } + if ( anythingWrong ) { + topic.setStatus( DmaapObject_Status.INVALID); + if ( ! err.is2xx() ) { + return null; + } + } else { + topic.setStatus( DmaapObject_Status.VALID); + } + return topic; + } + + private boolean buildBridge( Topic topic, ApiError err, String group ) { + + boolean anythingWrong = false; + Graph graph; + if ( group == null || group.isEmpty() ) { + graph = new Graph( topic.getClients(), true ); + } else { + graph = new Graph( topic.getClients(), true, group ); + } + MR_Cluster groupCentralCluster = null; - if ( graph.isHasCentral() ) { - DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - centralFqdn = p.getProperty("MR.CentralCname"); - logger.info( "CentralCname=" + centralFqdn ); + if ( graph.isEmpty() ) { + return false; + } else if ( group == null && topic.getReplicationCase().involvesFQDN() ) { + return false; + } else if ( ! graph.hasCentral() ) { + logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients"); + return true; } else { - logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients"); + groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc()); } - Collection<String> locations = graph.getKeys(); - for( String loc : locations ) { + Collection<String> clientLocations = graph.getKeys(); + for( String loc : clientLocations ) { logger.info( "loc=" + loc ); - MR_Cluster cluster = clusters.get(loc); + DcaeLocation location = locations.getDcaeLocation(loc); + MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); logger.info( "cluster=" + cluster ); String source = null; String target = null; + /* - * all replication rules have 1 bridge... + * Provision Edge to Central bridges... */ - switch( topic.getReplicationCase() ) { - case REPLICATION_EDGE_TO_CENTRAL: - case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only - if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + if ( ! location.isCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) { + switch( topic.getReplicationCase() ) { + case REPLICATION_EDGE_TO_CENTRAL: + case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only + source = cluster.getFqdn(); + target = centralCname; break; - } - source = cluster.getFqdn(); - target = centralFqdn; - break; - case REPLICATION_CENTRAL_TO_EDGE: - if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { - continue; - } - source = centralFqdn; - target = cluster.getFqdn(); - break; - case REPLICATION_CENTRAL_TO_GLOBAL: - if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + case REPLICATION_CENTRAL_TO_EDGE: + case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only + source = centralCname; + target = cluster.getFqdn(); + break; + case REPLICATION_CENTRAL_TO_GLOBAL: + case REPLICATION_GLOBAL_TO_CENTRAL: + case REPLICATION_FQDN_TO_GLOBAL: + case REPLICATION_GLOBAL_TO_FQDN: + break; + + case REPLICATION_EDGE_TO_FQDN: + case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2C portion only + source = cluster.getFqdn(); + target = groupCentralCluster.getFqdn(); + break; + case REPLICATION_FQDN_TO_EDGE: + case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for F2E portion only + source = groupCentralCluster.getFqdn(); + target = cluster.getFqdn(); + break; + + default: + logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); + anythingWrong = true; + err.setCode(400); + err.setFields("topic=" + topic.genFqtn() + " replicationCase=" + + topic.getReplicationCase() ); + err.setMessage("Unexpected value for ReplicationType"); continue; } - source = centralFqdn; - target = topic.getGlobalMrURL(); - break; - case REPLICATION_GLOBAL_TO_CENTRAL: - case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only - if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + + } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) { + /* + * Provision Central to Global bridges + */ + switch( topic.getReplicationCase() ) { + + case REPLICATION_CENTRAL_TO_GLOBAL: + case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: + source = centralCname; + target = topic.getGlobalMrURL(); + break; + case REPLICATION_GLOBAL_TO_CENTRAL: + case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only + source = topic.getGlobalMrURL(); + target = centralCname; + break; + + case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2F portion only + source = groupCentralCluster.getFqdn(); + target = topic.getGlobalMrURL(); + break; + + case REPLICATION_FQDN_TO_GLOBAL: + source = groupCentralCluster.getFqdn(); + target = topic.getGlobalMrURL(); + break; + + case REPLICATION_GLOBAL_TO_FQDN: + case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for G2F portion only + source = topic.getGlobalMrURL(); + target = groupCentralCluster.getFqdn(); + break; + + case REPLICATION_FQDN_TO_EDGE: + case REPLICATION_EDGE_TO_FQDN: + case REPLICATION_EDGE_TO_CENTRAL: + case REPLICATION_CENTRAL_TO_EDGE: + break; + default: + logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); + anythingWrong = true; + err.setCode(400); + err.setFields("topic=" + topic.genFqtn() + " replicationCase=" + + topic.getReplicationCase() ); + err.setMessage("Unexpected value for ReplicationType"); continue; - } - source = topic.getGlobalMrURL(); - target = centralFqdn; - break; - default: - logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); + } + } else { + logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done"); anythingWrong = true; continue; } @@ -328,65 +424,12 @@ public class TopicService extends BaseLoggingClass { anythingWrong = true; break; } - } - - - /* - * some replication rules have a 2nd bridge! - */ - source = target = null; - switch( topic.getReplicationCase() ) { - case REPLICATION_EDGE_TO_CENTRAL: - case REPLICATION_CENTRAL_TO_EDGE: - case REPLICATION_CENTRAL_TO_GLOBAL: - case REPLICATION_GLOBAL_TO_CENTRAL: - continue; - case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for C2G portion only - if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { - continue; - } - source = centralFqdn; - target = topic.getGlobalMrURL(); - break; - - case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only - if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { - continue; - } - source = centralFqdn; - target = cluster.getFqdn(); - break; - default: - logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); - anythingWrong = true; - break; - } - if ( source != null && target != null ) { - try { - logger.info( "Create a MM from " + source + " to " + target ); - MirrorMaker mm = bridge.getMirrorMaker( source, target); - if ( mm == null ) { - mm = new MirrorMaker(source, target); - } - mm.addTopic(topic.getFqtn()); - bridge.updateMirrorMaker(mm); - } catch ( Exception ex ) { - err.setCode(500); - err.setFields( "mirror_maker.topic"); - err.setMessage("Unexpected condition: " + ex ); - anythingWrong = true; - break; - } - } + } + } - if ( anythingWrong ) { - topic.setStatus( DmaapObject_Status.INVALID); - return null; - } - - topic.setStatus( DmaapObject_Status.VALID); - return topic; + return anythingWrong; + } /* @@ -402,7 +445,7 @@ public class TopicService extends BaseLoggingClass { Graph graph = new Graph( topic.getClients(), false ); String centralFqdn = new String(); - if ( graph.isHasCentral() ) { + if ( graph.hasCentral() ) { DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); centralFqdn = p.getProperty("MR.CentralCname"); } @@ -410,12 +453,12 @@ public class TopicService extends BaseLoggingClass { Collection<String> locations = graph.getKeys(); for( String loc : locations ) { logger.info( "loc=" + loc ); - MR_Cluster cluster = clusters.get(loc); + MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); if ( cluster == null ) { logger.info( "No MR cluster for location " + loc ); continue; } - if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + if ( graph.hasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn ); return ReplicationType.REPLICATION_EDGE_TO_CENTRAL; diff --git a/src/main/java/org/onap/dmaap/dbcapi/util/Graph.java b/src/main/java/org/onap/dmaap/dbcapi/util/Graph.java index f86569d..a7700a1 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/util/Graph.java +++ b/src/main/java/org/onap/dmaap/dbcapi/util/Graph.java @@ -29,6 +29,8 @@ import java.util.Map; import org.onap.dmaap.dbcapi.database.DatabaseClass; import org.onap.dmaap.dbcapi.model.DcaeLocation; import org.onap.dmaap.dbcapi.model.MR_Client; +import org.onap.dmaap.dbcapi.model.MR_Cluster; +import org.onap.dmaap.dbcapi.service.MR_ClusterService; public class Graph { @@ -49,21 +51,38 @@ public class Graph { public Graph( List<MR_Client> clients, boolean strict ) { if ( clients == null ) return; + initGraph( clients, strict, "" ); + return; + + } + public Graph( List<MR_Client> clients, boolean strict, String group ) { + if ( clients == null ) + return; + initGraph( clients, strict, group ); + return; + } + + private void initGraph(List<MR_Client> clients, boolean strict, String group ) { + MR_ClusterService clusters = new MR_ClusterService(); this.graph = new HashMap<String, String>(); this.hasCentral = false; for( MR_Client client: clients ) { if ( ! strict || client.isStatusValid()) { String loc = client.getDcaeLocationName(); - for( String action : client.getAction() ){ - DcaeLocation dcaeLoc = locations.get(loc); + DcaeLocation dcaeLoc = locations.get(loc); + if ( dcaeLoc == null ) continue; + MR_Cluster c = clusters.getMr_ClusterByLoc(loc); + if ( group != null && ! group.isEmpty() && ! group.equals(c.getReplicationGroup())) continue; + + for( String action : client.getAction() ){ if ( ! action.equals("view") && dcaeLoc != null ) { - graph.put(loc, dcaeLoc.getDcaeLayer()); + String layer = dcaeLoc.getDcaeLayer(); + if ( layer != null && layer.contains(centralDcaeLayerName) ) { + this.hasCentral = true; + } + graph.put(loc, layer); } } - String layer = graph.get(loc); - if ( layer != null && layer.contains(centralDcaeLayerName) ) { - this.hasCentral = true; - } } } @@ -88,7 +107,7 @@ public class Graph { public Collection<String> getKeys() { return graph.keySet(); } - public boolean isHasCentral() { + public boolean hasCentral() { return hasCentral; } public void setHasCentral(boolean hasCentral) { @@ -106,6 +125,9 @@ public class Graph { } return null; } + public boolean isEmpty() { + return graph.isEmpty(); + } } |