diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java | 534 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java | 1139 |
2 files changed, 831 insertions, 842 deletions
diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java index d0407c2..23627b5 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/MR_ClientService.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,288 +22,278 @@ package org.onap.dmaap.dbcapi.service; +import org.onap.dmaap.dbcapi.aaf.AafService; +import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType; +import org.onap.dmaap.dbcapi.aaf.AafUserRole; +import org.onap.dmaap.dbcapi.aaf.DmaapGrant; +import org.onap.dmaap.dbcapi.aaf.DmaapPerm; +import org.onap.dmaap.dbcapi.client.MrProvConnection; +import org.onap.dmaap.dbcapi.database.DatabaseClass; +import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; +import org.onap.dmaap.dbcapi.model.ApiError; +import org.onap.dmaap.dbcapi.model.DcaeLocation; +import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; +import org.onap.dmaap.dbcapi.model.MR_Client; +import org.onap.dmaap.dbcapi.model.MR_Cluster; +import org.onap.dmaap.dbcapi.model.Topic; +import org.onap.dmaap.dbcapi.util.DmaapConfig; + +import javax.ws.rs.core.Response.Status; import java.util.ArrayList; import java.util.List; import java.util.Map; -import javax.ws.rs.core.Response.Status; +public class MR_ClientService extends BaseLoggingClass { + private static final String MR_CLIENT_ID = "mrClientId"; + private int deleteLevel; + private Map<String, MR_Client> mr_clients = DatabaseClass.getMr_clients(); + private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters(); + private Map<String, DcaeLocation> locations = DatabaseClass.getDcaeLocations(); + private DmaapService dmaap = new DmaapService(); + private String centralCname; + public MR_ClientService() { + DmaapConfig p = (DmaapConfig) DmaapConfig.getConfig(); + centralCname = p.getProperty("MR.CentralCname", "MRcname.not.set"); + deleteLevel = Integer.valueOf(p.getProperty("MR.ClientDeleteLevel", "0")); + } + public List<MR_Client> getAllMr_Clients() { + return new ArrayList<>(mr_clients.values()); + } + List<MR_Client> getAllMrClients(String fqtn) { + ArrayList<MR_Client> results = new ArrayList<>(); + for (Map.Entry<String, MR_Client> entry : mr_clients.entrySet()) { + MR_Client client = entry.getValue(); + if (fqtn.equals(client.getFqtn())) { + results.add(client); + } + } + return results; + } + List<MR_Client> getClientsByLocation(String location) { + List<MR_Client> results = new ArrayList<>(); + for (Map.Entry<String, MR_Client> entry : mr_clients.entrySet()) { + MR_Client client = entry.getValue(); + if (location.equals(client.getDcaeLocationName())) { + results.add(client); + } + } + return results; + } + public MR_Client getMr_Client(String key, ApiError apiError) { + MR_Client c = mr_clients.get(key); + if (c == null) { + apiError.setCode(Status.NOT_FOUND.getStatusCode()); + apiError.setFields(MR_CLIENT_ID); + apiError.setMessage(MR_CLIENT_ID + " " + key + " not found"); + } else { + apiError.setCode(200); + } + return c; + } -import org.onap.dmaap.dbcapi.aaf.AafService; -import org.onap.dmaap.dbcapi.aaf.DmaapGrant; -import org.onap.dmaap.dbcapi.aaf.DmaapPerm; -import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType; -import org.onap.dmaap.dbcapi.aaf.AafUserRole; -import org.onap.dmaap.dbcapi.client.MrProvConnection; -import org.onap.dmaap.dbcapi.database.DatabaseClass; -import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; -import org.onap.dmaap.dbcapi.model.ApiError; -import org.onap.dmaap.dbcapi.model.DcaeLocation; -import org.onap.dmaap.dbcapi.model.MR_Client; -import org.onap.dmaap.dbcapi.model.MR_Cluster; -import org.onap.dmaap.dbcapi.model.Topic; -import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; -import org.onap.dmaap.dbcapi.util.DmaapConfig; + public MR_Client addMr_Client(MR_Client client, Topic topic, ApiError err) { + if (client.getDcaeLocationName().isEmpty()) { + logger.info("Client dcaeLocation that doesn't exist or not specified"); + return null; + } + // original style: clients specified Role. This has precedence for backwards + // compatibility. + // ONAP style: clients specify Identity to be assigned to generated Role + String role = client.getClientRole(); + if (role != null) { + grantClientRolePerms(client, err); + } else if (client.hasClientIdentity()) { + if (client.isSubscriber()) { + role = topic.getSubscriberRole(); + assignIdentityToRole(client, role, err); + } + if (client.isPublisher()) { + role = topic.getPublisherRole(); + assignIdentityToRole(client, role, err); + } + } + if (!client.isStatusValid()) { + return null; + } + String centralFqdn = null; + DcaeLocation candidate = locations.get(client.getDcaeLocationName()); + + MR_Cluster cluster = clusters.get(client.getDcaeLocationName()); + if (cluster != null && candidate != null) { + if (candidate.isCentral() && !topic.getReplicationCase().involvesFQDN()) { + centralFqdn = centralCname; + } + client.setTopicURL(cluster.genTopicURL(centralFqdn, client.getFqtn())); + if (centralFqdn == null) { + client.setStatus(addTopicToCluster(cluster, topic, err)); + if (!err.is2xx() && err.getCode() != 409) { + topic.setFqtn(err.getMessage()); + return null; + } + + } else { + MR_ClusterService clusters = new MR_ClusterService(); + // MM should only exist for edge-to-central + // we use a cname for the central target (default resiliency with no replicationGroup set) + // but still need to provision topics on all central MRs + for (MR_Cluster central : clusters.getCentralClusters()) { + client.setStatus(addTopicToCluster(central, topic, err)); + if (!err.is2xx() && err.getCode() != 409) { + topic.setFqtn(err.getMessage()); + return null; + } + } + } + + } else { + logger.warn("Client references a dcaeLocation that doesn't exist:" + client.getDcaeLocationName()); + client.setStatus(DmaapObject_Status.STAGED); + } + + mr_clients.put(client.getMrClientId(), client); + + err.setCode(200); + + return client; + } + + private DmaapObject_Status addTopicToCluster(MR_Cluster cluster, Topic topic, ApiError err) { + + MrProvConnection prov = new MrProvConnection(); + logger.info("POST topic " + topic.getFqtn() + " to cluster " + cluster.getFqdn() + " in loc " + cluster.getDcaeLocationName()); + if (prov.makeTopicConnection(cluster)) { + prov.doPostTopic(topic, err); + logger.info("response code: " + err.getCode()); + if (err.is2xx() || err.getCode() == 409) { + return DmaapObject_Status.VALID; + } + } + return DmaapObject_Status.INVALID; + } + + private void grantClientRolePerms(MR_Client client, ApiError err) { + AafService aaf = new AafService(ServiceType.AAF_TopicMgr); + + String instance = ":topic." + client.getFqtn(); + client.setStatus(DmaapObject_Status.VALID); + String role = client.getClientRole(); + for (String want : client.getAction()) { + int rc; + DmaapPerm perm = new DmaapPerm(dmaap.getTopicPerm(), instance, want); + if (role != null) { + DmaapGrant g = new DmaapGrant(perm, role); + rc = aaf.addGrant(g); + if (rc != 201 && rc != 409) { + client.setStatus(DmaapObject_Status.INVALID); + err.setCode(rc); + err.setMessage("Grant of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " failed for " + role); + logger.warn(err.getMessage()); + return; + } + } else { + logger.warn("No Grant of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " because role is null "); + } + } + } + + private void assignIdentityToRole(MR_Client client, String role, ApiError err) { + AafService aaf = new AafService(ServiceType.AAF_TopicMgr); + + AafUserRole ur = new AafUserRole(client.getClientIdentity(), role); + int rc = aaf.addUserRole(ur); + if (rc != 201 && rc != 409) { + client.setStatus(DmaapObject_Status.INVALID); + err.setCode(rc); + err.setMessage("Failed to add user " + client.getClientIdentity() + " to " + role); + logger.warn(err.getMessage()); + return; + } + client.setStatus(DmaapObject_Status.VALID); + + } + + private void revokeClientPerms(MR_Client client, ApiError err) { + AafService aaf = new AafService(ServiceType.AAF_TopicMgr); + + String instance = ":topic." + client.getFqtn(); + client.setStatus(DmaapObject_Status.VALID); + for (String want : client.getAction()) { + int rc; + DmaapPerm perm = new DmaapPerm(dmaap.getTopicPerm(), instance, want); + DmaapGrant g = new DmaapGrant(perm, client.getClientRole()); + rc = aaf.delGrant(g); + if (rc != 200 && rc != 404) { + client.setStatus(DmaapObject_Status.INVALID); + err.setCode(rc); + err.setMessage("Revoke of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " failed for " + client.getClientRole()); + logger.warn(err.getMessage()); + return; + } + } + } + + public MR_Client updateMr_Client(MR_Client client, ApiError apiError) { + MR_Client c = mr_clients.get(client.getMrClientId()); + if (c == null) { + apiError.setCode(Status.NOT_FOUND.getStatusCode()); + apiError.setFields(MR_CLIENT_ID); + apiError.setMessage("mrClientId " + client.getMrClientId() + " not found"); + } else { + apiError.setCode(200); + } + mr_clients.put(client.getMrClientId(), client); + return client; + } + + public void removeMr_Client(String key, boolean updateTopicView, ApiError apiError) { + MR_Client client = mr_clients.get(key); + if (client == null) { + apiError.setCode(Status.NOT_FOUND.getStatusCode()); + apiError.setFields(MR_CLIENT_ID); + apiError.setMessage("mrClientId " + key + " not found"); + return; + } else { + apiError.setCode(200); + } + + if (updateTopicView) { + + TopicService topics = new TopicService(); + + Topic t = topics.getTopic(client.getFqtn(), apiError); + if (t != null) { + List<MR_Client> tc = t.getClients(); + for (MR_Client c : tc) { + if (c.getMrClientId().equals(client.getMrClientId())) { + tc.remove(c); + break; + } + } + t.setClients(tc); + topics.updateTopic(t, apiError); + } + + } + + // remove from AAF + if (deleteLevel >= 2) { + revokeClientPerms(client, apiError); + if (!apiError.is2xx()) { + return; + } + } + // remove from DB + if (deleteLevel >= 1) { + mr_clients.remove(key); + } + } -public class MR_ClientService extends BaseLoggingClass{ - - private static final String MR_CLIENT_ID = "mrClientId"; - private int deleteLevel; - private Map<String, MR_Client> mr_clients = DatabaseClass.getMr_clients(); - private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters(); - private Map<String, DcaeLocation> locations = DatabaseClass.getDcaeLocations(); - private DmaapService dmaap = new DmaapService(); - private String centralCname; - - public MR_ClientService() { - DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - - centralCname = p.getProperty("MR.CentralCname", "MRcname.not.set"); - deleteLevel = Integer.valueOf(p.getProperty("MR.ClientDeleteLevel", "0" )); - } - - public List<MR_Client> getAllMr_Clients() { - return new ArrayList<>(mr_clients.values()); - } - - List<MR_Client> getAllMrClients(String fqtn) { - ArrayList<MR_Client> results = new ArrayList<>(); - for (Map.Entry<String, MR_Client> entry : mr_clients.entrySet()) - { - MR_Client client = entry.getValue(); - if ( fqtn.equals(client.getFqtn() ) ) { - results.add( client ); - } - } - return results; - } - - List<MR_Client> getClientsByLocation(String location) { - List<MR_Client> results = new ArrayList<>(); - for (Map.Entry<String, MR_Client> entry : mr_clients.entrySet()) - { - MR_Client client = entry.getValue(); - if ( location.equals(client.getDcaeLocationName() ) ) { - results.add( client ); - } - } - return results; - } - - - public MR_Client getMr_Client( String key, ApiError apiError ) { - MR_Client c = mr_clients.get( key ); - if ( c == null ) { - apiError.setCode(Status.NOT_FOUND.getStatusCode()); - apiError.setFields(MR_CLIENT_ID); - apiError.setMessage(MR_CLIENT_ID+ " " + key + " not found" ); - } else { - apiError.setCode(200); - } - return c; - } - - public MR_Client addMr_Client( MR_Client client, Topic topic, ApiError err ) { - if ( client.getDcaeLocationName().isEmpty()) { - logger.info( "Client dcaeLocation that doesn't exist or not specified" ); - return null; - } - // original style: clients specified Role. This has precedence for backwards - // compatibility. - // ONAP style: clients specify Identity to be assigned to generated Role - String role = client.getClientRole(); - if ( role != null ) { - grantClientRolePerms( client, err); - } else if ( client.hasClientIdentity() ){ - if ( client.isSubscriber() ) { - role = topic.getSubscriberRole(); - assignIdentityToRole( client, role, err ); - } - if (client.isPublisher() ) { - role = topic.getPublisherRole(); - assignIdentityToRole( client, role, err ); - } - } - if ( ! client.isStatusValid()) { - return null; - } - String centralFqdn = null; - DcaeLocation candidate = locations.get(client.getDcaeLocationName()); - - MR_Cluster cluster = clusters.get( client.getDcaeLocationName()); - if ( cluster != null && candidate != null ) { - if ( candidate.isCentral() && ! topic.getReplicationCase().involvesFQDN() ) { - centralFqdn = centralCname; - } - client.setTopicURL(cluster.genTopicURL(centralFqdn, client.getFqtn())); - if ( centralFqdn == null ) { - client.setStatus( addTopicToCluster( cluster, topic, err)); - if( ! err.is2xx() && err.getCode() != 409 ) { - topic.setFqtn(err.getMessage()); - return null; - } - - } else { - MR_ClusterService clusters = new MR_ClusterService(); - // MM should only exist for edge-to-central - // we use a cname for the central target (default resiliency with no replicationGroup set) - // but still need to provision topics on all central MRs - for( MR_Cluster central: clusters.getCentralClusters() ) { - client.setStatus( addTopicToCluster( central, topic, err)); - if( ! err.is2xx() && err.getCode() != 409 ) { - topic.setFqtn(err.getMessage()); - return null; - } - } - } - - } else { - logger.warn( "Client references a dcaeLocation that doesn't exist:" + client.getDcaeLocationName()); - client.setStatus( DmaapObject_Status.STAGED); - } - - mr_clients.put( client.getMrClientId(), client ); - - err.setCode(200); - - return client; - } - - private DmaapObject_Status addTopicToCluster( MR_Cluster cluster, Topic topic, ApiError err ){ - - MrProvConnection prov = new MrProvConnection(); - logger.info( "POST topic " + topic.getFqtn() + " to cluster " + cluster.getFqdn() + " in loc " + cluster.getDcaeLocationName()); - if ( prov.makeTopicConnection(cluster)) { - prov.doPostTopic(topic, err); - logger.info( "response code: " + err.getCode() ); - if ( err.is2xx() || err.getCode() == 409 ) { - return DmaapObject_Status.VALID; - } - } - return DmaapObject_Status.INVALID; - } - - private void grantClientRolePerms( MR_Client client, ApiError err) { - AafService aaf = new AafService(ServiceType.AAF_TopicMgr); - - String instance = ":topic." + client.getFqtn(); - client.setStatus( DmaapObject_Status.VALID); - String role = client.getClientRole(); - for( String want : client.getAction() ) { - int rc; - DmaapPerm perm = new DmaapPerm( dmaap.getTopicPerm(), instance, want ); - if ( role != null ) { - DmaapGrant g = new DmaapGrant( perm, role ); - rc = aaf.addGrant( g ); - if ( rc != 201 && rc != 409 ) { - client.setStatus( DmaapObject_Status.INVALID); - err.setCode(rc); - err.setMessage( "Grant of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " failed for " + role ); - logger.warn( err.getMessage()); - return; - } - } else { - logger.warn( "No Grant of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " because role is null " ); - } - } - } - - private void assignIdentityToRole( MR_Client client, String role, ApiError err ) { - AafService aaf = new AafService(ServiceType.AAF_TopicMgr); - - AafUserRole ur = new AafUserRole( client.getClientIdentity(), role ); - int rc = aaf.addUserRole( ur ); - if ( rc != 201 && rc != 409 ) { - client.setStatus( DmaapObject_Status.INVALID); - err.setCode(rc); - err.setMessage( "Failed to add user " + client.getClientIdentity()+ " to " + role ); - logger.warn( err.getMessage()); - return; - } - client.setStatus( DmaapObject_Status.VALID); - - } - private void revokeClientPerms( MR_Client client, ApiError err) { - AafService aaf = new AafService(ServiceType.AAF_TopicMgr); - - String instance = ":topic." + client.getFqtn(); - client.setStatus( DmaapObject_Status.VALID); - for( String want : client.getAction() ) { - int rc; - DmaapPerm perm = new DmaapPerm( dmaap.getTopicPerm(), instance, want ); - DmaapGrant g = new DmaapGrant( perm, client.getClientRole() ); - rc = aaf.delGrant( g ); - if ( rc != 200 && rc != 404 ) { - client.setStatus( DmaapObject_Status.INVALID); - err.setCode(rc); - err.setMessage( "Revoke of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " failed for " + client.getClientRole() ); - logger.warn( err.getMessage()); - return; - } - } - } - - public MR_Client updateMr_Client( MR_Client client, ApiError apiError ) { - MR_Client c = mr_clients.get( client.getMrClientId()); - if ( c == null ) { - apiError.setCode(Status.NOT_FOUND.getStatusCode()); - apiError.setFields(MR_CLIENT_ID); - apiError.setMessage("mrClientId " + client.getMrClientId() + " not found" ); - } else { - apiError.setCode(200); - } - mr_clients.put( client.getMrClientId(), client ); - return client; - } - - public void removeMr_Client( String key, boolean updateTopicView, ApiError apiError ) { - MR_Client client = mr_clients.get( key ); - if ( client == null ) { - apiError.setCode(Status.NOT_FOUND.getStatusCode()); - apiError.setFields(MR_CLIENT_ID); - apiError.setMessage("mrClientId " + key + " not found" ); - return; - } else { - apiError.setCode(200); - } - - if (updateTopicView) { - - TopicService topics = new TopicService(); - - Topic t = topics.getTopic(client.getFqtn(), apiError ); - if ( t != null ) { - List<MR_Client> tc = t.getClients(); - for( MR_Client c: tc) { - if ( c.getMrClientId().equals(client.getMrClientId())) { - tc.remove(c); - break; - } - } - t.setClients(tc); - topics.updateTopic( t, apiError ); - } - - } - - // remove from AAF - if ( deleteLevel >= 2 ) { - revokeClientPerms( client, apiError ); - if ( ! apiError.is2xx()) { - return; - } - } - // remove from DB - if ( deleteLevel >= 1 ) { - mr_clients.remove(key); - } - } - } diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index 08e58be..83591dd 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -9,9 +9,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,20 +22,11 @@ package org.onap.dmaap.dbcapi.service; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.ws.rs.core.Response.Status; - import org.onap.dmaap.dbcapi.aaf.AafNamespace; import org.onap.dmaap.dbcapi.aaf.AafRole; import org.onap.dmaap.dbcapi.aaf.AafService; -import org.onap.dmaap.dbcapi.aaf.DmaapGrant; import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType; +import org.onap.dmaap.dbcapi.aaf.DmaapGrant; import org.onap.dmaap.dbcapi.aaf.DmaapPerm; import org.onap.dmaap.dbcapi.database.DatabaseClass; import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; @@ -52,565 +43,573 @@ import org.onap.dmaap.dbcapi.util.DmaapConfig; import org.onap.dmaap.dbcapi.util.Fqdn; import org.onap.dmaap.dbcapi.util.Graph; +import javax.ws.rs.core.Response.Status; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class TopicService extends BaseLoggingClass { - - - // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122 - private static String defaultGlobalMrHost; - - private Map<String, Topic> mr_topics; - - private static DmaapService dmaapSvc = new DmaapService(); - private MR_ClientService clientService; - private MR_ClusterService clusters; - private DcaeLocationService locations; - private MirrorMakerService bridge; - - private static String centralCname; - private static boolean createTopicRoles; - private boolean strictGraph = true; - private boolean mmPerMR; - - - public TopicService(){ - this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig)DmaapConfig.getConfig(), - new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService()); - - } - - TopicService(Map<String, Topic> mr_topics, MR_ClientService clientService, DmaapConfig p, - MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService bridge) { - this.mr_topics = mr_topics; - this.clientService = clientService; - defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); - centralCname = p.getProperty("MR.CentralCname"); - createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true")); - String unit_test = p.getProperty( "UnitTest", "No" ); - if ( "Yes".equals(unit_test)) { - strictGraph = false; - } - mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); - logger.info( "TopicService properties: CentralCname=" + centralCname + - " defaultGlobarlMrHost=" + defaultGlobalMrHost + - " createTopicRoles=" + createTopicRoles + - " mmPerMR=" + mmPerMR ); - this.clusters = clusters; - this.locations = locations; - this.bridge = bridge; - } - - public Map<String, Topic> getTopics() { - return mr_topics; - } - - public List<Topic> getAllTopics() { - return getAllTopics( true ); - } - public List<Topic> getAllTopicsWithoutClients() { - return getAllTopics(false); - } - - private List<Topic> getAllTopics( Boolean withClients ) { - ArrayList<Topic> topics = new ArrayList<>(mr_topics.values()); - if ( withClients ) { - for( Topic topic: topics ) { - topic.setClients(clientService.getAllMrClients(topic.getFqtn())); - } - } - return topics; - } - - - public Topic getTopic( String key, ApiError apiError ) { - logger.info( "getTopic: key=" + key); - Topic t = mr_topics.get( key ); - if ( t == null ) { - apiError.setCode(Status.NOT_FOUND.getStatusCode()); - apiError.setFields( "fqtn"); - apiError.setMessage("topic with fqtn " + key + " not found"); - return null; - } - t.setClients( clientService.getAllMrClients( key )); - apiError.setCode(Status.OK.getStatusCode()); - return t; - } - - private void aafTopicSetup(Topic topic, ApiError err ) { - - String nsr = dmaapSvc.getDmaap().getTopicNsRoot(); - if ( nsr == null ) { - err.setCode(500); - err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)" ); - err.setFields("topicNsRoot"); - return; - } - - // establish AAF Connection using TopicMgr identity - AafService aaf = new AafService(ServiceType.AAF_TopicMgr); - - AafRole pubRole = null; - AafRole subRole = null; - - // creating Topic Roles was not an original feature. - // For backwards compatibility, only do this if the feature is enabled. - // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns) - // then we likely don't have permission to create sub-ns and Roles so don't try. - if ( createTopicRoles && topic.getFqtn().startsWith(nsr)) { - // create AAF namespace for this topic - AafNamespace ns = new AafNamespace( topic.getFqtn(), aaf.getIdentity()); - { - int rc = aaf.addNamespace( ns ); - if ( rc != 201 && rc != 409 ) { - err.setCode(500); - err.setMessage("Unexpected response from AAF:" + rc ); - err.setFields("namespace:" + topic.getFqtn() + " identity="+ aaf.getIdentity()); - return; - } - } - - // create AAF Roles for MR clients of this topic - String rn = "publisher"; - pubRole = new AafRole( topic.getFqtn(), rn ); - int rc = aaf.addRole( pubRole ); - if ( rc != 201 && rc != 409 ) { - err.setCode(500); - err.setMessage("Unexpected response from AAF:" + rc ); - err.setFields("topic:" + topic.getFqtn() + " role="+ rn); - return; - } - topic.setPublisherRole( pubRole.getFullyQualifiedRole() ); - - rn = "subscriber"; - subRole = new AafRole( topic.getFqtn(), rn ); - rc = aaf.addRole( subRole ); - if ( rc != 201 && rc != 409 ) { - err.setCode(500); - err.setMessage("Unexpected response from AAF:" + rc ); - err.setFields("topic:" + topic.getFqtn() + " role="+ rn); - return; - } - topic.setSubscriberRole( subRole.getFullyQualifiedRole() ); - } - - // create AAF perms checked by MR - String instance = ":topic." + topic.getFqtn(); - String[] actions = { "pub", "sub", "view" }; - String t = dmaapSvc.getTopicPerm(); - for ( String action : actions ){ - DmaapPerm perm = new DmaapPerm( t, instance, action ); - int rc = aaf.addPerm( perm ); - if ( rc != 201 && rc != 409 ) { - err.setCode(500); - err.setMessage("Unexpected response from AAF:" + rc ); - err.setFields("t="+t + " instance="+ instance + " action="+ action); - return; - } - if ( createTopicRoles ) { - // Grant perms to our default Roles - if ( action.equals( "pub") || action.equals( "view") ) { - DmaapGrant g = new DmaapGrant( perm, pubRole.getFullyQualifiedRole() ); - rc = aaf.addGrant( g ); - if ( rc != 201 && rc != 409 ) { - err.setCode(rc); - err.setMessage( "Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole() ); - logger.warn( err.getMessage()); - return; - } - } - if ( action.equals( "sub") || action.equals( "view") ) { - DmaapGrant g = new DmaapGrant( perm, subRole.getFullyQualifiedRole() ); - rc = aaf.addGrant( g ); - if ( rc != 201 && rc != 409 ) { - err.setCode(rc); - err.setMessage( "Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole() ); - logger.warn( err.getMessage()); - return; - } - } - } - - } - } - - public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) { - logger.info( "Entry: addTopic"); - logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() ); - String nFqtn = topic.genFqtn(); - logger.info( "FQTN=" + nFqtn ); - Topic pTopic = getTopic( nFqtn, err ); - if ( pTopic != null ) { - String t = "topic already exists: " + nFqtn; - logger.info( t ); - if ( useExisting ) { - err.setCode(Status.OK.getStatusCode()); - return pTopic; - } - err.setMessage( t ); - err.setFields( "fqtn"); - err.setCode(Status.CONFLICT.getStatusCode()); - return null; - } - err.reset(); // err filled with NOT_FOUND is expected case, but don't want to litter... - - topic.setFqtn( nFqtn ); - - aafTopicSetup( topic, err ); - if ( err.getCode() >= 400 ) { - return null; - } - - if ( topic.getReplicationCase().involvesGlobal() ) { - if ( topic.getGlobalMrURL() == null ) { - topic.setGlobalMrURL(defaultGlobalMrHost); - } - if ( ! Fqdn.isValid( topic.getGlobalMrURL())) { - logger.error( "GlobalMR FQDN not valid: " + topic.getGlobalMrURL()); - topic.setStatus( DmaapObject_Status.INVALID); - err.setCode(500); - err.setMessage("Value is not a valid FQDN:" + topic.getGlobalMrURL() ); - err.setFields("globalMrURL"); - - return null; - } - } - - - if ( topic.getNumClients() > 0 ) { - ArrayList<MR_Client> clients = new ArrayList<MR_Client>(topic.getClients()); - - - ArrayList<MR_Client> clients2 = new ArrayList<MR_Client>(); - for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) { - MR_Client c = it.next(); - - logger.info( "c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL()); - MR_Client nc = new MR_Client( c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction()); - nc.setFqtn(topic.getFqtn()); - nc.setClientIdentity( c.getClientIdentity()); - logger.info( "nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL()); - clients2.add( clientService.addMr_Client(nc, topic, err)); - if ( ! err.is2xx()) { - return null; - } - } - - topic.setClients(clients2); - } - - Topic ntopic = checkForBridge( topic, err ); - if ( ntopic == null ) { - topic.setStatus( DmaapObject_Status.INVALID); - if ( ! err.is2xx()) { - return null; - } - } - - - mr_topics.put( nFqtn, ntopic ); - - err.setCode(Status.OK.getStatusCode()); - return ntopic; - } - - - public Topic updateTopic( Topic topic, ApiError err ) { - logger.info( "updateTopic: entry"); - logger.info( "updateTopic: topic=" + topic); - logger.info( "updateTopic: fqtn=" + topic.getFqtn() ); - if ( topic.getFqtn().isEmpty()) { - return null; - } - logger.info( "updateTopic: call checkForBridge"); - Topic ntopic = checkForBridge( topic, err ); - if ( ntopic == null ) { - topic.setStatus( DmaapObject_Status.INVALID); - if ( ! err.is2xx() ) { - return null; - } - } - if(ntopic != null) { - logger.info( "updateTopic: call put"); - mr_topics.put( ntopic.getFqtn(), ntopic ); - } - err.setCode(Status.OK.getStatusCode()); - return ntopic; - } - - public Topic removeTopic( String pubId, ApiError apiError ) { - Topic topic = mr_topics.get(pubId); - if ( topic == null ) { - apiError.setCode(Status.NOT_FOUND.getStatusCode()); - apiError.setMessage("Topic " + pubId + " does not exist"); - apiError.setFields("fqtn"); - return null; - } - ArrayList<MR_Client> clients = new ArrayList<MR_Client>(clientService.getAllMrClients( pubId )); - for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) { - MR_Client c = it.next(); - - - clientService.removeMr_Client(c.getMrClientId(), false, apiError); - if ( ! apiError.is2xx()) { - return null; - } - } - apiError.setCode(Status.OK.getStatusCode()); - return mr_topics.remove(pubId); - } - public static ApiError setBridgeClientPerms( MR_Cluster node ) { - DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - String mmProvRole = p.getProperty("MM.ProvRole"); - String mmAgentRole = p.getProperty("MM.AgentRole"); - String[] Roles = { mmProvRole, mmAgentRole }; - String[] actions = { "view", "pub", "sub" }; - Topic bridgeAdminTopic = new Topic().init(); - bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() ); - bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning"); - bridgeAdminTopic.setOwner( "DBC" ); - - ArrayList<MR_Client> clients = new ArrayList<MR_Client>(); - for( String role: Roles ) { - MR_Client client = new MR_Client(); - client.setAction(actions); - client.setClientRole(role); - client.setDcaeLocationName( node.getDcaeLocationName()); - clients.add( client ); - } - bridgeAdminTopic.setClients(clients); - - TopicService ts = new TopicService(); - ApiError err = new ApiError(); - ts.addTopic(bridgeAdminTopic, err, true); - - if ( err.is2xx() || err.getCode() == 409 ){ - err.setCode(200); - return err; - } - - errorLogger.error( DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR, bridgeAdminTopic.getFqtn(), Integer.toString(err.getCode()), err.getFields(), err.getMessage()); - return err; - } - - - public Topic checkForBridge( Topic topic, ApiError err ) { - logger.info( "checkForBridge: entry"); - logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase()); - if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) { - topic.setStatus( DmaapObject_Status.VALID); - return topic; - } - - boolean anythingWrong = false; - - Set<String> groups = clusters.getGroups(); - for ( String g : groups ) { - logger.info( "buildBridge for " + topic.getFqtn() + " on group" + g); - anythingWrong |= buildBridge( topic, err, g ); - } - if ( anythingWrong ) { - topic.setStatus( DmaapObject_Status.INVALID); - if ( ! err.is2xx() ) { - return null; - } - } else { - topic.setStatus( DmaapObject_Status.VALID); - } - return topic; - } - - private boolean buildBridge( Topic topic, ApiError err, String group ) { - logger.info( "buildBridge: entry"); - boolean anythingWrong = false; - Graph graph; - logger.info( "buildBridge: strictGraph=" + strictGraph ); - if ( group == null || group.isEmpty() ) { - graph = new Graph( topic.getClients(), strictGraph ); - } else { - graph = new Graph( topic.getClients(), strictGraph, group ); - } - logger.info( "buildBridge: graph=" + graph ); - MR_Cluster groupCentralCluster = null; - - - if ( graph.isEmpty() ) { - logger.info( "buildBridge: graph is empty. return false" ); - return false; - } else if ( group == null && topic.getReplicationCase().involvesFQDN() ) { - logger.info( "buildBridge: group is null and replicationCaseInvolvesFQDN. return false" ); - return false; - } else if ( ! graph.hasCentral() ) { - logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients"); - return true; - } else { - groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc()); - } - Collection<String> clientLocations = graph.getKeys(); - for( String loc : clientLocations ) { - logger.info( "loc=" + loc ); - DcaeLocation location = locations.getDcaeLocation(loc); - MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); - logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() ); - logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() ); - - - - String source = null; - String target = null; - - /* - * Provision Edge to Central bridges... - */ - if ( ! location.isCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) { - switch( topic.getReplicationCase() ) { - case REPLICATION_EDGE_TO_CENTRAL: - case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only - source = cluster.getFqdn(); - target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname; - logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target ); - break; - case REPLICATION_CENTRAL_TO_EDGE: - case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only - source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname; - target = cluster.getFqdn(); - break; - case REPLICATION_CENTRAL_TO_GLOBAL: - case REPLICATION_GLOBAL_TO_CENTRAL: - case REPLICATION_FQDN_TO_GLOBAL: - case REPLICATION_GLOBAL_TO_FQDN: - break; - - case REPLICATION_EDGE_TO_FQDN: - case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2C portion only - source = cluster.getFqdn(); - target = groupCentralCluster.getFqdn(); - break; - case REPLICATION_FQDN_TO_EDGE: - case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for F2E portion only - source = groupCentralCluster.getFqdn(); - target = cluster.getFqdn(); - break; - - default: - logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); - anythingWrong = true; - err.setCode(400); - err.setFields("topic=" + topic.genFqtn() + " replicationCase=" - + topic.getReplicationCase() ); - err.setMessage("Unexpected value for ReplicationType"); - continue; - } - - } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) { - /* - * Provision Central to Global bridges - */ - switch( topic.getReplicationCase() ) { - - case REPLICATION_CENTRAL_TO_GLOBAL: - case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: - source = centralCname; - target = topic.getGlobalMrURL(); - break; - case REPLICATION_GLOBAL_TO_CENTRAL: - case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only - source = topic.getGlobalMrURL(); - target = centralCname; - break; - - case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2F portion only - source = groupCentralCluster.getFqdn(); - target = topic.getGlobalMrURL(); - break; - - case REPLICATION_FQDN_TO_GLOBAL: - source = groupCentralCluster.getFqdn(); - target = topic.getGlobalMrURL(); - break; - - case REPLICATION_GLOBAL_TO_FQDN: - case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for G2F portion only - source = topic.getGlobalMrURL(); - target = groupCentralCluster.getFqdn(); - break; - - case REPLICATION_FQDN_TO_EDGE: - case REPLICATION_EDGE_TO_FQDN: - case REPLICATION_EDGE_TO_CENTRAL: - case REPLICATION_CENTRAL_TO_EDGE: - break; - default: - logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() ); - anythingWrong = true; - err.setCode(400); - err.setFields("topic=" + topic.genFqtn() + " replicationCase=" - + topic.getReplicationCase() ); - err.setMessage("Unexpected value for ReplicationType"); - continue; - } - } else { - logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done"); - anythingWrong = true; - continue; - } - if ( source != null && target != null ) { - try { - logger.info( "Create a MM from " + source + " to " + target ); - MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn()); - mm.addTopic(topic.getFqtn()); - bridge.updateMirrorMaker(mm); - } catch ( Exception ex ) { - err.setCode(500); - err.setFields( "mirror_maker.topic"); - err.setMessage("Unexpected condition: " + ex ); - anythingWrong = true; - break; - } - } - - - } - return anythingWrong; - - } - - - /* - * Prior to 1707, we only supported EDGE_TO_CENTRAL replication. - * This was determined automatically based on presence of edge publishers and central subscribers. - * The following method is a modification of that original logic, to preserve some backwards compatibility, - * i.e. to be used when no ReplicationType is specified. - */ - public ReplicationType reviewTopic( Topic topic ) { - - - if ( topic.getNumClients() > 1 ) { - Graph graph = new Graph( topic.getClients(), false ); - - String centralFqdn = new String(); - if ( graph.hasCentral() ) { - DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); - centralFqdn = p.getProperty("MR.CentralCname"); - } - - Collection<String> locations = graph.getKeys(); - for( String loc : locations ) { - logger.info( "loc=" + loc ); - MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); - if ( cluster == null ) { - logger.info( "No MR cluster for location " + loc ); - continue; - } - if ( graph.hasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { - logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn ); - return ReplicationType.REPLICATION_EDGE_TO_CENTRAL; - - } - - } - } - - return ReplicationType.REPLICATION_NONE; - } + + // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122 + private static String defaultGlobalMrHost; + + private Map<String, Topic> mr_topics; + + private static DmaapService dmaapSvc = new DmaapService(); + private MR_ClientService clientService; + private MR_ClusterService clusters; + private DcaeLocationService locations; + private MirrorMakerService bridge; + + private static String centralCname; + private static boolean createTopicRoles; + private boolean strictGraph = true; + private boolean mmPerMR; + + + public TopicService() { + this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig) DmaapConfig.getConfig(), + new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService()); + + } + + TopicService(Map<String, Topic> mr_topics, MR_ClientService clientService, DmaapConfig p, + MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService bridge) { + this.mr_topics = mr_topics; + this.clientService = clientService; + defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); + centralCname = p.getProperty("MR.CentralCname"); + createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true")); + String unit_test = p.getProperty("UnitTest", "No"); + if ("Yes".equals(unit_test)) { + strictGraph = false; + } + mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); + logger.info("TopicService properties: CentralCname=" + centralCname + + " defaultGlobarlMrHost=" + defaultGlobalMrHost + + " createTopicRoles=" + createTopicRoles + + " mmPerMR=" + mmPerMR); + this.clusters = clusters; + this.locations = locations; + this.bridge = bridge; + } + + public Map<String, Topic> getTopics() { + return mr_topics; + } + + public List<Topic> getAllTopics() { + return getAllTopics(true); + } + + public List<Topic> getAllTopicsWithoutClients() { + return getAllTopics(false); + } + + private List<Topic> getAllTopics(Boolean withClients) { + ArrayList<Topic> topics = new ArrayList<>(mr_topics.values()); + if (withClients) { + for (Topic topic : topics) { + topic.setClients(clientService.getAllMrClients(topic.getFqtn())); + } + } + return topics; + } + + + public Topic getTopic(String key, ApiError apiError) { + logger.info("getTopic: key=" + key); + Topic t = mr_topics.get(key); + if (t == null) { + apiError.setCode(Status.NOT_FOUND.getStatusCode()); + apiError.setFields("fqtn"); + apiError.setMessage("topic with fqtn " + key + " not found"); + return null; + } + t.setClients(clientService.getAllMrClients(key)); + apiError.setCode(Status.OK.getStatusCode()); + return t; + } + + private void aafTopicSetup(Topic topic, ApiError err) { + + String nsr = dmaapSvc.getDmaap().getTopicNsRoot(); + if (nsr == null) { + err.setCode(500); + err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)"); + err.setFields("topicNsRoot"); + return; + } + + // establish AAF Connection using TopicMgr identity + AafService aaf = new AafService(ServiceType.AAF_TopicMgr); + + AafRole pubRole = null; + AafRole subRole = null; + + // creating Topic Roles was not an original feature. + // For backwards compatibility, only do this if the feature is enabled. + // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns) + // then we likely don't have permission to create sub-ns and Roles so don't try. + if (createTopicRoles && topic.getFqtn().startsWith(nsr)) { + // create AAF namespace for this topic + AafNamespace ns = new AafNamespace(topic.getFqtn(), aaf.getIdentity()); + { + int rc = aaf.addNamespace(ns); + if (rc != 201 && rc != 409) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc); + err.setFields("namespace:" + topic.getFqtn() + " identity=" + aaf.getIdentity()); + return; + } + } + + // create AAF Roles for MR clients of this topic + String rn = "publisher"; + pubRole = new AafRole(topic.getFqtn(), rn); + int rc = aaf.addRole(pubRole); + if (rc != 201 && rc != 409) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc); + err.setFields("topic:" + topic.getFqtn() + " role=" + rn); + return; + } + topic.setPublisherRole(pubRole.getFullyQualifiedRole()); + + rn = "subscriber"; + subRole = new AafRole(topic.getFqtn(), rn); + rc = aaf.addRole(subRole); + if (rc != 201 && rc != 409) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc); + err.setFields("topic:" + topic.getFqtn() + " role=" + rn); + return; + } + topic.setSubscriberRole(subRole.getFullyQualifiedRole()); + } + + // create AAF perms checked by MR + String instance = ":topic." + topic.getFqtn(); + String[] actions = {"pub", "sub", "view"}; + String t = dmaapSvc.getTopicPerm(); + for (String action : actions) { + DmaapPerm perm = new DmaapPerm(t, instance, action); + int rc = aaf.addPerm(perm); + if (rc != 201 && rc != 409) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc); + err.setFields("t=" + t + " instance=" + instance + " action=" + action); + return; + } + if (createTopicRoles) { + // Grant perms to our default Roles + if (action.equals("pub") || action.equals("view")) { + DmaapGrant g = new DmaapGrant(perm, pubRole.getFullyQualifiedRole()); + rc = aaf.addGrant(g); + if (rc != 201 && rc != 409) { + err.setCode(rc); + err.setMessage("Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole()); + logger.warn(err.getMessage()); + return; + } + } + if (action.equals("sub") || action.equals("view")) { + DmaapGrant g = new DmaapGrant(perm, subRole.getFullyQualifiedRole()); + rc = aaf.addGrant(g); + if (rc != 201 && rc != 409) { + err.setCode(rc); + err.setMessage("Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole()); + logger.warn(err.getMessage()); + return; + } + } + } + + } + } + + public Topic addTopic(Topic topic, ApiError err, Boolean useExisting) { + logger.info("Entry: addTopic"); + logger.info("Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle()); + String nFqtn = topic.genFqtn(); + logger.info("FQTN=" + nFqtn); + Topic pTopic = getTopic(nFqtn, err); + if (pTopic != null) { + String t = "topic already exists: " + nFqtn; + logger.info(t); + if (useExisting) { + err.setCode(Status.OK.getStatusCode()); + return pTopic; + } + err.setMessage(t); + err.setFields("fqtn"); + err.setCode(Status.CONFLICT.getStatusCode()); + return null; + } + err.reset(); // err filled with NOT_FOUND is expected case, but don't want to litter... + + topic.setFqtn(nFqtn); + + aafTopicSetup(topic, err); + if (err.getCode() >= 400) { + return null; + } + + if (topic.getReplicationCase().involvesGlobal()) { + if (topic.getGlobalMrURL() == null) { + topic.setGlobalMrURL(defaultGlobalMrHost); + } + if (!Fqdn.isValid(topic.getGlobalMrURL())) { + logger.error("GlobalMR FQDN not valid: " + topic.getGlobalMrURL()); + topic.setStatus(DmaapObject_Status.INVALID); + err.setCode(500); + err.setMessage("Value is not a valid FQDN:" + topic.getGlobalMrURL()); + err.setFields("globalMrURL"); + + return null; + } + } + + + if (topic.getNumClients() > 0) { + ArrayList<MR_Client> clients = new ArrayList<MR_Client>(topic.getClients()); + + + ArrayList<MR_Client> clients2 = new ArrayList<MR_Client>(); + for (Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) { + MR_Client c = it.next(); + + logger.info("c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL()); + MR_Client nc = new MR_Client(c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction()); + nc.setFqtn(topic.getFqtn()); + nc.setClientIdentity(c.getClientIdentity()); + logger.info("nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL()); + clients2.add(clientService.addMr_Client(nc, topic, err)); + if (!err.is2xx()) { + return null; + } + } + + topic.setClients(clients2); + } + + Topic ntopic = checkForBridge(topic, err); + if (ntopic == null) { + topic.setStatus(DmaapObject_Status.INVALID); + if (!err.is2xx()) { + return null; + } + } + + + mr_topics.put(nFqtn, ntopic); + + err.setCode(Status.OK.getStatusCode()); + return ntopic; + } + + + public Topic updateTopic(Topic topic, ApiError err) { + logger.info("updateTopic: entry"); + logger.info("updateTopic: topic=" + topic); + logger.info("updateTopic: fqtn=" + topic.getFqtn()); + if (topic.getFqtn().isEmpty()) { + return null; + } + logger.info("updateTopic: call checkForBridge"); + Topic ntopic = checkForBridge(topic, err); + if (ntopic == null) { + topic.setStatus(DmaapObject_Status.INVALID); + if (!err.is2xx()) { + return null; + } + } + if (ntopic != null) { + logger.info("updateTopic: call put"); + mr_topics.put(ntopic.getFqtn(), ntopic); + } + err.setCode(Status.OK.getStatusCode()); + return ntopic; + } + + public Topic removeTopic(String pubId, ApiError apiError) { + Topic topic = mr_topics.get(pubId); + if (topic == null) { + apiError.setCode(Status.NOT_FOUND.getStatusCode()); + apiError.setMessage("Topic " + pubId + " does not exist"); + apiError.setFields("fqtn"); + return null; + } + ArrayList<MR_Client> clients = new ArrayList<MR_Client>(clientService.getAllMrClients(pubId)); + for (Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) { + MR_Client c = it.next(); + + + clientService.removeMr_Client(c.getMrClientId(), false, apiError); + if (!apiError.is2xx()) { + return null; + } + } + apiError.setCode(Status.OK.getStatusCode()); + return mr_topics.remove(pubId); + } + + public static ApiError setBridgeClientPerms(MR_Cluster node) { + DmaapConfig p = (DmaapConfig) DmaapConfig.getConfig(); + String mmProvRole = p.getProperty("MM.ProvRole"); + String mmAgentRole = p.getProperty("MM.AgentRole"); + String[] Roles = {mmProvRole, mmAgentRole}; + String[] actions = {"view", "pub", "sub"}; + Topic bridgeAdminTopic = new Topic().init(); + bridgeAdminTopic.setTopicName(dmaapSvc.getBridgeAdminFqtn()); + bridgeAdminTopic.setTopicDescription("RESERVED topic for MirroMaker Provisioning"); + bridgeAdminTopic.setOwner("DBC"); + + ArrayList<MR_Client> clients = new ArrayList<MR_Client>(); + for (String role : Roles) { + MR_Client client = new MR_Client(); + client.setAction(actions); + client.setClientRole(role); + client.setDcaeLocationName(node.getDcaeLocationName()); + clients.add(client); + } + bridgeAdminTopic.setClients(clients); + + TopicService ts = new TopicService(); + ApiError err = new ApiError(); + ts.addTopic(bridgeAdminTopic, err, true); + + if (err.is2xx() || err.getCode() == 409) { + err.setCode(200); + return err; + } + + errorLogger.error(DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR, bridgeAdminTopic.getFqtn(), Integer.toString(err.getCode()), err.getFields(), err.getMessage()); + return err; + } + + + public Topic checkForBridge(Topic topic, ApiError err) { + logger.info("checkForBridge: entry"); + logger.info("fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase()); + if (topic.getReplicationCase() == ReplicationType.REPLICATION_NONE) { + topic.setStatus(DmaapObject_Status.VALID); + return topic; + } + + boolean anythingWrong = false; + + Set<String> groups = clusters.getGroups(); + for (String g : groups) { + logger.info("buildBridge for " + topic.getFqtn() + " on group" + g); + anythingWrong |= buildBridge(topic, err, g); + } + if (anythingWrong) { + topic.setStatus(DmaapObject_Status.INVALID); + if (!err.is2xx()) { + return null; + } + } else { + topic.setStatus(DmaapObject_Status.VALID); + } + return topic; + } + + private boolean buildBridge(Topic topic, ApiError err, String group) { + logger.info("buildBridge: entry"); + boolean anythingWrong = false; + Graph graph; + logger.info("buildBridge: strictGraph=" + strictGraph); + if (group == null || group.isEmpty()) { + graph = new Graph(topic.getClients(), strictGraph); + } else { + graph = new Graph(topic.getClients(), strictGraph, group); + } + logger.info("buildBridge: graph=" + graph); + MR_Cluster groupCentralCluster = null; + + + if (graph.isEmpty()) { + logger.info("buildBridge: graph is empty. return false"); + return false; + } else if (group == null && topic.getReplicationCase().involvesFQDN()) { + logger.info("buildBridge: group is null and replicationCaseInvolvesFQDN. return false"); + return false; + } else if (!graph.hasCentral()) { + logger.warn("Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients"); + return true; + } else { + groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc()); + } + Collection<String> clientLocations = graph.getKeys(); + for (String loc : clientLocations) { + logger.info("loc=" + loc); + DcaeLocation location = locations.getDcaeLocation(loc); + MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); + logger.info("cluster=" + cluster + " at " + cluster.getDcaeLocationName()); + logger.info("location.isCentral()=" + location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc()); + + + String source = null; + String target = null; + + /* + * Provision Edge to Central bridges... + */ + if (!location.isCentral() && !graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + switch (topic.getReplicationCase()) { + case REPLICATION_EDGE_TO_CENTRAL: + case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only + source = cluster.getFqdn(); + target = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname; + logger.info("REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" + target); + break; + case REPLICATION_CENTRAL_TO_EDGE: + case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only + source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname; + target = cluster.getFqdn(); + break; + case REPLICATION_CENTRAL_TO_GLOBAL: + case REPLICATION_GLOBAL_TO_CENTRAL: + case REPLICATION_FQDN_TO_GLOBAL: + case REPLICATION_GLOBAL_TO_FQDN: + break; + + case REPLICATION_EDGE_TO_FQDN: + case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2C portion only + source = cluster.getFqdn(); + target = groupCentralCluster.getFqdn(); + break; + case REPLICATION_FQDN_TO_EDGE: + case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for F2E portion only + source = groupCentralCluster.getFqdn(); + target = cluster.getFqdn(); + break; + + default: + logger.error("Unexpected value for ReplicationType (" + topic.getReplicationCase() + ") for topic " + topic.getFqtn()); + anythingWrong = true; + err.setCode(400); + err.setFields("topic=" + topic.genFqtn() + " replicationCase=" + + topic.getReplicationCase()); + err.setMessage("Unexpected value for ReplicationType"); + continue; + } + + } else if (location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + /* + * Provision Central to Global bridges + */ + switch (topic.getReplicationCase()) { + + case REPLICATION_CENTRAL_TO_GLOBAL: + case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: + source = centralCname; + target = topic.getGlobalMrURL(); + break; + case REPLICATION_GLOBAL_TO_CENTRAL: + case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only + source = topic.getGlobalMrURL(); + target = centralCname; + break; + + case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL: // NOTE: this is for E2F portion only + source = groupCentralCluster.getFqdn(); + target = topic.getGlobalMrURL(); + break; + + case REPLICATION_FQDN_TO_GLOBAL: + source = groupCentralCluster.getFqdn(); + target = topic.getGlobalMrURL(); + break; + + case REPLICATION_GLOBAL_TO_FQDN: + case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE: // NOTE: this is for G2F portion only + source = topic.getGlobalMrURL(); + target = groupCentralCluster.getFqdn(); + break; + + case REPLICATION_FQDN_TO_EDGE: + case REPLICATION_EDGE_TO_FQDN: + case REPLICATION_EDGE_TO_CENTRAL: + case REPLICATION_CENTRAL_TO_EDGE: + break; + default: + logger.error("Unexpected value for ReplicationType (" + topic.getReplicationCase() + ") for topic " + topic.getFqtn()); + anythingWrong = true; + err.setCode(400); + err.setFields("topic=" + topic.genFqtn() + " replicationCase=" + + topic.getReplicationCase()); + err.setMessage("Unexpected value for ReplicationType"); + continue; + } + } else { + logger.warn("dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done"); + anythingWrong = true; + continue; + } + if (source != null && target != null) { + try { + logger.info("Create a MM from " + source + " to " + target); + MirrorMaker mm = bridge.findNextMM(source, target, topic.getFqtn()); + mm.addTopic(topic.getFqtn()); + bridge.updateMirrorMaker(mm); + } catch (Exception ex) { + err.setCode(500); + err.setFields("mirror_maker.topic"); + err.setMessage("Unexpected condition: " + ex); + anythingWrong = true; + break; + } + } + + + } + return anythingWrong; + + } + + + /* + * Prior to 1707, we only supported EDGE_TO_CENTRAL replication. + * This was determined automatically based on presence of edge publishers and central subscribers. + * The following method is a modification of that original logic, to preserve some backwards compatibility, + * i.e. to be used when no ReplicationType is specified. + */ + public ReplicationType reviewTopic(Topic topic) { + + + if (topic.getNumClients() > 1) { + Graph graph = new Graph(topic.getClients(), false); + + String centralFqdn = new String(); + if (graph.hasCentral()) { + DmaapConfig p = (DmaapConfig) DmaapConfig.getConfig(); + centralFqdn = p.getProperty("MR.CentralCname"); + } + + Collection<String> locations = graph.getKeys(); + for (String loc : locations) { + logger.info("loc=" + loc); + MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); + if (cluster == null) { + logger.info("No MR cluster for location " + loc); + continue; + } + if (graph.hasCentral() && !graph.getCentralLoc().equals(cluster.getDcaeLocationName())) { + logger.info("Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn); + return ReplicationType.REPLICATION_EDGE_TO_CENTRAL; + + } + + } + } + + return ReplicationType.REPLICATION_NONE; + } } |