aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openecomp/dmaapbc/service
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openecomp/dmaapbc/service')
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/ApiService.java226
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/DR_NodeService.java139
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/DR_PubService.java12
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/DR_SubService.java96
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/DcaeLocationService.java8
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/DmaapService.java45
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/FeedService.java9
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/MR_ClientService.java215
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/MR_ClusterService.java18
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/MirrorMakerService.java63
-rw-r--r--src/main/java/org/openecomp/dmaapbc/service/TopicService.java288
11 files changed, 880 insertions, 239 deletions
diff --git a/src/main/java/org/openecomp/dmaapbc/service/ApiService.java b/src/main/java/org/openecomp/dmaapbc/service/ApiService.java
index 8690bb0..824fc71 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/ApiService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/ApiService.java
@@ -20,6 +20,19 @@
package org.openecomp.dmaapbc.service;
+import static com.att.eelf.configuration.Configuration.MDC_BEGIN_TIMESTAMP;
+import static com.att.eelf.configuration.Configuration.MDC_ELAPSED_TIME;
+import static com.att.eelf.configuration.Configuration.MDC_END_TIMESTAMP;
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
+import static com.att.eelf.configuration.Configuration.MDC_PARTNER_NAME;
+import static com.att.eelf.configuration.Configuration.MDC_RESPONSE_CODE;
+import static com.att.eelf.configuration.Configuration.MDC_RESPONSE_DESC;
+import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;
+import static com.att.eelf.configuration.Configuration.MDC_STATUS_CODE;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -27,33 +40,116 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.xml.bind.DatatypeConverter;
-import org.apache.log4j.Logger;
import org.openecomp.dmaapbc.aaf.DmaapPerm;
+import org.openecomp.dmaapbc.authentication.ApiPolicy;
import org.openecomp.dmaapbc.authentication.AuthenticationErrorException;
-import org.openecomp.dmaapbc.authentication.DecisionPolicy;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
import org.openecomp.dmaapbc.model.ApiError;
import org.openecomp.dmaapbc.model.Dmaap;
import org.openecomp.dmaapbc.resources.RequiredFieldException;
import org.openecomp.dmaapbc.util.DmaapConfig;
+import org.openecomp.dmaapbc.util.RandomString;
+import org.slf4j.MDC;
+
+public class ApiService extends BaseLoggingClass {
+ private class StopWatch {
+ private long clock = 0;
+ private long elapsed = 0;
+
-public class ApiService {
- static final Logger logger = Logger.getLogger(ApiService.class);
- static private String apiNamespace;
- static private boolean usePE;
+
+ public StopWatch() {
+ clock = 0;
+ elapsed = 0;
+ }
+
+ public void reset() {
+ clock = System.currentTimeMillis();
+ elapsed = 0;
+ }
+ public void stop() {
+ Long stopTime = System.currentTimeMillis();
+ elapsed += stopTime - clock;
+ clock = 0;
+ MDC.put( MDC_END_TIMESTAMP, isoFormatter.format(new Date(stopTime)));
+ MDC.put( MDC_ELAPSED_TIME, String.valueOf(elapsed));
+ }
+ public void start() {
+ if ( clock != 0 ) {
+ //not stopped
+ return;
+ }
+ clock = System.currentTimeMillis();
+ MDC.put( MDC_BEGIN_TIMESTAMP, isoFormatter.format(new Date(clock)));
+ }
+ private long getElapsed() {
+ return elapsed;
+ }
+ }
+ private String apiNamespace;
+ private boolean usePE;
+ private String uri;
+ private String uriPath;
+ private String method;
+ private String authorization;
+ private String requestId;
private ApiError err;
+ private StopWatch stopwatch;
+ public static final String ISO_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+ public final static TimeZone utc = TimeZone.getTimeZone("UTC");
+ public final static SimpleDateFormat isoFormatter = new SimpleDateFormat(ISO_FORMAT);
+
+ static {
+ isoFormatter.setTimeZone(utc);
+ }
public ApiService() {
+
+ stopwatch = new StopWatch();
+ stopwatch.start();
err = new ApiError();
+ requestId = (new RandomString(10)).nextString();
if (apiNamespace == null) {
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
usePE = "true".equalsIgnoreCase(p.getProperty("UsePE", "false"));
apiNamespace = p.getProperty("ApiNamespace", "org.openecomp.dmaapBC.api");
}
-
+
+ logger.info( "usePE=" + usePE + " apiNamespace=" + apiNamespace);
}
-
+
+ public ApiService setAuth( String auth ) {
+ this.authorization = auth;
+ logger.info( "setAuth: authorization={} ", authorization);
+ return this;
+ }
+ private void setServiceName(){
+ String svcRequest = new String( this.method + " " + this.uriPath );
+ MDC.put(MDC_SERVICE_NAME, svcRequest );
+ }
+ public ApiService setHttpMethod( String httpMethod ) {
+ this.method = httpMethod;
+ logger.info( "setHttpMethod: method={} ", method);
+ setServiceName();
+ return this;
+ }
+ public ApiService setUriPath( String uriPath ) {
+ this.uriPath = uriPath;
+ this.uri = setUriFromPath( uriPath );
+ logger.info( "setUriPath: uriPath={} uri={}", uriPath, uri);
+ setServiceName();
+ return this;
+ }
+ private String setUriFromPath( String uriPath ) {
+ int ch = uriPath.indexOf("/");
+ if ( ch > 0 ) {
+ return( (String) uriPath.subSequence(0, ch ) );
+ } else {
+ return uriPath;
+ }
+ }
public ApiError getErr() {
return err;
@@ -105,41 +201,87 @@ public class ApiService {
err.setFields(string);
}
- private Response buildResponse() {
+ private Response buildResponse( Object obj ) {
+ stopwatch.stop();
+ MDC.put( MDC_RESPONSE_CODE, String.valueOf(err.getCode()) );
+
+ auditLogger.auditEvent( "" );
return Response.status( err.getCode())
- .entity(getErr())
+ .entity(obj)
.build();
}
- public Response response(int statusCode) {
- err.setCode(statusCode);
- return buildResponse();
+ private Response buildSuccessResponse(Object d) {
+ MDC.put( MDC_STATUS_CODE, "COMPLETE");
+ MDC.put( MDC_RESPONSE_DESC, "");
+ return buildResponse( d );
+ }
+ private Response buildErrResponse() {
+
+ MDC.put( MDC_STATUS_CODE, "ERROR");
+ MDC.put( MDC_RESPONSE_DESC, err.getMessage());
+
+ return buildResponse(getErr());
+ }
+ public Response success( Object d ) {
+ err.setCode(Status.OK.getStatusCode());
+ return buildSuccessResponse(d);
+
+ }
+ public Response success( int code, Object d ) {
+ err.setCode(code);
+ return buildSuccessResponse(d);
+ }
+
+ public Response unauthorized( String msg ) {
+ err.setCode(Status.UNAUTHORIZED.getStatusCode());
+ err.setFields( "Authorization");
+ err.setMessage( msg );
+ return buildErrResponse();
}
public Response unauthorized() {
err.setCode(Status.UNAUTHORIZED.getStatusCode());
err.setFields( "Authorization");
err.setMessage( "User credentials in HTTP Header field Authorization are not authorized for the requested action");
- return buildResponse();
+ return buildErrResponse();
}
public Response unavailable() {
err.setCode(Status.SERVICE_UNAVAILABLE.getStatusCode());
err.setMessage( "Request is unavailable due to unexpected condition");
- return buildResponse();
+ return buildErrResponse();
+ }
+ public Response notFound() {
+ err.setCode(Status.NOT_FOUND.getStatusCode());
+ err.setMessage( "Requested object not found");
+ return buildErrResponse();
+ }
+ public Response error() {
+ return buildErrResponse();
}
- public void checkAuthorization( String authorization, String uriPath, String method ) throws AuthenticationErrorException, Exception {
- if ( ! usePE ) return; // skip authorization if not enabled
- if ( authorization == null || authorization.isEmpty()) {
- String errmsg = "No basic authorization value provided ";
- logger.info( errmsg );
- throw new AuthenticationErrorException( );
- }
- if ( uriPath == null || uriPath.isEmpty()) {
+ public void checkAuthorization( String auth, String uriPath, String httpMethod ) throws AuthenticationErrorException, Exception {
+ authorization = auth;
+ setUriFromPath( uriPath );
+ method = httpMethod;
+
+ checkAuthorization();
+ }
+
+
+ public void checkAuthorization() throws AuthenticationErrorException, Exception {
+
+ MDC.put(MDC_KEY_REQUEST_ID, requestId);
+
+ logger.info("request: uri={} method={} auth={}", uri, method, authorization );
+
+ if ( uri == null || uri.isEmpty()) {
String errmsg = "No URI value provided ";
+ err.setMessage(errmsg);
logger.info( errmsg );
throw new AuthenticationErrorException( );
}
if ( method == null || method.isEmpty()) {
String errmsg = "No method value provided ";
+ err.setMessage(errmsg);
logger.info( errmsg );
throw new AuthenticationErrorException( );
}
@@ -152,25 +294,45 @@ public class ApiService {
if ( env.isEmpty() ) {
env = "boot";
}
-
- String credentials = authorization.substring("Basic".length()).trim();
+ if ( ! usePE ) return; // skip authorization if not enabled
+ if ( authorization == null || authorization.isEmpty()) {
+ String errmsg = "No basic authorization value provided ";
+ err.setMessage(errmsg);
+ logger.info( errmsg );
+ throw new AuthenticationErrorException( );
+ }
+ String credentials = authorization.substring("Basic".length()).trim();
byte[] decoded = DatatypeConverter.parseBase64Binary(credentials);
String decodedString = new String(decoded);
String[] actualCredentials = decodedString.split(":");
String ID = actualCredentials[0];
String Password = actualCredentials[1];
-
-logger.info( "User " + ID + " allowed - DecisionPolicy() not compiled in yet!" );
-/* disable until PolicyEngine avail...
+ MDC.put(MDC_PARTNER_NAME, ID);
try {
- DecisionPolicy d = new DecisionPolicy();
- DmaapPerm p = new DmaapPerm( apiNamespace + "." + uriPath, env, method );
+ ApiPolicy d = new ApiPolicy();
+ DmaapPerm p = new DmaapPerm( apiNamespace + "." + uri, env, method );
d.check( ID, Password, p);
} catch ( AuthenticationErrorException ae ) {
- logger.info( "User " + ID + " failed authentication/authorization");
+ String errmsg = "User " + ID + " failed authentication/authorization for " + apiNamespace + "." + uriPath + " " + env + " " + method;
+ logger.info( errmsg );
+ err.setMessage(errmsg);
throw ae;
}
-*/
+
+
+ }
+ public String getRequestId() {
+ return requestId;
+ }
+ public ApiService setRequestId(String requestId) {
+ if ( requestId == null || requestId.isEmpty()) {
+ this.requestId = (new RandomString(10)).nextString();
+ logger.warn( "X-ECOMP-RequestID not set in HTTP Header. Setting RequestId value to: " + this.requestId );
+ } else {
+ this.requestId = requestId;
+ }
+ MDC.put(MDC_KEY_REQUEST_ID, this.requestId);
+ return this;
}
}
diff --git a/src/main/java/org/openecomp/dmaapbc/service/DR_NodeService.java b/src/main/java/org/openecomp/dmaapbc/service/DR_NodeService.java
index c1e36aa..f791896 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/DR_NodeService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/DR_NodeService.java
@@ -27,13 +27,109 @@ import java.util.Map;
import javax.ws.rs.core.Response.Status;
import org.apache.log4j.Logger;
+import org.openecomp.dmaapbc.client.DrProvConnection;
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
import org.openecomp.dmaapbc.model.ApiError;
import org.openecomp.dmaapbc.model.DR_Node;
import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
-public class DR_NodeService {
- static final Logger logger = Logger.getLogger(DR_NodeService.class);
+public class DR_NodeService extends BaseLoggingClass {
+ private class DrProv {
+ String currentNodes;
+ String currentStaticNodes;
+
+ private String getX( String X, ApiError apiError ) {
+
+ DrProvConnection prov = new DrProvConnection();
+ prov.makeNodesConnection( X );
+ String resp = prov.doGetNodes( apiError );
+ logger.info( "rc=" + apiError.getCode() );
+ return resp;
+ }
+
+ private void setX( String X, String list, ApiError apiError ) {
+ DrProvConnection prov = new DrProvConnection();
+ prov.makeNodesConnection( X, list );
+ String resp = prov.doPutNodes( apiError );
+ }
+
+ private String removeFromList( String aNode, String aList ) {
+ String[] nodeList = aList.split("\\|");
+ StringBuilder res = new StringBuilder();
+ for ( String n: nodeList ) {
+ logger.info( "compare existing node " + n + " vs " + aNode );
+ if ( ! n.equals(aNode)) {
+ if (res.length() > 0 ) {
+ res.append( "|" );
+ }
+ res.append(n);
+ }
+ }
+ logger.info( "result=" + res.toString() );
+ return res.toString();
+ }
+
+ boolean containsNode( String aNode , ApiError apiError ){
+
+ //DrProvConnection prov = new DrProvConnection();
+ //prov.makeNodesConnection();
+ currentNodes = getX( "NODES", apiError );
+ if ( ! apiError.is2xx() || currentNodes == null ) {
+ return false;
+ }
+ logger.info( "NODES now=" + currentNodes );
+ String[] nodeList = currentNodes.split("\\|");
+ for( String n: nodeList ) {
+ logger.info( "compare existing node " + n + " vs " + aNode );
+ if ( n.equals(aNode) ) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void addNode( String aNode, ApiError apiError ) {
+
+ currentNodes = currentNodes + "|" + aNode;
+ setX( "NODES", currentNodes, apiError );
+
+
+ }
+ void removeNode( String aNode, ApiError apiError ) {
+ currentNodes = removeFromList( aNode, currentNodes );
+ setX( "NODES", currentNodes, apiError );
+ }
+
+ public boolean containsStaticNode(String aNode, ApiError apiError) {
+
+ //DrProvConnection prov = new DrProvConnection();
+ //prov.makeNodesConnection();
+ currentStaticNodes = getX( "STATIC_ROUTING_NODES", apiError );
+ if (! apiError.is2xx() || currentStaticNodes == null ) {
+ return false;
+ }
+ logger.info( "STATIC_ROUTING_NODES now=" + currentNodes );
+ String[] nodeList = currentStaticNodes.split("\\|");
+ for( String n: nodeList ) {
+ logger.info( "compare existing node " + n + " vs " + aNode );
+ if ( n.equals(aNode) ) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ public void addStaticNode(String aNode, ApiError apiError) {
+ currentStaticNodes = currentStaticNodes + "|" + aNode;
+ setX( "STATIC_ROUTING_NODES", currentStaticNodes, apiError );
+ }
+ void removeStaticNode( String aNode, ApiError apiError ) {
+ currentStaticNodes = removeFromList( aNode, currentStaticNodes );
+ setX( "STATIC_ROUTING_NODES", currentStaticNodes, apiError );
+ }
+ }
private Map<String, DR_Node> dr_nodes = DatabaseClass.getDr_nodes();
public Map<String, DR_Node> getDr_Nodes() {
@@ -65,6 +161,25 @@ public class DR_NodeService {
apiError.setMessage( "Node " + fqdn + " already exists");
return null;
}
+
+ DrProv drProv = new DrProv();
+
+ if ( ! drProv.containsNode( node.getFqdn(), apiError ) && apiError.is2xx() ) {
+ drProv.addNode( node.getFqdn(), apiError );
+ }
+ if ( ! apiError.is2xx()) {
+ return null;
+ }
+ DcaeLocationService locService = new DcaeLocationService();
+ if ( locService.isEdgeLocation( node.getDcaeLocationName()) && ! drProv.containsStaticNode( node.getFqdn(), apiError ) ) {
+ if ( apiError.is2xx() ) {
+ drProv.addStaticNode( node.getFqdn(), apiError );
+ }
+ }
+ if ( ! apiError.is2xx()) {
+ return null;
+ }
+
node.setLastMod();
node.setStatus(DmaapObject_Status.VALID);
dr_nodes.put( node.getFqdn(), node );
@@ -94,11 +209,24 @@ public class DR_NodeService {
apiError.setMessage( "Node " + nodeName + " does not exist");
return null;
}
+
+ DrProv drProv = new DrProv();
+ if ( drProv.containsNode( old.getFqdn(), apiError ) && apiError.is2xx() ) {
+ drProv.removeNode( old.getFqdn(), apiError );
+ }
+ DcaeLocationService locService = new DcaeLocationService();
+ if ( locService.isEdgeLocation( old.getDcaeLocationName()) && drProv.containsStaticNode( old.getFqdn(), apiError ) ) {
+ if ( apiError.is2xx()) {
+ drProv.removeStaticNode( old.getFqdn(), apiError );
+ }
+
+ }
+
apiError.setCode(200);
return dr_nodes.remove(nodeName);
- }
+ }
- public String getNodePatternAtLocation( String loc ) {
+ public String getNodePatternAtLocation( String loc, boolean allowMult ) {
logger.info( "loc=" + loc );
if ( loc == null ) {
return null;
@@ -110,6 +238,9 @@ public class DR_NodeService {
str.append( ",");
}
str.append( node.getFqdn());
+ if ( ! allowMult ) {
+ break;
+ }
}
}
logger.info( "returning " + str.toString() );
diff --git a/src/main/java/org/openecomp/dmaapbc/service/DR_PubService.java b/src/main/java/org/openecomp/dmaapbc/service/DR_PubService.java
index 8479e55..a1feb6d 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/DR_PubService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/DR_PubService.java
@@ -29,12 +29,14 @@ import javax.ws.rs.core.Response.Status;
import org.apache.log4j.Logger;
import org.openecomp.dmaapbc.client.DrProvConnection;
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
+import org.openecomp.dmaapbc.logging.DmaapbcLogMessageEnum;
import org.openecomp.dmaapbc.model.ApiError;
import org.openecomp.dmaapbc.model.DR_Pub;
import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
-public class DR_PubService {
- static final Logger logger = Logger.getLogger(DR_PubService.class);
+public class DR_PubService extends BaseLoggingClass{
+
private Map<String, DR_Pub> dr_pubs = DatabaseClass.getDr_pubs();
private DR_NodeService nodeService = new DR_NodeService();
private static DrProvConnection prov;
@@ -77,11 +79,11 @@ public class DR_PubService {
private void addIngressRoute( DR_Pub pub, ApiError err ) {
- String nodePattern = nodeService.getNodePatternAtLocation( pub.getDcaeLocationName());
+ String nodePattern = nodeService.getNodePatternAtLocation( pub.getDcaeLocationName(), true );
if ( nodePattern != null && nodePattern.length() > 0 ) {
logger.info( "creating ingress rule: pub " + pub.getPubId() + " on feed " + pub.getFeedId() + " to " + nodePattern);
prov.makeIngressConnection( pub.getFeedId(), pub.getUsername(), "-", nodePattern);
- int rc = prov.doIngressPost(err);
+ int rc = prov.doXgressPost(err);
logger.info( "rc=" + rc + " error code=" + err.getCode() );
if ( rc != 200 ) {
@@ -94,7 +96,7 @@ public class DR_PubService {
break;
default:
- logger.warn( "unable to create ingress rule for " + pub.getPubId() + " on feed " + pub.getFeedId() + " to " + nodePattern);
+ logger.info( DmaapbcLogMessageEnum.INGRESS_CREATE_ERROR, Integer.toString(rc), pub.getPubId(), pub.getFeedId(), nodePattern);
}
}
diff --git a/src/main/java/org/openecomp/dmaapbc/service/DR_SubService.java b/src/main/java/org/openecomp/dmaapbc/service/DR_SubService.java
index a360cff..f2fc99d 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/DR_SubService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/DR_SubService.java
@@ -26,38 +26,36 @@ import java.util.Map;
import javax.ws.rs.core.Response.Status;
-import org.apache.log4j.Logger;
import org.openecomp.dmaapbc.client.DrProvConnection;
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
+import org.openecomp.dmaapbc.logging.DmaapbcLogMessageEnum;
import org.openecomp.dmaapbc.model.ApiError;
-import org.openecomp.dmaapbc.model.DR_Pub;
import org.openecomp.dmaapbc.model.DR_Sub;
-import org.openecomp.dmaapbc.model.Feed;
-public class DR_SubService {
- static final Logger logger = Logger.getLogger(DR_SubService.class);
+public class DR_SubService extends BaseLoggingClass {
+
private Map<String, DR_Sub> dr_subs = DatabaseClass.getDr_subs();
+ private DR_NodeService nodeService = new DR_NodeService();
private String provURL;
- //private DrProvConnection prov;
+ private static DrProvConnection prov;
+
public DR_SubService( ) {
- logger.info( "Entry: DR_SubService (with no args)" );
-// prov = new DrProvConnection();
+ logger.debug( "Entry: DR_SubService (with no args)" );
}
public DR_SubService( String subURL ) {
- logger.info( "Entry: DR_SubService " + subURL );
+ logger.debug( "Entry: DR_SubService " + subURL );
provURL = subURL;
-// prov = new DrProvConnection();
-// prov.makeSubConnection( subURL );
}
public Map<String, DR_Sub> getDR_Subs() {
- logger.info( "enter getDR_Subs()");
+ logger.debug( "enter getDR_Subs()");
return dr_subs;
}
public List<DR_Sub> getAllDr_Subs() {
- logger.info( "enter getAllDR_Subs()");
+ logger.debug( "enter getAllDR_Subs()");
return new ArrayList<DR_Sub>(dr_subs.values());
}
@@ -72,7 +70,7 @@ public class DR_SubService {
return someSubs;
}
public DR_Sub getDr_Sub( String key, ApiError apiError ) {
- logger.info( "enter getDR_Sub()");
+ logger.debug( "enter getDR_Sub()");
DR_Sub sub = dr_subs.get( key );
if ( sub == null ) {
apiError.setCode(Status.NOT_FOUND.getStatusCode());
@@ -85,11 +83,11 @@ public class DR_SubService {
}
public DR_Sub addDr_Sub( DR_Sub sub, ApiError apiError ) {
- logger.info( "enter addDR_Subs()");
- DrProvConnection prov = new DrProvConnection();
- prov.makeSubConnection( provURL );
- String resp = prov.doPostDr_Sub( sub );
- logger.info( "resp=" + resp );
+ logger.debug( "enter addDR_Subs()");
+ prov = new DrProvConnection();
+ prov.makeSubPostConnection( provURL );
+ String resp = prov.doPostDr_Sub( sub, apiError );
+ logger.debug( "resp=" + resp );
DR_Sub snew = null;
@@ -97,6 +95,7 @@ public class DR_SubService {
snew = new DR_Sub( resp );
snew.setDcaeLocationName(sub.getDcaeLocationName());
snew.setLastMod();
+ addEgressRoute( snew, apiError );
dr_subs.put( snew.getSubId(), snew );
apiError.setCode(200);
} else {
@@ -106,25 +105,58 @@ public class DR_SubService {
return snew;
}
+ private void addEgressRoute( DR_Sub sub, ApiError err ) {
+ String nodePattern = nodeService.getNodePatternAtLocation( sub.getDcaeLocationName(), false );
+ if ( nodePattern != null && nodePattern.length() > 0 ) {
+ logger.info( "creating egress rule: sub " + sub.getSubId() + " on feed " + sub.getFeedId() + " to " + nodePattern);
+ prov.makeEgressConnection( sub.getSubId(), nodePattern);
+ int rc = prov.doXgressPost(err);
+ logger.info( "rc=" + rc + " error code=" + err.getCode() );
+
+ if ( rc != 200 ) {
+ switch( rc ) {
+ case 403:
+ logger.error( "Not authorized for DR egress API");
+ err.setCode(500);
+ err.setMessage("API deployment/configuration error - contact support");
+ err.setFields( "PROV_AUTH_ADDRESSES");
+ break;
+
+ default:
+ logger.info( DmaapbcLogMessageEnum.EGRESS_CREATE_ERROR, Integer.toString(rc), sub.getSubId(), sub.getFeedId(), nodePattern);
+ }
+ }
+
+ }
+ }
+
public DR_Sub updateDr_Sub( DR_Sub obj, ApiError apiError ) {
- logger.info( "enter updateDR_Subs()");
+ logger.debug( "enter updateDR_Subs()");
- DR_Sub sub = dr_subs.get( obj.getSubId() );
- if ( sub == null ) {
- apiError.setCode(Status.NOT_FOUND.getStatusCode());
- apiError.setFields( "subId");
- apiError.setMessage("subId " + obj.getSubId() + " not found");
- return null;
- }
- sub.setLastMod();
- dr_subs.put( sub.getSubId(), sub );
- apiError.setCode(200);
- return sub;
+ DrProvConnection prov = new DrProvConnection();
+ prov.makeSubPutConnection( obj.getSubId() );
+ String resp = prov.doPutDr_Sub( obj, apiError );
+ logger.debug( "resp=" + resp );
+
+ DR_Sub snew = null;
+
+ if ( resp != null ) {
+ snew = new DR_Sub( resp );
+ snew.setDcaeLocationName(obj.getDcaeLocationName());
+ snew.setLastMod();
+ dr_subs.put( snew.getSubId(), snew );
+ apiError.setCode(200);
+ } else if ( apiError.is2xx()) {
+ apiError.setCode(400);
+ apiError.setMessage("unexpected empty response from DR Prov");
+ }
+
+ return snew;
}
public void removeDr_Sub( String key, ApiError apiError ) {
- logger.info( "enter removeDR_Subs()");
+ logger.debug( "enter removeDR_Subs()");
DR_Sub sub = dr_subs.get( key );
if ( sub == null ) {
apiError.setCode(Status.NOT_FOUND.getStatusCode());
diff --git a/src/main/java/org/openecomp/dmaapbc/service/DcaeLocationService.java b/src/main/java/org/openecomp/dmaapbc/service/DcaeLocationService.java
index f88852a..4451ff8 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/DcaeLocationService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/DcaeLocationService.java
@@ -82,5 +82,13 @@ public class DcaeLocationService {
}
return "aCentralLocation"; // default value that is obvious to see is wrong
}
+
+ public boolean isEdgeLocation(String aName) {
+ DcaeLocation loc = dcaeLocations.get(aName);
+ if ( ! loc.isCentral() ) {
+ return true;
+ }
+ return false;
+ }
}
diff --git a/src/main/java/org/openecomp/dmaapbc/service/DmaapService.java b/src/main/java/org/openecomp/dmaapbc/service/DmaapService.java
index 47cc271..c673427 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/DmaapService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/DmaapService.java
@@ -22,14 +22,17 @@ package org.openecomp.dmaapbc.service;
import java.util.ArrayList;
-import org.apache.log4j.Logger;
+
import org.openecomp.dmaapbc.aaf.AafService;
import org.openecomp.dmaapbc.aaf.DmaapGrant;
import org.openecomp.dmaapbc.aaf.DmaapPerm;
import org.openecomp.dmaapbc.aaf.AafService.ServiceType;
+import org.openecomp.dmaapbc.authentication.ApiPerms;
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
+import org.openecomp.dmaapbc.logging.DmaapbcLogMessageEnum;
import org.openecomp.dmaapbc.model.ApiError;
-import org.openecomp.dmaapbc.model.DcaeLocation;
+
import org.openecomp.dmaapbc.model.Dmaap;
import org.openecomp.dmaapbc.model.MR_Client;
import org.openecomp.dmaapbc.model.Topic;
@@ -37,8 +40,8 @@ import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
import org.openecomp.dmaapbc.util.DmaapConfig;
import org.openecomp.dmaapbc.util.Singleton;
-public class DmaapService {
- static final Logger logger = Logger.getLogger(DmaapService.class);
+public class DmaapService extends BaseLoggingClass {
+
private Singleton<Dmaap> dmaapholder = DatabaseClass.getDmaap();
@@ -69,7 +72,8 @@ public class DmaapService {
dmaapholder.update(nd);
AafService aaf = new AafService( ServiceType.AAF_Admin);
-
+ ApiPerms p = new ApiPerms();
+ p.setEnvMap();
boolean anythingWrong = setTopicMgtPerms( nd, aaf ) || createMmaTopic();
if ( anythingWrong ) {
@@ -92,7 +96,7 @@ public class DmaapService {
logger.info( "entering updateDmaap()" );
boolean anythingWrong = false;
- AafService aaf = new AafService( ServiceType.AAF_Admin);
+
Dmaap dmaap = dmaapholder.get();
// some triggers for when we attempt to reprovision perms and MMA topic:
@@ -102,6 +106,9 @@ public class DmaapService {
if ( ! dmaap.isStatusValid() || ! nd.getDmaapName().equals(dmaap.getDmaapName()) || dmaap.getVersion().equals( "0") ) {
nd.setLastMod();
dmaapholder.update(nd); //need to set this so the following perms will pick up any new vals.
+ ApiPerms p = new ApiPerms();
+ p.setEnvMap();
+ AafService aaf = new AafService( ServiceType.AAF_Admin);
anythingWrong = setTopicMgtPerms( nd, aaf ) || createMmaTopic();
}
@@ -128,7 +135,14 @@ public class DmaapService {
public String getBridgeAdminFqtn(){
Dmaap dmaap = dmaapholder.get();
- return(dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + "." + dmaap.getBridgeAdminTopic());
+ String topic = dmaap.getBridgeAdminTopic();
+
+ // check if this is already an fqtn (contains a dot)
+ // otherwise build it
+ if ( topic.indexOf('.') < 0 ) {
+ topic = dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + "." + dmaap.getBridgeAdminTopic();
+ }
+ return( topic );
}
private boolean setTopicMgtPerms( Dmaap nd, AafService aaf ){
@@ -164,14 +178,14 @@ public class DmaapService {
int rc = aaf.addPerm( perm );
if ( rc != 201 && rc != 409 ) {
- logger.error( "unable to add perm for "+ t + "|" + instance + "|" + action );
+ errorLogger.error( DmaapbcLogMessageEnum.AAF_UNEXPECTED_RESPONSE, Integer.toString(rc), "add perm", t + "|" + instance + "|" + action );
return true;
}
DmaapGrant grant = new DmaapGrant( perm, topicMgrRole );
rc = aaf.addGrant( grant );
if ( rc != 201 && rc != 409 ) {
- logger.error( "unable to grant to " + topicMgrRole + " perm for "+ topicFactory + "|" + instance + "|" + action );
+ errorLogger.error( DmaapbcLogMessageEnum.AAF_UNEXPECTED_RESPONSE, Integer.toString(rc), "grant to " + topicMgrRole + " perm ", topicFactory + "|" + instance + "|" + action );
return true;
}
@@ -221,11 +235,16 @@ public class DmaapService {
ApiError err = new ApiError();
TopicService svc = new TopicService();
- Topic nTopic = svc.addTopic(mmaTopic, err);
- if ( err.is2xx() || err.getCode() == 409 ) {
- return false;
+ try {
+ @SuppressWarnings("unused")
+ Topic nTopic = svc.addTopic(mmaTopic, err);
+ if ( err.is2xx() || err.getCode() == 409 ) {
+ return false;
+ }
+ } catch ( Exception e) {
+ errorLogger.error( DmaapbcLogMessageEnum.UNEXPECTED_CONDITION, " while adding Topic: " + e.getMessage());
}
- logger.error( "Unable to create topic for " + dmaap.getBridgeAdminTopic() + " err=" + err.getFields() + " fields=" + err.getFields() + " msg=" + err.getMessage());
+ errorLogger.error( DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR, dmaap.getBridgeAdminTopic(), err.getFields(), err.getFields(), err.getMessage());
return rc;
diff --git a/src/main/java/org/openecomp/dmaapbc/service/FeedService.java b/src/main/java/org/openecomp/dmaapbc/service/FeedService.java
index a9af116..ae05016 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/FeedService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/FeedService.java
@@ -29,14 +29,15 @@ import javax.ws.rs.core.Response.Status;
import org.apache.log4j.Logger;
import org.openecomp.dmaapbc.client.DrProvConnection;
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
import org.openecomp.dmaapbc.model.ApiError;
import org.openecomp.dmaapbc.model.DR_Pub;
import org.openecomp.dmaapbc.model.DR_Sub;
import org.openecomp.dmaapbc.model.Feed;
import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
-public class FeedService {
- static final Logger logger = Logger.getLogger(FeedService.class);
+public class FeedService extends BaseLoggingClass {
+
private Map<String, Feed> feeds = DatabaseClass.getFeeds();
private DR_PubService pubService = new DR_PubService();
private DR_SubService subService = new DR_SubService();
@@ -146,7 +147,7 @@ public class FeedService {
int rSize = reqPubs.size();
logger.info( "reqPubs size=" + rSize + " newPubs size=" + nSize );
if ( nSize != rSize ) {
- logger.error( "Resulting set of publishers do not match requested set of publishers " + nSize + " vs " + rSize );
+ errorLogger.error( "Resulting set of publishers do not match requested set of publishers " + nSize + " vs " + rSize );
fnew.setStatus( DmaapObject_Status.INVALID);
return false;
}
@@ -160,7 +161,7 @@ public class FeedService {
reqPub.setFeedId(newPub.getFeedId());
reqPub.setStatus(DmaapObject_Status.VALID);
if ( reqPub.getDcaeLocationName() == null ) {
- reqPub.setDcaeLocationName("dcaeLocationNotSpecified");
+ reqPub.setDcaeLocationName("notSpecified");
}
pubSvc.addDr_Pub( reqPub );
}
diff --git a/src/main/java/org/openecomp/dmaapbc/service/MR_ClientService.java b/src/main/java/org/openecomp/dmaapbc/service/MR_ClientService.java
index ac4b5c8..6ea61d5 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/MR_ClientService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/MR_ClientService.java
@@ -26,27 +26,37 @@ import java.util.Map;
import javax.ws.rs.core.Response.Status;
-import org.apache.log4j.Logger;
+
import org.openecomp.dmaapbc.aaf.AafService;
import org.openecomp.dmaapbc.aaf.DmaapGrant;
import org.openecomp.dmaapbc.aaf.DmaapPerm;
import org.openecomp.dmaapbc.aaf.AafService.ServiceType;
import org.openecomp.dmaapbc.client.MrProvConnection;
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
import org.openecomp.dmaapbc.model.ApiError;
+import org.openecomp.dmaapbc.model.DcaeLocation;
import org.openecomp.dmaapbc.model.MR_Client;
import org.openecomp.dmaapbc.model.MR_Cluster;
import org.openecomp.dmaapbc.model.Topic;
import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
+import org.openecomp.dmaapbc.util.DmaapConfig;
-public class MR_ClientService {
- static final Logger logger = Logger.getLogger(MR_ClientService.class);
+public class MR_ClientService extends BaseLoggingClass{
+ private int deleteLevel;
private Map<String, MR_Client> mr_clients = DatabaseClass.getMr_clients();
private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters();
private Map<String, Topic> topics = DatabaseClass.getTopics();
+ private Map<String, DcaeLocation> locations = DatabaseClass.getDcaeLocations();
private DmaapService dmaap = new DmaapService();
+ public MR_ClientService() {
+ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+
+ deleteLevel = Integer.valueOf(p.getProperty("MR.ClientDeleteLevel", "0" ));
+ }
+
public Map<String, MR_Client> getMR_Clients() {
return mr_clients;
}
@@ -67,6 +77,30 @@ public class MR_ClientService {
return results;
}
+ public ArrayList<MR_Client> getClientsByLocation(String location) {
+ ArrayList<MR_Client> results = new ArrayList<MR_Client>();
+ for (Map.Entry<String, MR_Client> entry : mr_clients.entrySet())
+ {
+ MR_Client client = entry.getValue();
+ if ( location.equals(client.getDcaeLocationName() ) ) {
+ results.add( client );
+ }
+ }
+ return results;
+ }
+
+ public void refreshClients( String location ) {
+ ApiError err = new ApiError();
+ ArrayList<MR_Client> clients = getClientsByLocation( location );
+ for( MR_Client client : clients ) {
+ Topic topic = topics.get(client.getFqtn());
+ if ( topic != null ) {
+ addMr_Client( client, topic, err);
+ }
+
+
+ }
+ }
public MR_Client getMr_Client( String key, ApiError apiError ) {
MR_Client c = mr_clients.get( key );
@@ -82,39 +116,40 @@ public class MR_ClientService {
public MR_Client addMr_Client( MR_Client client, Topic topic, ApiError err ) {
if ( client.getDcaeLocationName().isEmpty()) {
- logger.error( "Client dcaeLocation that doesn't exist or not specified" );
+ logger.info( "Client dcaeLocation that doesn't exist or not specified" );
+ return null;
+ }
+ grantClientPerms( client, err);
+ if ( ! client.isStatusValid()) {
return null;
}
+ String centralFqdn = null;
+ DcaeLocation candidate = locations.get(client.getDcaeLocationName());
+ if ( candidate != null && candidate.isCentral() ) {
+ DmaapConfig p = ( DmaapConfig)DmaapConfig.getConfig();
+ centralFqdn = p.getProperty("MR.CentralCname");
+ }
MR_Cluster cluster = clusters.get( client.getDcaeLocationName());
if ( cluster != null ) {
- client.setTopicURL(cluster.genTopicURL(client.getFqtn()));
- AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
-
- String instance = ":topic." + client.getFqtn();
- client.setStatus( DmaapObject_Status.VALID);
- for( String want : client.getAction() ) {
- int rc;
- DmaapPerm perm = new DmaapPerm( dmaap.getTopicPerm(), instance, want );
- DmaapGrant g = new DmaapGrant( perm, client.getClientRole() );
- rc = aaf.addGrant( g );
- if ( rc != 201 && rc != 409 ) {
- client.setStatus( DmaapObject_Status.INVALID);
- err.setCode(rc);
- err.setMessage( "Grant of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " failed for " + client.getClientRole() );
- logger.warn( err.getMessage());
+ client.setTopicURL(cluster.genTopicURL(centralFqdn, client.getFqtn()));
+ if ( centralFqdn == null ) {
+ client.setStatus( addTopicToCluster( cluster, topic, err));
+ if( ! err.is2xx() && err.getCode() != 409 ) {
+ topic.setFqtn(err.getMessage());
return null;
- }
- }
-
-
- logger.info( "cluster=" + cluster );
- MrProvConnection prov = new MrProvConnection();
- logger.info( "POST topic " + topic.getFqtn() + " to cluster " + cluster.getFqdn() + " in loc " + cluster.getDcaeLocationName());
- if ( prov.makeTopicConnection(cluster)) {
- String resp = prov.doPostTopic(topic);
- logger.info( "response: " + resp );
- if ( resp == null ) {
- client.setStatus( DmaapObject_Status.INVALID);
+ }
+
+ } else {
+ MR_ClusterService clusters = new MR_ClusterService();
+ // in 1610, MM should only exist for edge-to-central
+ // we use a cname for the central target
+ // but still need to provision topics on all central MRs
+ for( MR_Cluster central: clusters.getCentralClusters() ) {
+ client.setStatus( addTopicToCluster( central, topic, err));
+ if( ! err.is2xx() && err.getCode() != 409 ) {
+ topic.setFqtn(err.getMessage());
+ return null;
+ }
}
}
@@ -125,32 +160,66 @@ public class MR_ClientService {
}
mr_clients.put( client.getMrClientId(), client );
-
-
- //TODO: this section on updating an existing topic with a new client needs to belong someplace else
- //Topic t = topics.get(topic.getFqtn());
- /*
- int n;
- ArrayList<MR_Client> tc = topic.getClients();
- if ( tc == null ) {
- n = 0;
- tc = new ArrayList<MR_Client>();
- } else {
- n = tc.size();
- }
- logger.info( "number of existing clients for topic is " + n );
-
- logger.info( "n=" + n + " tc=" + tc + " client=" + client );
- tc.add( client );
- topic.setClients(tc);
- */
- topics.put(topic.getFqtn(), topic);
err.setCode(200);
return client;
}
+
+ private DmaapObject_Status addTopicToCluster( MR_Cluster cluster, Topic topic, ApiError err ){
+
+ MrProvConnection prov = new MrProvConnection();
+ logger.info( "POST topic " + topic.getFqtn() + " to cluster " + cluster.getFqdn() + " in loc " + cluster.getDcaeLocationName());
+ if ( prov.makeTopicConnection(cluster)) {
+ String resp = prov.doPostTopic(topic, err);
+ logger.info( "response code: " + err.getCode() );
+ if ( err.is2xx() || err.getCode() == 409 ) {
+ return DmaapObject_Status.VALID;
+ }
+ }
+ return DmaapObject_Status.INVALID;
+ }
+
+ private void grantClientPerms( MR_Client client, ApiError err) {
+ AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
+
+ String instance = ":topic." + client.getFqtn();
+ client.setStatus( DmaapObject_Status.VALID);
+ for( String want : client.getAction() ) {
+ int rc;
+ DmaapPerm perm = new DmaapPerm( dmaap.getTopicPerm(), instance, want );
+ DmaapGrant g = new DmaapGrant( perm, client.getClientRole() );
+ rc = aaf.addGrant( g );
+ if ( rc != 201 && rc != 409 ) {
+ client.setStatus( DmaapObject_Status.INVALID);
+ err.setCode(rc);
+ err.setMessage( "Grant of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " failed for " + client.getClientRole() );
+ logger.warn( err.getMessage());
+ return;
+ }
+ }
+ }
+
+ private void revokeClientPerms( MR_Client client, ApiError err) {
+ AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
+ String instance = ":topic." + client.getFqtn();
+ client.setStatus( DmaapObject_Status.VALID);
+ for( String want : client.getAction() ) {
+ int rc;
+ DmaapPerm perm = new DmaapPerm( dmaap.getTopicPerm(), instance, want );
+ DmaapGrant g = new DmaapGrant( perm, client.getClientRole() );
+ rc = aaf.delGrant( g );
+ if ( rc != 200 && rc != 404 ) {
+ client.setStatus( DmaapObject_Status.INVALID);
+ err.setCode(rc);
+ err.setMessage( "Revoke of " + dmaap.getTopicPerm() + "|" + instance + "|" + want + " failed for " + client.getClientRole() );
+ logger.warn( err.getMessage());
+ return;
+ }
+ }
+ }
+
public MR_Client updateMr_Client( MR_Client client, ApiError apiError ) {
MR_Client c = mr_clients.get( client.getMrClientId());
if ( c == null ) {
@@ -164,18 +233,50 @@ public class MR_ClientService {
return client;
}
- public MR_Client removeMr_Client( String key, ApiError apiError ) {
- MR_Client c = mr_clients.get( key );
- if ( c == null ) {
+ public void removeMr_Client( String key, boolean updateTopicView, ApiError apiError ) {
+ MR_Client client = mr_clients.get( key );
+ if ( client == null ) {
apiError.setCode(Status.NOT_FOUND.getStatusCode());
apiError.setFields( "mrClientId");
apiError.setMessage("mrClientId " + key + " not found" );
+ return;
} else {
apiError.setCode(200);
}
-
- return mr_clients.remove(key);
- }
+
+ if ( updateTopicView == true ) {
+
+ TopicService topics = new TopicService();
+
+ Topic t = topics.getTopic(client.getFqtn(), apiError );
+ if ( t != null ) {
+ ArrayList<MR_Client> tc = t.getClients();
+ for( MR_Client c: tc) {
+ if ( c.getMrClientId().equals(client.getMrClientId())) {
+ tc.remove(c);
+ break;
+ }
+ }
+ t.setClients(tc);
+ topics.updateTopic( t, apiError );
+ }
+ }
+
+
+ // remove from AAF
+ if ( deleteLevel >= 2 ) {
+ revokeClientPerms( client, apiError );
+ if ( ! apiError.is2xx()) {
+ return;
+ }
+ }
+ // remove from DB
+ if ( deleteLevel >= 1 ) {
+ mr_clients.remove(key);
+ }
+
+ return;
+ }
}
diff --git a/src/main/java/org/openecomp/dmaapbc/service/MR_ClusterService.java b/src/main/java/org/openecomp/dmaapbc/service/MR_ClusterService.java
index d5cb402..d96b568 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/MR_ClusterService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/MR_ClusterService.java
@@ -26,15 +26,16 @@ import java.util.Map;
import javax.ws.rs.core.Response.Status;
-import org.apache.log4j.Logger;
+
+
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
import org.openecomp.dmaapbc.model.ApiError;
import org.openecomp.dmaapbc.model.DcaeLocation;
import org.openecomp.dmaapbc.model.MR_Cluster;
import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
-public class MR_ClusterService {
- static final Logger logger = Logger.getLogger(MR_ClusterService.class);
+public class MR_ClusterService extends BaseLoggingClass {
private Map<String, MR_Cluster> mr_clusters = DatabaseClass.getMr_clusters();
@@ -64,6 +65,17 @@ public class MR_ClusterService {
}
return null;
}
+
+ public List<MR_Cluster> getCentralClusters() {
+ DcaeLocationService locations = new DcaeLocationService();
+ List<MR_Cluster> result = new ArrayList<MR_Cluster>();
+ for( MR_Cluster c: mr_clusters.values() ) {
+ if ( locations.getDcaeLocation(c.getDcaeLocationName()).isCentral() ) {
+ result.add(c);
+ }
+ }
+ return result;
+ }
public MR_Cluster addMr_Cluster( MR_Cluster cluster, ApiError apiError ) {
logger.info( "Entry: addMr_Cluster");
diff --git a/src/main/java/org/openecomp/dmaapbc/service/MirrorMakerService.java b/src/main/java/org/openecomp/dmaapbc/service/MirrorMakerService.java
index 5d633b0..0370857 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/MirrorMakerService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/MirrorMakerService.java
@@ -21,43 +21,33 @@
package org.openecomp.dmaapbc.service;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.log4j.Logger;
-
-
-
-
-
-
-
-
-
-
-
-
-import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
+import org.openecomp.dmaapbc.aaf.AafDecrypt;
+//import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
import org.openecomp.dmaapbc.client.MrTopicConnection;
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
+import org.openecomp.dmaapbc.logging.DmaapbcLogMessageEnum;
import org.openecomp.dmaapbc.model.ApiError;
+import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
import org.openecomp.dmaapbc.model.MR_Cluster;
import org.openecomp.dmaapbc.model.MirrorMaker;
-import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
import org.openecomp.dmaapbc.util.DmaapConfig;
import org.openecomp.dmaapbc.util.RandomInteger;
-public class MirrorMakerService {
- static final Logger logger = Logger.getLogger(MirrorMakerService.class);
+public class MirrorMakerService extends BaseLoggingClass {
private Map<String, MirrorMaker> mirrors = DatabaseClass.getMirrorMakers();
private static MrTopicConnection prov;
+ private static AafDecrypt decryptor;
public MirrorMakerService() {
super();
- // TODO Auto-generated constructor stub
+
+ decryptor = new AafDecrypt();
}
// will create a MM on MMagent if needed
@@ -66,34 +56,41 @@ public class MirrorMakerService {
logger.info( "updateMirrorMaker");
DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
String provUser = p.getProperty("MM.ProvUserMechId");
- String provUserPwd = AndrewDecryptor.valueOf(p.getProperty( "MM.ProvUserPwd", "notSet" ));
+ String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
prov = new MrTopicConnection( provUser, provUserPwd );
- MR_ClusterService clusters = new MR_ClusterService();
- DmaapService dmaap = new DmaapService();
- //TODO: this should find the cluster!!!!
+
+ String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
- MR_Cluster central = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
- if ( central != null ) {
- prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn() );
+ DmaapService dmaap = new DmaapService();
+ MR_ClusterService clusters = new MR_ClusterService();
+
+ // in 1610, MM should only exist for edge-to-central
+ // we use a cname for the central MR cluster that is active, and provision on agent topic on that target
+ // but only send 1 message so MM Agents can read it relying on kafka delivery
+ for( MR_Cluster central: clusters.getCentralClusters() ) {
+ prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
ApiError resp = prov.doPostMessage(mm.createMirrorMaker());
if ( ! resp.is2xx() ) {
- //logger.error( "Unable to publish MR Bridge provisioning message. rc=" + resp.getCode() + " msg=" + resp.getMessage());
- logger.error( "Unable to publish create MM provisioning message. rc=" + resp.getCode() + " msg=" + resp.getMessage());
+
+ errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
mm.setStatus(DmaapObject_Status.INVALID);
} else {
- prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn() );
+ prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
resp = prov.doPostMessage(mm.updateWhiteList());
if ( ! resp.is2xx()) {
- logger.error( "Unable to publish MR Bridge provisioning message. rc=" + resp.getCode() + " msg=" + resp.getMessage());
+ errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
mm.setStatus(DmaapObject_Status.INVALID);
} else {
mm.setStatus(DmaapObject_Status.VALID);
}
}
+
+ // we only want to send one message even if there are multiple central clusters
+ break;
- } else {
- logger.warn( "target cluster " + mm.getTargetCluster() + " not found!");
- }
+ }
+
+
mm.setLastMod();
return mirrors.put( mm.getMmName(), mm);
diff --git a/src/main/java/org/openecomp/dmaapbc/service/TopicService.java b/src/main/java/org/openecomp/dmaapbc/service/TopicService.java
index 0240fab..887d234 100644
--- a/src/main/java/org/openecomp/dmaapbc/service/TopicService.java
+++ b/src/main/java/org/openecomp/dmaapbc/service/TopicService.java
@@ -26,37 +26,44 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
-import org.apache.log4j.Logger;
import org.openecomp.dmaapbc.aaf.AafService;
import org.openecomp.dmaapbc.aaf.DmaapPerm;
import org.openecomp.dmaapbc.aaf.AafService.ServiceType;
import org.openecomp.dmaapbc.database.DatabaseClass;
+import org.openecomp.dmaapbc.logging.BaseLoggingClass;
+import org.openecomp.dmaapbc.logging.DmaapbcLogMessageEnum;
import org.openecomp.dmaapbc.model.ApiError;
import org.openecomp.dmaapbc.model.Dmaap;
import org.openecomp.dmaapbc.model.MR_Client;
import org.openecomp.dmaapbc.model.MR_Cluster;
import org.openecomp.dmaapbc.model.MirrorMaker;
+import org.openecomp.dmaapbc.model.ReplicationType;
import org.openecomp.dmaapbc.model.Topic;
import org.openecomp.dmaapbc.model.DmaapObject.DmaapObject_Status;
import org.openecomp.dmaapbc.util.DmaapConfig;
import org.openecomp.dmaapbc.util.Graph;
-public class TopicService {
- static final Logger logger = Logger.getLogger(TopicService.class);
+public class TopicService extends BaseLoggingClass {
+
- // TODO put these in properties file
- String topicFactory = "org.openecomp.dcae.dmaap.topicFactory";
+ // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
+ private static String defaultGlobalMrHost;
+
private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
private Map<String, MR_Cluster> clusters = DatabaseClass.getMr_clusters();
- private Dmaap dmaap = new DmaapService().getDmaap();
+ private static DmaapService dmaapSvc = new DmaapService();
+ private static Dmaap dmaap = new DmaapService().getDmaap();
private MR_ClientService clientService = new MR_ClientService();
private MirrorMakerService bridge = new MirrorMakerService();
-
+
+ public TopicService(){
+ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+ defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
+ }
public Map<String, Topic> getTopics() {
return mr_topics;
@@ -99,7 +106,6 @@ public class TopicService {
logger.info( "fqtn: " + nFqtn );
topic.setFqtn( nFqtn );
- boolean anythingWrong = false;
AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
String t = dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + ".mr.topic";
String instance = ":topic." + topic.getFqtn();
@@ -136,72 +142,58 @@ public class TopicService {
}
topic.setClients(clients2);
- Graph graph = new Graph( clients2 );
-
- String centralFqdn = new String();
- if ( graph.isHasCentral() ) {
- centralFqdn = clusters.get( graph.getCentralLoc() ).getFqdn();
-
- }
-
- Collection<String> locations = graph.getKeys();
- for( String loc : locations ) {
- logger.info( "loc=" + loc );
- MR_Cluster cluster = clusters.get(loc);
- logger.info( "cluster=" + cluster );
-
- if ( graph.isHasCentral() && ! centralFqdn.equals(cluster.getFqdn())) {
- logger.info( "Create a MM from " + cluster.getFqdn() + " to " + centralFqdn );
- try {
- MirrorMaker mm = bridge.getMirrorMaker(cluster.getFqdn(), centralFqdn);
- if ( mm == null ) {
- mm = new MirrorMaker(cluster.getFqdn(), centralFqdn);
- }
- mm.addTopic(topic.getFqtn());
- bridge.updateMirrorMaker(mm);
- } catch ( Exception ex ) {
- err.setCode(500);
- err.setFields( "mirror_maker.topic");
- err.setMessage("Unexpected condition: " + ex );
- anythingWrong = true;
- break;
- }
-
- }
-
+ }
+ if ( topic.getReplicationCase().involvesGlobal() ) {
+ if ( topic.getGlobalMrURL() == null ) {
+ topic.setGlobalMrURL(defaultGlobalMrHost);
}
}
-
- if ( anythingWrong ) {
+ Topic ntopic = checkForBridge( topic, err );
+ if ( ntopic == null ) {
topic.setStatus( DmaapObject_Status.INVALID);
return null;
}
-
- topic.setStatus( DmaapObject_Status.VALID);
- mr_topics.put( nFqtn, topic );
- //String prov = bridge.commit();
- //logger.info( "prov=" + prov);
+ mr_topics.put( nFqtn, ntopic );
+
err.setCode(Status.OK.getStatusCode());
- return topic;
+ return ntopic;
}
- public Topic updateTopic( Topic topic ) {
+ public Topic updateTopic( Topic topic, ApiError err ) {
+ logger.info( "Entry: updateTopic");
if ( topic.getFqtn().isEmpty()) {
return null;
}
- mr_topics.put( topic.getFqtn(), topic );
- return topic;
+ Topic ntopic = checkForBridge( topic, err );
+ if ( ntopic == null ) {
+ topic.setStatus( DmaapObject_Status.INVALID);
+ return null;
+ }
+ mr_topics.put( ntopic.getFqtn(), ntopic );
+ err.setCode(Status.OK.getStatusCode());
+ return ntopic;
}
public Topic removeTopic( String pubId, ApiError apiError ) {
- if ( mr_topics.get(pubId) == null ) {
+ Topic topic = mr_topics.get(pubId);
+ if ( topic == null ) {
apiError.setCode(Status.NOT_FOUND.getStatusCode());
apiError.setMessage("Topic " + pubId + " does not exist");
apiError.setFields("fqtn");
return null;
}
+ ArrayList<MR_Client> clients = new ArrayList<MR_Client>(clientService.getAllMrClients( pubId ));
+ for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
+ MR_Client c = it.next();
+
+
+ clientService.removeMr_Client(c.getMrClientId(), false, apiError);
+ if ( ! apiError.is2xx()) {
+ return null;
+ }
+ }
apiError.setCode(Status.OK.getStatusCode());
return mr_topics.remove(pubId);
}
@@ -212,7 +204,7 @@ public class TopicService {
String[] Roles = { mmProvRole, mmAgentRole };
String[] actions = { "view", "pub", "sub" };
Topic bridgeAdminTopic = new Topic();
- bridgeAdminTopic.setTopicName( DatabaseClass.getDmaap().get().getBridgeAdminTopic());
+ bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() );
bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning");
bridgeAdminTopic.setOwner( "DBC" );
ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
@@ -234,7 +226,191 @@ public class TopicService {
return err;
}
- logger.error( "Unable to create MM provisioning topic " + bridgeAdminTopic.getFqtn());
+ errorLogger.error( DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR, bridgeAdminTopic.getFqtn(), Integer.toString(err.getCode()), err.getFields(), err.getMessage());
return err;
}
+
+
+ public Topic checkForBridge( Topic topic, ApiError err ) {
+
+ if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
+ topic.setStatus( DmaapObject_Status.VALID);
+ return topic;
+ }
+
+ boolean anythingWrong = false;
+ String centralFqdn = new String();
+ Graph graph = new Graph( topic.getClients(), true );
+
+ if ( graph.isHasCentral() ) {
+ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+ centralFqdn = p.getProperty("MR.CentralCname");
+ logger.info( "CentralCname=" + centralFqdn );
+ } else {
+ logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no cental clients");
+ }
+ Collection<String> locations = graph.getKeys();
+ for( String loc : locations ) {
+ logger.info( "loc=" + loc );
+ MR_Cluster cluster = clusters.get(loc);
+ logger.info( "cluster=" + cluster );
+
+
+
+ String source = null;
+ String target = null;
+ /*
+ * all replication rules have 1 bridge...
+ */
+ switch( topic.getReplicationCase() ) {
+ case REPLICATION_EDGE_TO_CENTRAL:
+ case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only
+ if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ break;
+ }
+ source = cluster.getFqdn();
+ target = centralFqdn;
+ break;
+ case REPLICATION_CENTRAL_TO_EDGE:
+ if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ continue;
+ }
+ source = centralFqdn;
+ target = cluster.getFqdn();
+ break;
+ case REPLICATION_CENTRAL_TO_GLOBAL:
+ if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ continue;
+ }
+ source = centralFqdn;
+ target = topic.getGlobalMrURL();
+ break;
+ case REPLICATION_GLOBAL_TO_CENTRAL:
+ case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for G2C portion only
+ if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ continue;
+ }
+ source = topic.getGlobalMrURL();
+ target = centralFqdn;
+ break;
+ default:
+ logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+ anythingWrong = true;
+ continue;
+ }
+ if ( source != null && target != null ) {
+ try {
+ logger.info( "Create a MM from " + source + " to " + target );
+ MirrorMaker mm = bridge.getMirrorMaker( source, target);
+ if ( mm == null ) {
+ mm = new MirrorMaker(source, target);
+ }
+ mm.addTopic(topic.getFqtn());
+ bridge.updateMirrorMaker(mm);
+ } catch ( Exception ex ) {
+ err.setCode(500);
+ err.setFields( "mirror_maker.topic");
+ err.setMessage("Unexpected condition: " + ex );
+ anythingWrong = true;
+ break;
+ }
+ }
+
+
+ /*
+ * some replication rules have a 2nd bridge!
+ */
+ source = target = null;
+ switch( topic.getReplicationCase() ) {
+ case REPLICATION_EDGE_TO_CENTRAL:
+ case REPLICATION_CENTRAL_TO_EDGE:
+ case REPLICATION_CENTRAL_TO_GLOBAL:
+ case REPLICATION_GLOBAL_TO_CENTRAL:
+ continue;
+ case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for C2G portion only
+ if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ continue;
+ }
+ source = centralFqdn;
+ target = topic.getGlobalMrURL();
+ break;
+
+ case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only
+ if ( graph.isHasCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ continue;
+ }
+ source = centralFqdn;
+ target = cluster.getFqdn();
+ break;
+ default:
+ logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
+ anythingWrong = true;
+ break;
+ }
+ if ( source != null && target != null ) {
+ try {
+ logger.info( "Create a MM from " + source + " to " + target );
+ MirrorMaker mm = bridge.getMirrorMaker( source, target);
+ if ( mm == null ) {
+ mm = new MirrorMaker(source, target);
+ }
+ mm.addTopic(topic.getFqtn());
+ bridge.updateMirrorMaker(mm);
+ } catch ( Exception ex ) {
+ err.setCode(500);
+ err.setFields( "mirror_maker.topic");
+ err.setMessage("Unexpected condition: " + ex );
+ anythingWrong = true;
+ break;
+ }
+ }
+
+ }
+ if ( anythingWrong ) {
+ topic.setStatus( DmaapObject_Status.INVALID);
+ return null;
+ }
+
+ topic.setStatus( DmaapObject_Status.VALID);
+ return topic;
+ }
+
+ /*
+ * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
+ * This was determined automatically based on presence of edge publishers and central subscribers.
+ * The following method is a modification of that original logic, to preserve some backwards compatibility,
+ * i.e. to be used when no ReplicationType is specified.
+ */
+ public ReplicationType reviewTopic( Topic topic ) {
+
+
+ if ( topic.getNumClients() > 1 ) {
+ Graph graph = new Graph( topic.getClients(), false );
+
+ String centralFqdn = new String();
+ if ( graph.isHasCentral() ) {
+ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
+ centralFqdn = p.getProperty("MR.CentralCname");
+ }
+
+ Collection<String> locations = graph.getKeys();
+ for( String loc : locations ) {
+ logger.info( "loc=" + loc );
+ MR_Cluster cluster = clusters.get(loc);
+ if ( cluster == null ) {
+ logger.info( "No MR cluster for location " + loc );
+ continue;
+ }
+ if ( graph.isHasCentral() && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
+ logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
+ return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;
+
+ }
+
+ }
+ }
+
+ return ReplicationType.REPLICATION_NONE;
+ }
+
}