summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pom.xml2
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java37
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java10
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java41
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java36
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java100
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java18
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java2
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java20
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java21
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java255
-rw-r--r--src/main/java/org/onap/dmaap/dbcapi/util/Graph.java38
-rw-r--r--src/main/resources/schema_9.sql27
-rw-r--r--src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java8
-rw-r--r--src/test/java/org/onap/dmaap/dbcapi/model/ReplicationVectorTest.java92
-rw-r--r--src/test/java/org/onap/dmaap/dbcapi/util/GraphTest.java2
-rw-r--r--version.properties2
17 files changed, 292 insertions, 419 deletions
diff --git a/pom.xml b/pom.xml
index 4855f75..c3f39b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -344,7 +344,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jettyVersion>9.3.9.v20160517</jettyVersion>
<eelf.version>0.0.1</eelf.version>
- <artifact.version>1.0.10-SNAPSHOT</artifact.version>
+ <artifact.version>1.0.11-SNAPSHOT</artifact.version>
<!-- SONAR -->
<jacoco.version>0.7.7.201606060606</jacoco.version>
<sonar-jacoco-listeners.version>3.2</sonar-jacoco-listeners.version>
diff --git a/src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java b/src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java
index 9b7c8ff..f0a5582 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/database/DatabaseClass.java
@@ -47,40 +47,8 @@ public class DatabaseClass extends BaseLoggingClass {
private static long lastTime = 0L;
- private static class MirrorVectorHandler implements DBFieldHandler.SqlOp {
- public Object get(ResultSet rs, int index) throws Exception {
- String val = rs.getString(index);
- if (val == null) {
- return(null);
- }
- Set<ReplicationVector> rv = new HashSet<ReplicationVector>();
- for (String s: val.split(",")) {
- String[] f = s.split(";");
- if (f.length < 3) {
- continue;
- }
- rv.add(new ReplicationVector(DBFieldHandler.funesc(f[0]), DBFieldHandler.funesc(f[1]), DBFieldHandler.funesc(f[2])));
- }
- return(rv);
- }
- public void set(PreparedStatement ps, int index, Object val) throws Exception {
- if (val == null) {
- ps.setString(index, null);
- return;
- }
- Set xv = (Set)val;
- StringBuffer sb = new StringBuffer();
- String sep = "";
- for (Object o: xv) {
- ReplicationVector rv = (ReplicationVector)o;
- sb.append(sep).append(DBFieldHandler.fesc(rv.getFqtn())).append(';').append(DBFieldHandler.fesc(rv.getSourceCluster())).append(';').append(DBFieldHandler.fesc(rv.getTargetCluster()));
- sep = ",";
- }
- ps.setString(index, sb.toString());
- }
- }
- // modified version of MirrorVectorHandler for Topics
+
private static class MirrorTopicsHandler implements DBFieldHandler.SqlOp {
public Object get(ResultSet rs, int index) throws Exception {
String val = rs.getString(index);
@@ -192,8 +160,7 @@ public class DatabaseClass extends BaseLoggingClass {
mr_clusters = new DBMap<MR_Cluster>(MR_Cluster.class, "mr_cluster", "dcae_location_name");
feeds = new DBMap<Feed>(Feed.class, "feed", "feed_id");
TableHandler.setSpecialCase("topic", "replication_case", new TopicReplicationTypeHandler());
- topics = new DBMap<Topic>(Topic.class, "topic", "fqtn");
- //TableHandler.setSpecialCase("mirror_maker", "vectors", new MirrorVectorHandler());
+ topics = new DBMap<Topic>(Topic.class, "topic", "fqtn");
TableHandler.setSpecialCase("mirror_maker", "topics", new MirrorTopicsHandler());
mirrors = new DBMap<MirrorMaker>(MirrorMaker.class, "mirror_maker", "mm_name");
} catch (Exception e) {
diff --git a/src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java b/src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java
index 66539ed..c83b6be 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/database/TableHandler.java
@@ -20,9 +20,15 @@
package org.onap.dmaap.dbcapi.database;
-import java.util.*;
import java.lang.reflect.*;
-import java.sql.*;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
class TableHandler<C> {
protected ConnectionFactory cf;
diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
index 9f6f402..6447123 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/model/MirrorMaker.java
@@ -36,7 +36,6 @@ public class MirrorMaker extends DmaapObject {
private String mmName;
private ArrayList<String> topics; //re-using this var name for backwards DB compatibility
- private Set<ReplicationVector> vectors;
public MirrorMaker(){
@@ -47,7 +46,6 @@ public class MirrorMaker extends DmaapObject {
sourceCluster = source;
targetCluster = target;
mmName = genKey(source, target);
- vectors = new HashSet<ReplicationVector>();
topics = new ArrayList<String>();
}
@@ -61,36 +59,6 @@ public class MirrorMaker extends DmaapObject {
}
- public void addVector( String fqtn, String source, String target ) {
- logger.info( "addVector: fqtn=" + fqtn + " source=" + source + " target=" + target );
- if ( ! sourceCluster.equals( source ) ){
- errorLogger.error( DmaapbcLogMessageEnum.MM_CIRCULAR_REF, source, sourceCluster );
- }
- vectors.add(new ReplicationVector( fqtn, source, target ));
- }
-
- public void delVector( String fqtn, String source, String target ) {
- vectors.remove(new ReplicationVector( fqtn, source, target));
- }
-
-
-
- public String toJSON() {
- StringBuilder str = new StringBuilder( "{ \"source\": " + sourceCluster + ",\"topics\": [" );
- int numTargets = 0;
- for (ReplicationVector rv: vectors) {
- if ( numTargets > 0 ) {
- str.append( ",");
- }
- str.append( " \"target\": " + rv.getTargetCluster() + ", \"topic\": " + rv.getFqtn());
- numTargets++;
- }
- str.append( "] }" );
-
- return str.toString();
- }
-
-
// returns the JSON for MM message containing which Topics to replicate
/*
* example:
@@ -166,18 +134,11 @@ public class MirrorMaker extends DmaapObject {
}
- public Set<ReplicationVector> getVectors() {
- return vectors;
- }
-
- public void setVectors(Set<ReplicationVector> vectors) {
- this.vectors = vectors;
- }
public ArrayList<String> getTopics() {
return topics;
}
- //public void setVectors(Set<ReplicationVector> vectors) {
+
public void setTopics(ArrayList<String> topics) {
this.topics = topics;
}
diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java b/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java
index 49b93a6..5d5b6c6 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationType.java
@@ -34,7 +34,13 @@ public enum ReplicationType {
REPLICATION_CENTRAL_TO_EDGE(20),
REPLICATION_CENTRAL_TO_GLOBAL(21),
REPLICATION_GLOBAL_TO_CENTRAL(30),
- REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE(120);
+ REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE(120),
+ REPLICATION_EDGE_TO_FQDN(40),
+ REPLICATION_FQDN_TO_EDGE(41),
+ REPLICATION_FQDN_TO_GLOBAL(50),
+ REPLICATION_GLOBAL_TO_FQDN(51),
+ REPLICATION_EDGE_TO_FQDN_TO_GLOBAL(130),
+ REPLICATION_GLOBAL_TO_FQDN_TO_EDGE (140);
private int value;
private static Map map = new HashMap<>();
@@ -69,10 +75,30 @@ public enum ReplicationType {
}
public boolean involvesGlobal() {
- if ( this.compareTo(REPLICATION_CENTRAL_TO_GLOBAL) == 0 ||
- this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL) == 0 ||
- this.compareTo(REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL) == 0 ||
- this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE) == 0) {
+
+
+ if ( ( this.compareTo(REPLICATION_CENTRAL_TO_GLOBAL) == 0 ) ||
+ ( this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL) == 0 ) ||
+ ( this.compareTo(REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL) == 0 ) ||
+ ( this.compareTo(REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE) == 0 ) ||
+ ( this.compareTo(REPLICATION_EDGE_TO_FQDN_TO_GLOBAL) == 0 ) ||
+ ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN_TO_EDGE) == 0 ) ||
+ ( this.compareTo(REPLICATION_FQDN_TO_GLOBAL) == 0 ) ||
+ ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN) == 0 ) ) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean involvesFQDN() {
+ if (
+ ( this.compareTo(REPLICATION_EDGE_TO_FQDN) == 0 ) ||
+ ( this.compareTo(REPLICATION_EDGE_TO_FQDN_TO_GLOBAL) == 0 ) ||
+ ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN_TO_EDGE) == 0 ) ||
+ ( this.compareTo(REPLICATION_FQDN_TO_GLOBAL) == 0 ) ||
+ ( this.compareTo(REPLICATION_GLOBAL_TO_FQDN) == 0 ) ||
+ ( this.compareTo(REPLICATION_FQDN_TO_EDGE) == 0 )
+ ) {
return true;
}
return false;
diff --git a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java b/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java
deleted file mode 100644
index add3998..0000000
--- a/src/main/java/org/onap/dmaap/dbcapi/model/ReplicationVector.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dmaap.dbcapi.model;
-
-public class ReplicationVector {
-
- public enum ReplicationVector_Status {
- EMPTY,
- NEW,
- STAGED,
- VALID,
- INVALID,
- INVALID_DUP,
- DELETED
- }
-
- String fqtn;
- String sourceCluster;
- String targetCluster;
- ReplicationVector_Status status;
-
- public ReplicationVector(){
-
- }
-
- public ReplicationVector(String fqtn, String sourceCluster,
- String targetCluster) {
- super();
- this.fqtn = fqtn;
- this.sourceCluster = sourceCluster;
- this.targetCluster = targetCluster;
- }
-
- public String getFqtn() {
- return fqtn;
- }
-
- public void setFqtn(String fqtn) {
- this.fqtn = fqtn;
- }
-
- public String getSourceCluster() {
- return sourceCluster;
- }
-
- public void setSourceCluster(String sourceCluster) {
- this.sourceCluster = sourceCluster;
- }
-
- public String getTargetCluster() {
- return targetCluster;
- }
-
- public void setTargetCluster(String targetCluster) {
- this.targetCluster = targetCluster;
- }
-
- public int hashCode() {
- StringBuilder tmp = new StringBuilder( this.fqtn );
- tmp.append(this.sourceCluster);
- tmp.append(this.targetCluster);
-
- return tmp.toString().hashCode();
- }
- private static boolean xeq(String s1, String s2) {
- if (s1 == null) {
- return(s2 == null);
- } else {
- return(s1.equals(s2));
- }
- }
- public boolean equals(Object o) {
- if (o == this) {
- return(true);
- }
- if (!(o instanceof ReplicationVector)) {
- return(false);
- }
- ReplicationVector x = (ReplicationVector)o;
- return(xeq(fqtn, x.fqtn) && xeq(sourceCluster, x.sourceCluster) && xeq(targetCluster, x.targetCluster));
- }
-}
diff --git a/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java b/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java
index 1a6310c..aab1cac 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/resources/BridgeResource.java
@@ -20,37 +20,31 @@
package org.onap.dmaap.dbcapi.resources;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-
import java.util.List;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
-import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriInfo;
import javax.ws.rs.core.Response.Status;
-import org.onap.dmaap.dbcapi.authentication.AuthenticationErrorException;
import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
-import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.model.ApiError;
import org.onap.dmaap.dbcapi.model.BrTopic;
-import org.onap.dmaap.dbcapi.model.DcaeLocation;
import org.onap.dmaap.dbcapi.model.Dmaap;
import org.onap.dmaap.dbcapi.model.MirrorMaker;
import org.onap.dmaap.dbcapi.service.ApiService;
import org.onap.dmaap.dbcapi.service.MirrorMakerService;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
@Path("/bridge")
@Api( value= "bridge", description = "Endpoint for retreiving MR Bridge metrics" )
@Consumes(MediaType.APPLICATION_JSON)
@@ -77,7 +71,7 @@ public class BridgeResource extends BaseLoggingClass {
BrTopic brTopic = new BrTopic();
logger.info( "getBridgeTopics():" + " source=" + source + ", target=" + target);
- // System.out.println("getBridgedTopics() " + "source=" + source + ", target=" + target );
+
if (source != null && target != null) { // get topics between 2 bridged locations
brTopic.setBrSource(source);
brTopic.setBrTarget(target);
diff --git a/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java b/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java
index 8221813..95f9d33 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/resources/TopicResource.java
@@ -120,7 +120,7 @@ public class TopicResource extends BaseLoggingClass {
topic.setLastMod();
Topic mrc = mr_topicService.addTopic(topic, check.getErr());
- if ( mrc != null && mrc.isStatusValid() ) {
+ if ( mrc != null && check.getErr().is2xx() ) {
return check.success(Status.CREATED.getStatusCode(), mrc);
}
return check.error();
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java
index 40f86b6..5bd62cb 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java
@@ -57,10 +57,12 @@ public class MR_ClientService extends BaseLoggingClass{
private Map<String, Topic> topics = DatabaseClass.getTopics();
private Map<String, DcaeLocation> locations = DatabaseClass.getDcaeLocations();
private DmaapService dmaap = new DmaapService();
+ private String centralCname;
public MR_ClientService() {
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
-
+
+ centralCname = p.getProperty("MR.CentralCname", "MRcname.not.set");
deleteLevel = Integer.valueOf(p.getProperty("MR.ClientDeleteLevel", "0" ));
}
@@ -132,12 +134,12 @@ public class MR_ClientService extends BaseLoggingClass{
}
String centralFqdn = null;
DcaeLocation candidate = locations.get(client.getDcaeLocationName());
- if ( candidate != null && candidate.isCentral() ) {
- DmaapConfig p = ( DmaapConfig)DmaapConfig.getConfig();
- centralFqdn = p.getProperty("MR.CentralCname");
- }
+
MR_Cluster cluster = clusters.get( client.getDcaeLocationName());
- if ( cluster != null ) {
+ if ( cluster != null && candidate != null ) {
+ if ( candidate.isCentral() && ! topic.getReplicationCase().involvesFQDN() ) {
+ centralFqdn = centralCname;
+ }
client.setTopicURL(cluster.genTopicURL(centralFqdn, client.getFqtn()));
if ( centralFqdn == null ) {
client.setStatus( addTopicToCluster( cluster, topic, err));
@@ -148,8 +150,8 @@ public class MR_ClientService extends BaseLoggingClass{
} else {
MR_ClusterService clusters = new MR_ClusterService();
- // in 1610, MM should only exist for edge-to-central
- // we use a cname for the central target
+ // MM should only exist for edge-to-central
+ // we use a cname for the central target (default resiliency with no replicationGroup set)
// but still need to provision topics on all central MRs
for( MR_Cluster central: clusters.getCentralClusters() ) {
client.setStatus( addTopicToCluster( central, topic, err));
@@ -161,7 +163,7 @@ public class MR_ClientService extends BaseLoggingClass{
}
} else {
- logger.info( "Client references a dcaeLocation that doesn't exist:" + client.getDcaeLocationName());
+ logger.warn( "Client references a dcaeLocation that doesn't exist:" + client.getDcaeLocationName());
client.setStatus( DmaapObject_Status.STAGED);
//return null;
}
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java
index ed57279..e2c661b 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClusterService.java
@@ -21,8 +21,10 @@
package org.onap.dmaap.dbcapi.service;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.ws.rs.core.Response.Status;
@@ -32,6 +34,7 @@ import org.onap.dmaap.dbcapi.model.ApiError;
import org.onap.dmaap.dbcapi.model.DcaeLocation;
import org.onap.dmaap.dbcapi.model.MR_Cluster;
import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
+import org.onap.dmaap.dbcapi.service.DcaeLocationService;
import org.onap.dmaap.dbcapi.util.DmaapConfig;
public class MR_ClusterService extends BaseLoggingClass {
@@ -73,6 +76,10 @@ public class MR_ClusterService extends BaseLoggingClass {
return null;
}
+ public MR_Cluster getMr_ClusterByLoc( String loc ) {
+ return mr_clusters.get( loc );
+ }
+
public List<MR_Cluster> getCentralClusters() {
DcaeLocationService locations = new DcaeLocationService();
List<MR_Cluster> result = new ArrayList<MR_Cluster>();
@@ -87,6 +94,20 @@ public class MR_ClusterService extends BaseLoggingClass {
}
return result;
}
+
+ // builds the set of unique cluster groups
+ public Set<String> getGroups() {
+ Set<String> result = new HashSet<String>();
+ for( MR_Cluster c: mr_clusters.values() ) {
+ try {
+ result.add(c.getReplicationGroup());
+ } catch ( NullPointerException npe ) {
+ logger.warn( "Failed to add Group for cluster:" + c.getDcaeLocationName() );
+ }
+ }
+ return result;
+ }
+
public MR_Cluster addMr_Cluster( MR_Cluster cluster, ApiError apiError ) {
logger.info( "Entry: addMr_Cluster");
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
index eed5022..3943419 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java
@@ -25,23 +25,25 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.ws.rs.core.Response.Status;
import org.onap.dmaap.dbcapi.aaf.AafService;
-import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
+import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
import org.onap.dmaap.dbcapi.database.DatabaseClass;
import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
import org.onap.dmaap.dbcapi.model.ApiError;
+import org.onap.dmaap.dbcapi.model.DcaeLocation;
import org.onap.dmaap.dbcapi.model.Dmaap;
+import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
import org.onap.dmaap.dbcapi.model.MR_Client;
import org.onap.dmaap.dbcapi.model.MR_Cluster;
import org.onap.dmaap.dbcapi.model.MirrorMaker;
import org.onap.dmaap.dbcapi.model.ReplicationType;
import org.onap.dmaap.dbcapi.model.Topic;
-import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
import org.onap.dmaap.dbcapi.util.DmaapConfig;
import org.onap.dmaap.dbcapi.util.Fqdn;
import org.onap.dmaap.dbcapi.util.Graph;
@@ -54,16 +56,21 @@ public class TopicService extends BaseLoggingClass {
private static String defaultGlobalMrHost;
private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
- private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters();
private static DmaapService dmaapSvc = new DmaapService();
private static Dmaap dmaap = new DmaapService().getDmaap();
private MR_ClientService clientService = new MR_ClientService();
+ private MR_ClusterService clusters = new MR_ClusterService();
+ private DcaeLocationService locations = new DcaeLocationService();
private MirrorMakerService bridge = new MirrorMakerService();
+
+ private static String centralCname;
public TopicService(){
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
+ centralCname = p.getProperty("MR.CentralCname");
+ logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost );
}
public Map<String, Topic> getTopics() {
@@ -165,9 +172,12 @@ public class TopicService extends BaseLoggingClass {
Topic ntopic = checkForBridge( topic, err );
if ( ntopic == null ) {
topic.setStatus( DmaapObject_Status.INVALID);
- return null;
+ if ( ! err.is2xx()) {
+ return null;
+ }
}
+
mr_topics.put( nFqtn, ntopic );
err.setCode(Status.OK.getStatusCode());
@@ -183,7 +193,9 @@ public class TopicService extends BaseLoggingClass {
Topic ntopic = checkForBridge( topic, err );
if ( ntopic == null ) {
topic.setStatus( DmaapObject_Status.INVALID);
- return null;
+ if ( ! err.is2xx() ) {
+ return null;
+ }
}
mr_topics.put( ntopic.getFqtn(), ntopic );
err.setCode(Status.OK.getStatusCode());
@@ -252,63 +264,147 @@ public class TopicService extends BaseLoggingClass {
return topic;
}
- boolean anythingWrong = false;
- String centralFqdn = new String();
- Graph graph = new Graph( topic.getClients(), true );
+ boolean anythingWrong = false;
+
+ Set<String> groups = clusters.getGroups();
+ for ( String g : groups ) {
+ anythingWrong |= buildBridge( topic, err, g );
+ }
+ if ( anythingWrong ) {
+ topic.setStatus( DmaapObject_Status.INVALID);
+ if ( ! err.is2xx() ) {
+ return null;
+ }
+ } else {
+ topic.setStatus( DmaapObject_Status.VALID);
+ }
+ return topic;
+ }
+
+ private boolean buildBridge( Topic topic, ApiError err, String group ) {
+
+ boolean anythingWrong = false;
+ Graph graph;
+ if ( group == null || group.isEmpty() ) {
+ graph = new Graph( topic.getClients(), true );
+ } else {
+ graph = new Graph( topic.getClients(), true, group );
+ }
+ MR_Cluster groupCentralCluster = null;
- if ( graph.isHasCentral() ) {
- DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
- centralFqdn = p.getProperty("MR.CentralCname");
- logger.info( "CentralCname=" + centralFqdn );
+ if ( graph.isEmpty() ) {
+ return false;
+ } else if ( group == null && topic.getReplicationCase().involvesFQDN() ) {
+ return false;
+ } else if ( ! graph.hasCentral() ) {
+ logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
+ return true;
} else {
- logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients");
+ groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
}
- Collection<String> locations = graph.getKeys();
- for( String loc : locations ) {
+ Collection<String> clientLocations = graph.getKeys();
+ for( String loc : clientLocations ) {
logger.info( "loc=" + loc );
- MR_Cluster cluster = clusters.get(loc);
+ DcaeLocation location = locations.getDcaeLocation(loc);
+ MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
logger.info( "cluster=" + cluster );
String source = null;
String target = null;
+
/*
- * all replication rules have 1 bridge...
+ * Provision Edge to Central bridges...
*/
- switch( topic.getReplicationCase() ) {
- case REPLICATION_EDGE_TO_CENTRAL:
- case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only
- if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ if ( ! location.isCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+ switch( topic.getReplicationCase() ) {
+ case REPLICATION_EDGE_TO_CENTRAL:
+ case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only
+ source = cluster.getFqdn();
+ target = centralCname;
break;
- }
- source = cluster.getFqdn();
- target = centralFqdn;
- break;
- case REPLICATION_CENTRAL_TO_EDGE:
- if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
- continue;
- }
- source = centralFqdn;
- target = cluster.getFqdn();
- break;
- case REPLICATION_CENTRAL_TO_GLOBAL:
- if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ case REPLICATION_CENTRAL_TO_EDGE:
+ case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only
+ source = centralCname;
+ target = cluster.getFqdn();
+ break;
+ case REPLICATION_CENTRAL_TO_GLOBAL:
+ case REPLICATION_GLOBAL_TO_CENTRAL:
+ case REPLICATION_FQDN_TO_GLOBAL:
+ case REPLICATION_GLOBAL_TO_FQDN:
+ break;
+
+ case REPLICATION_EDGE_TO_FQDN:
+ case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2C portion only
+ source = cluster.getFqdn();
+ target = groupCentralCluster.getFqdn();
+ break;
+ case REPLICATION_FQDN_TO_EDGE:
+ case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for F2E portion only
+ source = groupCentralCluster.getFqdn();
+ target = cluster.getFqdn();
+ break;
+
+ default:
+ logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+ anythingWrong = true;
+ err.setCode(400);
+ err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+ + topic.getReplicationCase() );
+ err.setMessage("Unexpected value for ReplicationType");
continue;
}
- source = centralFqdn;
- target = topic.getGlobalMrURL();
- break;
- case REPLICATION_GLOBAL_TO_CENTRAL:
- case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only
- if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+
+ } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
+ /*
+ * Provision Central to Global bridges
+ */
+ switch( topic.getReplicationCase() ) {
+
+ case REPLICATION_CENTRAL_TO_GLOBAL:
+ case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
+ source = centralCname;
+ target = topic.getGlobalMrURL();
+ break;
+ case REPLICATION_GLOBAL_TO_CENTRAL:
+ case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only
+ source = topic.getGlobalMrURL();
+ target = centralCname;
+ break;
+
+ case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2F portion only
+ source = groupCentralCluster.getFqdn();
+ target = topic.getGlobalMrURL();
+ break;
+
+ case REPLICATION_FQDN_TO_GLOBAL:
+ source = groupCentralCluster.getFqdn();
+ target = topic.getGlobalMrURL();
+ break;
+
+ case REPLICATION_GLOBAL_TO_FQDN:
+ case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for G2F portion only
+ source = topic.getGlobalMrURL();
+ target = groupCentralCluster.getFqdn();
+ break;
+
+ case REPLICATION_FQDN_TO_EDGE:
+ case REPLICATION_EDGE_TO_FQDN:
+ case REPLICATION_EDGE_TO_CENTRAL:
+ case REPLICATION_CENTRAL_TO_EDGE:
+ break;
+ default:
+ logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+ anythingWrong = true;
+ err.setCode(400);
+ err.setFields("topic=" + topic.genFqtn() + " replicationCase="
+ + topic.getReplicationCase() );
+ err.setMessage("Unexpected value for ReplicationType");
continue;
- }
- source = topic.getGlobalMrURL();
- target = centralFqdn;
- break;
- default:
- logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+ }
+ } else {
+ logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
anythingWrong = true;
continue;
}
@@ -328,65 +424,12 @@ public class TopicService extends BaseLoggingClass {
anythingWrong = true;
break;
}
- }
-
-
- /*
- * some replication rules have a 2nd bridge!
- */
- source = target = null;
- switch( topic.getReplicationCase() ) {
- case REPLICATION_EDGE_TO_CENTRAL:
- case REPLICATION_CENTRAL_TO_EDGE:
- case REPLICATION_CENTRAL_TO_GLOBAL:
- case REPLICATION_GLOBAL_TO_CENTRAL:
- continue;
- case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for C2G portion only
- if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
- continue;
- }
- source = centralFqdn;
- target = topic.getGlobalMrURL();
- break;
-
- case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only
- if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
- continue;
- }
- source = centralFqdn;
- target = cluster.getFqdn();
- break;
- default:
- logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
- anythingWrong = true;
- break;
- }
- if ( source != null && target != null ) {
- try {
- logger.info( "Create a MM from " + source + " to " + target );
- MirrorMaker mm = bridge.getMirrorMaker( source, target);
- if ( mm == null ) {
- mm = new MirrorMaker(source, target);
- }
- mm.addTopic(topic.getFqtn());
- bridge.updateMirrorMaker(mm);
- } catch ( Exception ex ) {
- err.setCode(500);
- err.setFields( "mirror_maker.topic");
- err.setMessage("Unexpected condition: " + ex );
- anythingWrong = true;
- break;
- }
- }
+ }
+
}
- if ( anythingWrong ) {
- topic.setStatus( DmaapObject_Status.INVALID);
- return null;
- }
-
- topic.setStatus( DmaapObject_Status.VALID);
- return topic;
+ return anythingWrong;
+
}
/*
@@ -402,7 +445,7 @@ public class TopicService extends BaseLoggingClass {
Graph graph = new Graph( topic.getClients(), false );
String centralFqdn = new String();
- if ( graph.isHasCentral() ) {
+ if ( graph.hasCentral() ) {
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
centralFqdn = p.getProperty("MR.CentralCname");
}
@@ -410,12 +453,12 @@ public class TopicService extends BaseLoggingClass {
Collection<String> locations = graph.getKeys();
for( String loc : locations ) {
logger.info( "loc=" + loc );
- MR_Cluster cluster = clusters.get(loc);
+ MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
if ( cluster == null ) {
logger.info( "No MR cluster for location " + loc );
continue;
}
- if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ if ( graph.hasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;
diff --git a/src/main/java/org/onap/dmaap/dbcapi/util/Graph.java b/src/main/java/org/onap/dmaap/dbcapi/util/Graph.java
index f86569d..a7700a1 100644
--- a/src/main/java/org/onap/dmaap/dbcapi/util/Graph.java
+++ b/src/main/java/org/onap/dmaap/dbcapi/util/Graph.java
@@ -29,6 +29,8 @@ import java.util.Map;
import org.onap.dmaap.dbcapi.database.DatabaseClass;
import org.onap.dmaap.dbcapi.model.DcaeLocation;
import org.onap.dmaap.dbcapi.model.MR_Client;
+import org.onap.dmaap.dbcapi.model.MR_Cluster;
+import org.onap.dmaap.dbcapi.service.MR_ClusterService;
public class Graph {
@@ -49,21 +51,38 @@ public class Graph {
public Graph( List<MR_Client> clients, boolean strict ) {
if ( clients == null )
return;
+ initGraph( clients, strict, "" );
+ return;
+
+ }
+ public Graph( List<MR_Client> clients, boolean strict, String group ) {
+ if ( clients == null )
+ return;
+ initGraph( clients, strict, group );
+ return;
+ }
+
+ private void initGraph(List<MR_Client> clients, boolean strict, String group ) {
+ MR_ClusterService clusters = new MR_ClusterService();
this.graph = new HashMap<String, String>();
this.hasCentral = false;
for( MR_Client client: clients ) {
if ( ! strict || client.isStatusValid()) {
String loc = client.getDcaeLocationName();
- for( String action : client.getAction() ){
- DcaeLocation dcaeLoc = locations.get(loc);
+ DcaeLocation dcaeLoc = locations.get(loc);
+ if ( dcaeLoc == null ) continue;
+ MR_Cluster c = clusters.getMr_ClusterByLoc(loc);
+ if ( group != null && ! group.isEmpty() && ! group.equals(c.getReplicationGroup())) continue;
+
+ for( String action : client.getAction() ){
if ( ! action.equals("view") && dcaeLoc != null ) {
- graph.put(loc, dcaeLoc.getDcaeLayer());
+ String layer = dcaeLoc.getDcaeLayer();
+ if ( layer != null && layer.contains(centralDcaeLayerName) ) {
+ this.hasCentral = true;
+ }
+ graph.put(loc, layer);
}
}
- String layer = graph.get(loc);
- if ( layer != null && layer.contains(centralDcaeLayerName) ) {
- this.hasCentral = true;
- }
}
}
@@ -88,7 +107,7 @@ public class Graph {
public Collection<String> getKeys() {
return graph.keySet();
}
- public boolean isHasCentral() {
+ public boolean hasCentral() {
return hasCentral;
}
public void setHasCentral(boolean hasCentral) {
@@ -106,6 +125,9 @@ public class Graph {
}
return null;
}
+ public boolean isEmpty() {
+ return graph.isEmpty();
+ }
}
diff --git a/src/main/resources/schema_9.sql b/src/main/resources/schema_9.sql
new file mode 100644
index 0000000..ac43d78
--- /dev/null
+++ b/src/main/resources/schema_9.sql
@@ -0,0 +1,27 @@
+---
+-- ============LICENSE_START=======================================================
+-- OpenECOMP - org.onap.dbcapi
+-- ================================================================================
+-- Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+-- ================================================================================
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+-- ============LICENSE_END=========================================================
+---
+
+
+@alter table mr_cluster
+ add column replication_group varchar(100)
+;
+
+
+update dmaapbc_sch_ver set version = 9 where version = 8;
diff --git a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
index 8b632ed..547bfc9 100644
--- a/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
+++ b/src/test/java/org/onap/dmaap/dbcapi/model/MirrorMakerTest.java
@@ -83,8 +83,8 @@ public class MirrorMakerTest {
String p2 = "2081";
MirrorMaker t = new MirrorMaker( c1, c2 );
String m = t.getMmName();
+
- t.addVector( f, c1, c2 );
ArrayList<String> topics = new ArrayList<String>();
topics.add( f );
t.setTopics( topics );
@@ -92,14 +92,10 @@ public class MirrorMakerTest {
int i = t.getTopicCount();
- String s = t.toJSON();
-
- s = t.updateWhiteList();
+ String s = t.updateWhiteList();
s = t.createMirrorMaker(p1, p2);
- t.delVector( f, c1, c2 );
-
}
}
diff --git a/src/test/java/org/onap/dmaap/dbcapi/model/ReplicationVectorTest.java b/src/test/java/org/onap/dmaap/dbcapi/model/ReplicationVectorTest.java
deleted file mode 100644
index dde3b49..0000000
--- a/src/test/java/org/onap/dmaap/dbcapi/model/ReplicationVectorTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dmaap.dbcapi.model;
-
-import static org.junit.Assert.*;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dmaap.dbcapi.testframework.ReflectionHarness;
-
-import java.util.ArrayList;
-
-
-public class ReplicationVectorTest {
-
- private static final String fmt = "%24s: %s%n";
-
- ReflectionHarness rh = new ReflectionHarness();
-
-
- @Before
- public void setUp() throws Exception {
- }
-
- @After
- public void tearDown() throws Exception {
- }
-
-
- @Test
- public void test1() {
-
-
- rh.reflect( "org.onap.dmaap.dbcapi.model.ReplicationVector", "get", null );
-
- }
- @Test
- public void test2() {
-
- String v = "Validate";
- rh.reflect( "org.onap.dmaap.dbcapi.model.ReplicationVector", "set", v );
- }
-
- @Test
- public void test3() {
- String f = "org.onap.interestingTopic";
- String c1 = "cluster1.onap.org";
- String c2 = "cluster2.onap.org";
- ReplicationVector t = new ReplicationVector( f, c1, c2 );
-
-
- assertTrue( f.equals( t.getFqtn() ));
- assertTrue( c1.equals( t.getSourceCluster() ));
- assertTrue( c2.equals( t.getTargetCluster() ));
- }
-
-
- @Test
- public void test4() {
- String f = "org.onap.interestingTopic";
- String c1 = "cluster1.onap.org";
- String c2 = "cluster2.onap.org";
- ReplicationVector t = new ReplicationVector( f, c1, c2 );
-
- int i = t.hashCode();
-
- ReplicationVector t2 = new ReplicationVector(f, c1, c2 );
-
- assertTrue( t.equals( t2 ));
- assertTrue( t.equals( t ));
- assertTrue( ! t.equals( f ));
- }
-
-}
diff --git a/src/test/java/org/onap/dmaap/dbcapi/util/GraphTest.java b/src/test/java/org/onap/dmaap/dbcapi/util/GraphTest.java
index 7cedfac..449746a 100644
--- a/src/test/java/org/onap/dmaap/dbcapi/util/GraphTest.java
+++ b/src/test/java/org/onap/dmaap/dbcapi/util/GraphTest.java
@@ -89,7 +89,7 @@ public class GraphTest {
s = g.getCentralLoc();
g.setHasCentral( true );
- g.isHasCentral();
+ g.hasCentral();
hm = g.getGraph();
diff --git a/version.properties b/version.properties
index 5325c31..344bfdf 100644
--- a/version.properties
+++ b/version.properties
@@ -27,7 +27,7 @@
major=1
minor=0
-patch=10
+patch=11
base_version=${major}.${minor}.${patch}
# Release must be completed with git revision # in Jenkins