summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa
diff options
context:
space:
mode:
authorVarun Gudisena <vg411h@att.com>2017-08-31 10:56:56 -0500
committerVarun Gudisena <vg411h@att.com>2017-08-31 10:57:12 -0500
commitacc3ce02997219825091a2e4ed7fd493f2d440b2 (patch)
treede1db5979894f0dadd6752ae1841fdeb743a5830 /src/main/java/com/att/nsa
parent3306e2f9cc17833d5816936e0ea2973d9013b00e (diff)
Revert package name changes
Reverted package name changes to avoid any potential issues. Renamed maven group id only. Issue-id: DMAAP-74 Change-Id: I4ca4537d48e5723b2939148e5bd83645ee20dd30 Signed-off-by: Varun Gudisena <vg411h@att.com>
Diffstat (limited to 'src/main/java/com/att/nsa')
-rw-r--r--src/main/java/com/att/nsa/dmaap/DMaaPCambriaExceptionMapper.java143
-rw-r--r--src/main/java/com/att/nsa/dmaap/DMaaPWebExceptionMapper.java202
-rw-r--r--src/main/java/com/att/nsa/dmaap/HelloWorld.java42
-rw-r--r--src/main/java/com/att/nsa/dmaap/JaxrsEchoService.java91
-rw-r--r--src/main/java/com/att/nsa/dmaap/JaxrsUserService.java59
-rw-r--r--src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesListener.java42
-rw-r--r--src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesMap.java126
-rw-r--r--src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertyService.java164
-rw-r--r--src/main/java/com/att/nsa/dmaap/mmagent/CreateMirrorMaker.java43
-rw-r--r--src/main/java/com/att/nsa/dmaap/mmagent/MirrorMaker.java70
-rw-r--r--src/main/java/com/att/nsa/dmaap/mmagent/UpdateMirrorMaker.java43
-rw-r--r--src/main/java/com/att/nsa/dmaap/mmagent/UpdateWhiteList.java44
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/AdminRestService.java293
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/ApiKeysRestService.java254
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/EventsRestService.java313
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/MMRestService.java1238
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/MetricsRestService.java152
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/TopicRestService.java688
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java176
-rw-r--r--src/main/java/com/att/nsa/dmaap/service/UIRestServices.java198
-rw-r--r--src/main/java/com/att/nsa/dmaap/tools/ConfigTool.java818
-rw-r--r--src/main/java/com/att/nsa/dmaap/tools/ConfigToolContext.java69
-rw-r--r--src/main/java/com/att/nsa/dmaap/util/ContentLengthInterceptor.java132
-rw-r--r--src/main/java/com/att/nsa/dmaap/util/DMaaPAuthFilter.java164
-rw-r--r--src/main/java/com/att/nsa/dmaap/util/ServicePropertiesMapBean.java41
25 files changed, 5605 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/dmaap/DMaaPCambriaExceptionMapper.java b/src/main/java/com/att/nsa/dmaap/DMaaPCambriaExceptionMapper.java
new file mode 100644
index 0000000..53c3bed
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/DMaaPCambriaExceptionMapper.java
@@ -0,0 +1,143 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap;
+
+
+import javax.inject.Singleton;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.exception.DMaaPErrorMessages;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+
+/**
+ * Exception Mapper class to handle
+ * CambriaApiException
+ * @author author
+ *
+ */
+@Provider
+@Singleton
+public class DMaaPCambriaExceptionMapper implements ExceptionMapper<CambriaApiException>{
+
+/**
+ * Error response obj
+ */
+ private ErrorResponse errRes;
+
+/**
+ * Logger obj
+ */
+
+
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPCambriaExceptionMapper.class);
+
+
+ /**
+ * Error msg obj
+ */
+ @Autowired
+ private DMaaPErrorMessages msgs;
+
+ /**
+ * HttpServletRequest obj
+ */
+ @Context
+ private HttpServletRequest req;
+
+ /**
+ * HttpServletResponse obj
+ */
+ @Context
+ private HttpServletResponse res;
+
+ /**
+ * Contructor for DMaaPCambriaExceptionMapper
+ */
+ public DMaaPCambriaExceptionMapper() {
+ super();
+ LOGGER.info("Cambria Exception Mapper Created..");
+ }
+
+ /**
+ * The toResponse method is called when
+ * an exception of type CambriaApiException
+ * is thrown.This method will send a custom error
+ * response to the client.
+ */
+ @Override
+ public Response toResponse(CambriaApiException ex) {
+
+ LOGGER.info("Reached Cambria Exception Mapper..");
+
+ /**
+ * Cambria Generic Exception
+ */
+ if(ex instanceof CambriaApiException)
+ {
+
+ errRes = ex.getErrRes();
+ if(errRes!=null) {
+
+ Response response = Response.status(errRes.getHttpStatusCode()).header("exception",
+ errRes.getErrMapperStr()).build();
+
+ return response;
+ }
+ else
+ {
+
+ Response response = Response.status(ex.getStatus()).header("exception",
+ ex.getMessage()).build();
+
+ return response;
+ }
+
+
+ }
+ else
+ {
+ errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), msgs.getServerUnav());
+
+ Response response = Response.status(errRes.getHttpStatusCode()).header("exception",
+ errRes.getErrMapperStr()).build();
+
+ return response;
+ }
+
+ }
+
+
+
+}
diff --git a/src/main/java/com/att/nsa/dmaap/DMaaPWebExceptionMapper.java b/src/main/java/com/att/nsa/dmaap/DMaaPWebExceptionMapper.java
new file mode 100644
index 0000000..7a9d0ba
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/DMaaPWebExceptionMapper.java
@@ -0,0 +1,202 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap;
+
+
+import javax.inject.Singleton;
+import javax.ws.rs.BadRequestException;
+import javax.ws.rs.InternalServerErrorException;
+import javax.ws.rs.NotAllowedException;
+import javax.ws.rs.NotAuthorizedException;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.ServiceUnavailableException;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.att.nsa.cambria.exception.DMaaPErrorMessages;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+
+/**
+ * Exception Mapper class to handle
+ * Web Exceptions
+ * @author author
+ *
+ */
+@Provider
+@Singleton
+public class DMaaPWebExceptionMapper implements ExceptionMapper<WebApplicationException>{
+
+ /**
+ * Logger obj
+ */
+
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPWebExceptionMapper.class);
+ /**
+ * Error response obj
+ */
+ private ErrorResponse errRes;
+ /**
+ * Error msg obj
+ */
+ @Autowired
+ private DMaaPErrorMessages msgs;
+
+ /**
+ * Contructor for DMaaPWebExceptionMapper
+ */
+ public DMaaPWebExceptionMapper() {
+ super();
+ LOGGER.info("WebException Mapper Created..");
+ }
+
+ /**
+ * The toResponse method is called when
+ * an exception of type WebApplicationException
+ * is thrown.This method will send a custom error
+ * response to the client
+ */
+ @Override
+ public Response toResponse(WebApplicationException ex) {
+
+ LOGGER.info("Reached WebException Mapper");
+
+ /**
+ * Resource Not Found
+ */
+ if(ex instanceof NotFoundException)
+ {
+ errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,DMaaPResponseCode.RESOURCE_NOT_FOUND.
+ getResponseCode(),msgs.getNotFound());
+
+ LOGGER.info(errRes.toString());
+ Response response = Response.status(errRes.getHttpStatusCode()).header("exception",
+ errRes.getErrMapperStr()).build();
+
+ return response;
+
+ }
+ /**
+ * Internal Server Error
+ */
+ if(ex instanceof InternalServerErrorException)
+ {
+
+ int errCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
+ int dmaapErrCode = DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode();
+ String errMsg = msgs.getServerUnav();
+
+
+ if(ex.getCause().toString().contains("Json")) {
+ errCode = HttpStatus.SC_BAD_REQUEST;
+ dmaapErrCode = DMaaPResponseCode.INCORRECT_JSON.getResponseCode();
+ errMsg = ex.getCause().getMessage().substring(0, ex.getCause().getMessage().indexOf("[Source")-3);
+ }
+ else if (ex.getCause().toString().contains("UnrecognizedPropertyException")) {
+ errCode = HttpStatus.SC_BAD_REQUEST;
+ dmaapErrCode = DMaaPResponseCode.INCORRECT_JSON.getResponseCode();
+ errMsg = ex.getCause().getMessage().substring(0, ex.getCause().getMessage().indexOf("[Source")-3);
+ }
+ errRes = new ErrorResponse(errCode,dmaapErrCode,errMsg);
+
+ LOGGER.info(errRes.toString());
+ Response response = Response.status(errRes.getHttpStatusCode()).header("exception",
+ errRes.getErrMapperStr()).build();
+
+ return response;
+
+ }
+ /**
+ * UnAuthorized
+ */
+ if(ex instanceof NotAuthorizedException)
+ {
+ errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,DMaaPResponseCode.ACCESS_NOT_PERMITTED.
+ getResponseCode(),msgs.getAuthFailure());
+
+ LOGGER.info(errRes.toString());
+ Response response = Response.status(errRes.getHttpStatusCode()).header("exception",
+ errRes.getErrMapperStr()).build();
+
+ return response;
+ }
+ /**
+ * Malformed request
+ */
+ if(ex instanceof BadRequestException)
+ {
+ errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,DMaaPResponseCode.INCORRECT_JSON.
+ getResponseCode(),msgs.getBadRequest());
+
+ LOGGER.info(errRes.toString());
+ Response response = Response.status(errRes.getHttpStatusCode()).header("exception",
+ errRes.getErrMapperStr()).build();
+
+ return response;
+ }
+ /**
+ * HTTP Method not allowed
+ */
+ if(ex instanceof NotAllowedException)
+ {
+ errRes = new ErrorResponse(HttpStatus.SC_METHOD_NOT_ALLOWED,DMaaPResponseCode.METHOD_NOT_ALLOWED.
+ getResponseCode(),msgs.getMethodNotAllowed());
+
+ LOGGER.info(errRes.toString());
+ Response response = Response.status(errRes.getHttpStatusCode()).header("exception",
+ errRes.getErrMapperStr()).build();
+
+ return response;
+ }
+
+ /**
+ * Server unavailable
+ */
+ if(ex instanceof ServiceUnavailableException)
+ {
+ errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,DMaaPResponseCode.SERVER_UNAVAILABLE.
+ getResponseCode(),msgs.getServerUnav());
+
+ LOGGER.info(errRes.toString());
+ Response response = Response.status(errRes.getHttpStatusCode()).header("exception",
+ errRes.getErrMapperStr()).build();
+
+ return response;
+ }
+
+
+
+ return Response.serverError().build();
+ }
+
+
+
+
+}
+
diff --git a/src/main/java/com/att/nsa/dmaap/HelloWorld.java b/src/main/java/com/att/nsa/dmaap/HelloWorld.java
new file mode 100644
index 0000000..7dc2e0c
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/HelloWorld.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Hello World Sample Camel Service
+ * @author author
+ *
+ */
+public class HelloWorld {
+ public HelloWorld () {
+ }
+ /**
+ * speak method
+ * @param e exchange
+ */
+ public final void speak(Exchange e) {
+ e.setOut(e.getIn());
+ e.getOut().setBody("Hello World!");
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/dmaap/JaxrsEchoService.java b/src/main/java/com/att/nsa/dmaap/JaxrsEchoService.java
new file mode 100644
index 0000000..9fcef98
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/JaxrsEchoService.java
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+
+import com.att.ajsc.beans.PropertiesMapBean;
+import com.att.nsa.dmaap.filemonitor.ServicePropertiesMap;
+
+/**
+ * Example JAX-RS Service
+ * @author author
+ *
+ */
+@Path("/jaxrs-services")
+public class JaxrsEchoService {
+
+ /**
+ * Logger obj
+ */
+ /*private static final Logger LOGGER = Logger
+ .getLogger(JaxrsEchoService.class);*/
+
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(JaxrsEchoService.class);
+
+ /**
+ * Method ping
+ * @param input input
+ * @return str
+ */
+ @GET
+ @Path("/echo/{input}")
+ @Produces("text/plain")
+ public String ping(@PathParam("input") String input) {
+ return "Hello, " + input + ".";
+ }
+
+ /**
+ * Method to fetch property
+ * @param fileName file
+ * @param input input
+ * @return prop
+ */
+ @GET
+ @Path("/property/{fileName}/{input:.*}")
+ @Produces("text/plain")
+ public String getProperty(@PathParam("fileName") String fileName, @PathParam("input") String input) {
+ String val=null;
+ try {
+ val = ServicePropertiesMap.getProperty(fileName, input);
+ if(val == null || val.isEmpty() || val.length() < 1){
+ val = PropertiesMapBean.getProperty(fileName, input);
+ }
+ }
+ catch(Exception ex) {
+ LOGGER.info("*** Error retrieving property "+input+": "+ex);
+
+ }
+ if (val ==null) {
+ return "Property is not available";
+ }
+ return "Property value is, " + val +".";
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/dmaap/JaxrsUserService.java b/src/main/java/com/att/nsa/dmaap/JaxrsUserService.java
new file mode 100644
index 0000000..2724a51
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/JaxrsUserService.java
@@ -0,0 +1,59 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Example JAX-RS User Service
+ * @author author
+ *
+ */
+@Path("/user")
+public class JaxrsUserService {
+
+ private static final Map<String,String> userIdToNameMap;
+ static {
+ userIdToNameMap = new HashMap<String,String>();
+ userIdToNameMap.put("user1","User One");
+ userIdToNameMap.put("user2","User Two");
+ }
+
+ /**
+ * Method to fetch user details
+ * @param userId user
+ * @return userDetails
+ */
+ @GET
+ @Path("/{userId}")
+ @Produces("text/plain")
+ public String lookupUser(@PathParam("userId") String userId) {
+ String name = userIdToNameMap.get(userId);
+ return name != null ? name : "unknown id";
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesListener.java b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesListener.java
new file mode 100644
index 0000000..8333332
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesListener.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.filemonitor;
+
+import java.io.File;
+
+//import com.att.ssf.filemonitor.FileChangedListener;
+/**
+ * Class ServicePropertiesListener
+ * @author author
+ *
+ */
+public class ServicePropertiesListener /*implements FileChangedListener*/ {
+
+ /**
+ * Update method
+ */
+ //@Override
+ public void update(File file) throws Exception
+ {
+ ServicePropertiesMap.refresh(file);
+ }
+}
diff --git a/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesMap.java b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesMap.java
new file mode 100644
index 0000000..731428d
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesMap.java
@@ -0,0 +1,126 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.filemonitor;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * ServicePropertiesMap class
+ * @author author
+ *
+ */
+@SuppressWarnings("squid:S1118")
+public class ServicePropertiesMap
+{
+ private static HashMap<String, HashMap<String, String>> mapOfMaps =
+ new HashMap<String, HashMap<String, String>>();
+// static final Logger logger = LoggerFactory.getLogger(ServicePropertiesMap.class);
+
+ private static final EELFLogger logger = EELFManager.getInstance().getLogger(ServicePropertiesMap.class);
+ /**
+ * refresh method
+ * @param file file
+ * @throws Exception ex
+ */
+ public static void refresh(File file) throws Exception
+ {
+ try
+ {
+ logger.info("Loading properties - " + (file != null?file.getName():""));
+
+ //Store .json & .properties files into map of maps
+ String filePath = file.getPath();
+
+ if(filePath.lastIndexOf(".json")>0){
+
+ ObjectMapper om = new ObjectMapper();
+ TypeReference<HashMap<String, String>> typeRef =
+ new TypeReference<HashMap<String, String>>() {};
+ HashMap<String, String> propMap = om.readValue(file, typeRef);
+ HashMap<String, String> lcasePropMap = new HashMap<String, String>();
+ for (String key : propMap.keySet() )
+ {
+ String lcaseKey = ifNullThenEmpty(key);
+ lcasePropMap.put(lcaseKey, propMap.get(key));
+ }
+
+ mapOfMaps.put(file.getName(), lcasePropMap);
+
+
+ }else if(filePath.lastIndexOf(".properties")>0){
+ Properties prop = new Properties();
+ FileInputStream fis = new FileInputStream(file);
+ prop.load(fis);
+
+ @SuppressWarnings("unchecked")
+ HashMap<String, String> propMap = new HashMap<String, String>((Map)prop);
+
+ mapOfMaps.put(file.getName(), propMap);
+ }
+
+ logger.info("File - " + file.getName() + " is loaded into the map and the "
+ + "corresponding system properties have been refreshed");
+ }
+ catch (Exception e)
+ {
+ logger.error("File " + (file != null?file.getName():"") + " cannot be loaded into the map ", e);
+ throw new Exception("Error reading map file " + (file != null?file.getName():""), e);
+ }
+ }
+ /**
+ * Get property
+ * @param fileName fileName
+ * @param propertyKey propertyKey
+ * @return str
+ */
+ public static String getProperty(String fileName, String propertyKey)
+ {
+ HashMap<String, String> propMap = mapOfMaps.get(fileName);
+ return propMap!=null?propMap.get(ifNullThenEmpty(propertyKey)):"";
+ }
+ /**
+ * get properties
+ * @param fileName fileName
+ * @return mapProp
+ */
+ public static HashMap<String, String> getProperties(String fileName){
+ return mapOfMaps.get(fileName);
+ }
+
+ private static String ifNullThenEmpty(String key) {
+ if (key == null) {
+ return "";
+ } else {
+ return key;
+ }
+ }
+
+}
diff --git a/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertyService.java b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertyService.java
new file mode 100644
index 0000000..e4f4e03
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertyService.java
@@ -0,0 +1,164 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.filemonitor;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import javax.annotation.PostConstruct;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+
+//import com.att.ssf.filemonitor.FileChangedListener;
+//import com.att.ssf.filemonitor.FileMonitor;
+
+/**
+ * ServicePropertyService class
+ * @author author
+ *
+ */
+public class ServicePropertyService {
+ private boolean loadOnStartup;
+ private ServicePropertiesListener fileChangedListener;
+ private ServicePropertiesMap filePropertiesMap;
+ private String ssfFileMonitorPollingInterval;
+ private String ssfFileMonitorThreadpoolSize;
+ private List<File> fileList;
+ private static final String FILE_CHANGE_LISTENER_LOC = System
+ .getProperty("AJSC_CONF_HOME") + "/etc";
+ private static final String USER_CONFIG_FILE = "service-file-monitor.properties";
+
+ private static final EELFLogger logger = EELFManager.getInstance().getLogger(ServicePropertyService.class);
+
+ // do not remove the postConstruct annotation, init method will not be
+ // called after constructor
+ /**
+ * Init method
+ * @throws Exception ex
+ */
+ @PostConstruct
+ public void init() throws Exception {
+
+ try {
+ getFileList(FILE_CHANGE_LISTENER_LOC);
+
+// for (File file : fileList) {
+// FileChangedListener fileChangedListener = this.fileChangedListener;
+// Object filePropertiesMap = this.filePropertiesMap;
+// Method m = filePropertiesMap.getClass().getMethod(
+// "refresh", File.class);
+// m.invoke(filePropertiesMap, file);
+// FileMonitor fm = FileMonitor.getInstance();
+// fm.addFileChangedListener(file, fileChangedListener,
+// loadOnStartup);
+//
+// }
+ } catch (Exception ex) {
+ logger.error("Error creating property map ", ex);
+ }
+
+ }
+
+ private void getFileList(String dirName) throws IOException {
+ File directory = new File(dirName);
+ FileInputStream fis = null;
+
+ if (fileList == null)
+ fileList = new ArrayList<File>();
+
+ // get all the files that are ".json" or ".properties", from a directory
+ // & it's sub-directories
+ File[] fList = directory.listFiles();
+
+ for (File file : fList) {
+ // read service property files from the configuration file
+ if (file.isFile() && file.getPath().endsWith(USER_CONFIG_FILE)) {
+ try {
+ fis = new FileInputStream(file);
+ Properties prop = new Properties();
+ prop.load(fis);
+
+ for (String filePath : prop.stringPropertyNames()) {
+ fileList.add(new File(prop.getProperty(filePath)));
+ }
+ } catch (Exception ioe) {
+ logger.error("Error reading the file stream ", ioe);
+ } finally {
+ fis.close();
+ }
+ } else if (file.isDirectory()) {
+ getFileList(file.getPath());
+ }
+ }
+
+ }
+
+ public void setLoadOnStartup(boolean loadOnStartup) {
+ this.loadOnStartup = loadOnStartup;
+ }
+
+ public void setSsfFileMonitorPollingInterval(
+ String ssfFileMonitorPollingInterval) {
+ this.ssfFileMonitorPollingInterval = ssfFileMonitorPollingInterval;
+ }
+
+ public void setSsfFileMonitorThreadpoolSize(
+ String ssfFileMonitorThreadpoolSize) {
+ this.ssfFileMonitorThreadpoolSize = ssfFileMonitorThreadpoolSize;
+ }
+
+ public boolean isLoadOnStartup() {
+ return loadOnStartup;
+ }
+
+ public String getSsfFileMonitorPollingInterval() {
+ return ssfFileMonitorPollingInterval;
+ }
+
+ public String getSsfFileMonitorThreadpoolSize() {
+ return ssfFileMonitorThreadpoolSize;
+ }
+
+ public ServicePropertiesListener getFileChangedListener() {
+ return fileChangedListener;
+ }
+
+ public void setFileChangedListener(
+ ServicePropertiesListener fileChangedListener) {
+ this.fileChangedListener = fileChangedListener;
+ }
+
+ public ServicePropertiesMap getFilePropertiesMap() {
+ return filePropertiesMap;
+ }
+
+ public void setFilePropertiesMap(ServicePropertiesMap filePropertiesMap) {
+ this.filePropertiesMap = filePropertiesMap;
+ }
+}
diff --git a/src/main/java/com/att/nsa/dmaap/mmagent/CreateMirrorMaker.java b/src/main/java/com/att/nsa/dmaap/mmagent/CreateMirrorMaker.java
new file mode 100644
index 0000000..92aca38
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/mmagent/CreateMirrorMaker.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.mmagent;
+
+public class CreateMirrorMaker {
+ String messageID;
+ MirrorMaker createMirrorMaker;
+
+ public MirrorMaker getCreateMirrorMaker() {
+ return createMirrorMaker;
+ }
+
+ public void setCreateMirrorMaker(MirrorMaker createMirrorMaker) {
+ this.createMirrorMaker = createMirrorMaker;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public void setMessageID(String messageID) {
+ this.messageID = messageID;
+ }
+}
diff --git a/src/main/java/com/att/nsa/dmaap/mmagent/MirrorMaker.java b/src/main/java/com/att/nsa/dmaap/mmagent/MirrorMaker.java
new file mode 100644
index 0000000..f9e6d89
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/mmagent/MirrorMaker.java
@@ -0,0 +1,70 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.mmagent;
+
+public class MirrorMaker {
+ public String name;
+ public String consumer;
+ public String producer;
+ public String whitelist;
+ public String status;
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(String consumer) {
+ this.consumer = consumer;
+ }
+
+ public String getProducer() {
+ return producer;
+ }
+
+ public void setProducer(String producer) {
+ this.producer = producer;
+ }
+
+ public String getWhitelist() {
+ return whitelist;
+ }
+
+ public void setWhitelist(String whitelist) {
+ this.whitelist = whitelist;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/dmaap/mmagent/UpdateMirrorMaker.java b/src/main/java/com/att/nsa/dmaap/mmagent/UpdateMirrorMaker.java
new file mode 100644
index 0000000..4d291f3
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/mmagent/UpdateMirrorMaker.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.mmagent;
+
+public class UpdateMirrorMaker {
+ String messageID;
+ MirrorMaker updateMirrorMaker;
+
+ public MirrorMaker getUpdateMirrorMaker() {
+ return updateMirrorMaker;
+ }
+
+ public void setUpdateMirrorMaker(MirrorMaker updateMirrorMaker) {
+ this.updateMirrorMaker = updateMirrorMaker;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public void setMessageID(String messageID) {
+ this.messageID = messageID;
+ }
+}
diff --git a/src/main/java/com/att/nsa/dmaap/mmagent/UpdateWhiteList.java b/src/main/java/com/att/nsa/dmaap/mmagent/UpdateWhiteList.java
new file mode 100644
index 0000000..616dc85
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/mmagent/UpdateWhiteList.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.mmagent;
+
+public class UpdateWhiteList {
+
+ String messageID;
+ MirrorMaker updateWhiteList;
+
+ public MirrorMaker getUpdateWhiteList() {
+ return updateWhiteList;
+ }
+
+ public void setUpdateWhiteList(MirrorMaker updateWhiteList) {
+ this.updateWhiteList = updateWhiteList;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public void setMessageID(String messageID) {
+ this.messageID = messageID;
+ }
+}
diff --git a/src/main/java/com/att/nsa/dmaap/service/AdminRestService.java b/src/main/java/com/att/nsa/dmaap/service/AdminRestService.java
new file mode 100644
index 0000000..5201dc8
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/service/AdminRestService.java
@@ -0,0 +1,293 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.service;
+
+import java.io.IOException;
+import java.util.Enumeration;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.http.HttpStatus;
+//import org.apache.log4j.Logger;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.json.JSONException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.service.AdminService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+
+/**
+ * Rest Service class
+ * for Admin Services
+ * @author author
+ *
+ */
+@Component
+@Path("/")
+public class AdminRestService {
+
+ /**
+ * Logger obj
+ */
+ //private static final Logger LOGGER = Logger
+ // .getLogger(AdminRestService.class);
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(AdminRestService.class);
+ /**
+ * Config Reader
+ */
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ /**
+ * HttpServletRequest obj
+ */
+ @Context
+ private HttpServletRequest request;
+ /**
+ * HttpServletResponse obj
+ */
+ @Context
+ private HttpServletResponse response;
+ /**
+ * AdminService obj
+ */
+ @Autowired
+ private AdminService adminService;
+
+ /**
+ * Fetches a list of all the registered consumers along with their created
+ * time and last accessed details
+ *
+ * @return consumer list in json string format
+ * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws IOException
+ * */
+ @GET
+ @Path("/consumerCache")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getConsumerCache() throws CambriaApiException, AccessDeniedException {
+ LOGGER.info("Fetching list of registered consumers.");
+ try {
+ adminService.showConsumerCache(getDMaaPContext());
+ LOGGER.info("Fetching Consumer Cache Successfully");
+ } catch (IOException e) {
+ LOGGER.error("Error while Fetching list of registered consumers : "
+ + e.getMessage(), e);
+
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_CONSUMER_CACHE.getResponseCode(),
+ "Error while Fetching list of registered consumers " + e.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ * Clears consumer cache
+ * @throws CambriaApiException ex
+ * @throws AccessDeniedException
+ *
+ * @throws IOException ex
+ * @throws JSONException ex
+ * */
+ @POST
+ @Path("/dropConsumerCache")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void dropConsumerCache() throws CambriaApiException, AccessDeniedException {
+ LOGGER.info("Dropping consumer cache");
+ try {
+ adminService.dropConsumerCache(getDMaaPContext());
+ LOGGER.info("Dropping Consumer Cache successfully");
+ } catch ( AccessDeniedException excp) {
+ LOGGER.error("Error while dropConsumerCache : "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.GET_BLACKLIST.getResponseCode(),
+ "Error while Fetching list of blacklist ips " + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ } catch (JSONException | IOException e) {
+ LOGGER.error(
+ "Error while Dropping consumer cache : " + e.getMessage(),
+ e);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.DROP_CONSUMER_CACHE.getResponseCode(),
+ "Error while Dropping consumer cache " + e.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ * Get list of blacklisted ips
+ * @throws CambriaApiException excp
+ */
+ @GET
+ @Path("/blacklist")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getBlacklist() throws CambriaApiException {
+ LOGGER.info("Fetching list of blacklist ips.");
+ try {
+ Enumeration headerNames = getDMaaPContext().getRequest().getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String key = (String) headerNames.nextElement();
+ String value = request.getHeader(key);
+
+ }
+
+ adminService.getBlacklist(getDMaaPContext());
+ LOGGER.info("Fetching list of blacklist ips Successfully");
+ }catch ( AccessDeniedException excp) {
+ LOGGER.error("Error while Fetching list of blacklist ips : "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.GET_BLACKLIST.getResponseCode(),
+ "Error while Fetching list of blacklist ips " + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ } catch ( IOException excp) {
+ LOGGER.error("Error while Fetching list of blacklist ips : "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_BLACKLIST.getResponseCode(),
+ "Error while Fetching list of blacklist ips " + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+
+ }
+
+ /**
+ * Add ip to list of blacklist ips
+ * @param ip ip
+ * @throws CambriaApiException excp
+ */
+ @POST
+ @Path("/blacklist/{ip}")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void addToBlacklist (@PathParam("ip") String ip ) throws CambriaApiException
+ {
+ LOGGER.info("Adding ip to list of blacklist ips.");
+ try {
+ adminService.addToBlacklist(getDMaaPContext(), ip);
+ LOGGER.info("Fetching list of blacklist ips Successfully");
+ } catch ( AccessDeniedException excp) {
+ LOGGER.error("Error while blacklist : "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.GET_BLACKLIST.getResponseCode(),
+ "Error while Fetching list of blacklist ips " + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ } catch (IOException | ConfigDbException excp) {
+ LOGGER.error("Error while adding ip to list of blacklist ips : "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.ADD_BLACKLIST.getResponseCode(),
+ "Error while adding ip to list of blacklist ips " + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+
+ }
+ /**
+ * Remove ip from blacklist
+ * @param ip ip
+ * @throws CambriaApiException excp
+ * @throws AccessDeniedException excp
+ * @throws ConfigDbException excp
+ */
+ @DELETE
+ @Path("/blacklist/{ip}")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void removeFromBlacklist(@PathParam("ip") String ip) throws CambriaApiException, AccessDeniedException, ConfigDbException {
+ LOGGER.info("Fetching list of blacklist ips.");
+ try {
+ adminService.removeFromBlacklist(getDMaaPContext(), ip);
+ LOGGER.info("Fetching list of blacklist ips Successfully");
+ }catch ( AccessDeniedException excp) {
+ LOGGER.error("Error while blacklist : "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.GET_BLACKLIST.getResponseCode(),
+ "Error while removeFromBlacklist list of blacklist ips " + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ } catch (IOException | ConfigDbException excp) {
+ LOGGER.error("Error while removing ip from list of blacklist ips : "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.REMOVE_BLACKLIST.getResponseCode(),
+ "Error while removing ip from list of blacklist ips " + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * Create a dmaap context
+ * @return DMaaPContext
+ */
+ private DMaaPContext getDMaaPContext() {
+ DMaaPContext dmaaPContext = new DMaaPContext();
+ dmaaPContext.setConfigReader(configReader);
+ dmaaPContext.setRequest(request);
+ dmaaPContext.setResponse(response);
+ return dmaaPContext;
+ }
+
+}
diff --git a/src/main/java/com/att/nsa/dmaap/service/ApiKeysRestService.java b/src/main/java/com/att/nsa/dmaap/service/ApiKeysRestService.java
new file mode 100644
index 0000000..9f04a1f
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/service/ApiKeysRestService.java
@@ -0,0 +1,254 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.service;
+
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.json.JSONException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.beans.ApiKeyBean;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.service.ApiKeysService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+
+/**
+ * This class is a CXF REST service
+ * which acts as gateway for Cambria Api
+ * Keys.
+ * @author author
+ *
+ */
+@Component
+@Path("/")
+public class ApiKeysRestService {
+
+ /**
+ * Logger obj
+ */
+ //private Logger log = Logger.getLogger(ApiKeysRestService.class.toString());
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysRestService.class);
+ /**
+ * HttpServletRequest obj
+ */
+ @Context
+ private HttpServletRequest request;
+
+ /**
+ * HttpServletResponse obj
+ */
+ @Context
+ private HttpServletResponse response;
+
+ /**
+ * Config Reader
+ */
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ /**
+ * ApiKeysService obj
+ */
+ @Autowired
+ private ApiKeysService apiKeyService;
+
+ /**
+ * Returns a list of all the existing Api keys
+ * @throws CambriaApiException
+ *
+ * @throws IOException
+ * */
+ @GET
+ public void getAllApiKeys() throws CambriaApiException {
+
+ log.info("Inside ApiKeysRestService.getAllApiKeys");
+
+ try {
+ apiKeyService.getAllApiKeys(getDmaapContext());
+ log.info("Fetching all API keys is Successful");
+ } catch (ConfigDbException | IOException e) {
+ log.error("Error while retrieving API keys: " + e);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(),
+ "Error while retrieving API keys: "+ e.getMessage());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+ }
+
+ /**
+ * Returns details of a particular api key whose <code>name</code> is passed
+ * as a parameter
+ *
+ * @param apiKeyName
+ * - name of the api key
+ * @throws CambriaApiException
+ * @throws IOException
+ * */
+ @GET
+ @Path("/{apiKey}")
+ public void getApiKey(@PathParam("apiKey") String apiKeyName) throws CambriaApiException {
+ log.info("Fetching details of api key: " + apiKeyName);
+
+ try {
+ apiKeyService.getApiKey(getDmaapContext(), apiKeyName);
+ log.info("Fetching specific API key is Successful");
+ } catch (ConfigDbException | IOException e) {
+ log.error("Error while retrieving API key details: " + e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(),
+ "Error while retrieving API key details: "+ e.getMessage());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+
+
+ /**
+ * Creates api key using the <code>email</code> and <code>description</code>
+ *
+ * @param nsaApiKey
+ * @throws CambriaApiException
+ * @throws JSONException
+ * */
+ @POST
+ @Path("/create")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public void createApiKey(ApiKeyBean nsaApiKey) throws CambriaApiException, JSONException {
+ log.info("Creating Api Key.");
+
+ try {
+ apiKeyService.createApiKey(getDmaapContext(), nsaApiKey);
+ log.info("Creating API key is Successful");
+ } catch (KeyExistsException | ConfigDbException | IOException e) {
+ log.error("Error while Creating API key : " + e.getMessage(), e);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(),
+ "Error while Creating API key : "+ e.getMessage());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+ }
+
+ /**
+ * Updates an existing apiKey using the key name passed a parameter and the
+ * details passed.
+ *
+ * @param apiKeyName
+ * - name of the api key to be updated
+ * @param nsaApiKey
+ * @throws CambriaApiException
+ * @throws JSONException
+ * @throws IOException
+ * @throws AccessDeniedException
+ * */
+ @PUT
+ @Path("/{apiKey}")
+ public void updateApiKey(@PathParam("apiKey") String apiKeyName,
+ ApiKeyBean nsaApiKey) throws CambriaApiException, JSONException {
+ log.info("Updating Api Key.");
+
+ try {
+
+ apiKeyService
+ .updateApiKey(getDmaapContext(), apiKeyName, nsaApiKey);
+ log.error("API key updated sucessfully");
+ } catch (ConfigDbException | IOException | AccessDeniedException e) {
+ log.error("Error while Updating API key : " + apiKeyName, e);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(),
+ "Error while Updating API key : "+ e.getMessage());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * Deletes an existing apiKey using the key name passed as a parameter.
+ *
+ * @param apiKeyName
+ * - name of the api key to be updated
+ * @throws CambriaApiException
+ * @throws IOException
+ * @throws AccessDeniedException
+ * */
+ @DELETE
+ @Path("/{apiKey}")
+ public void deleteApiKey(@PathParam("apiKey") String apiKeyName) throws CambriaApiException {
+ log.info("Deleting Api Key: " + apiKeyName);
+ try {
+ apiKeyService.deleteApiKey(getDmaapContext(), apiKeyName);
+ log.info("Api Key deleted successfully: " + apiKeyName);
+ } catch (ConfigDbException | IOException | AccessDeniedException e) {
+ log.error("Error while deleting API key : " + apiKeyName, e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(),
+ "Error while deleting API key : "+ e.getMessage());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * Create a dmaap context
+ * @return DMaaPContext
+ */
+ private DMaaPContext getDmaapContext() {
+ DMaaPContext dmaapContext = new DMaaPContext();
+ dmaapContext.setConfigReader(configReader);
+ dmaapContext.setRequest(request);
+ dmaapContext.setResponse(response);
+ return dmaapContext;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
new file mode 100644
index 0000000..cda431c
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java
@@ -0,0 +1,313 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Date;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.exception.DMaaPErrorMessages;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.cambria.service.EventsService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.cambria.utils.Utils;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
+/**
+ * This class is a CXF REST service which acts
+ * as gateway for MR Event Service.
+ * @author author
+ *
+ */
+@Component
+@Path("/")
+public class EventsRestService {
+
+ /**
+ * Logger obj
+ */
+ //private Logger log = Logger.getLogger(EventsRestService.class.toString());
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
+ /**
+ * HttpServletRequest obj
+ */
+ @Context
+ private HttpServletRequest request;
+
+ /**
+ * HttpServletResponse obj
+ */
+ @Context
+ private HttpServletResponse response;
+
+
+ /**
+ * Config Reader
+ */
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ @Autowired
+ private EventsService eventsService;
+
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+ /**
+ * This method is used to consume messages.Taking three parameter
+ * topic,consumerGroup and consumerId .Consumer decide to which topic they
+ * want to consume messages.In on consumer Group there might be many
+ * consumer may be present.
+ *
+ * @param topic
+ * specify- the topic name
+ * @param consumergroup
+ * - specify the consumer group
+ * @param consumerid
+ * -specify the consumer id
+ *
+ * handles CambriaApiException | ConfigDbException |
+ * TopicExistsException | AccessDeniedException |
+ * UnavailableException | IOException in try catch block
+ * @throws CambriaApiException
+ *
+ */
+ @GET
+ @Path("/{topic}/{consumergroup}/{consumerid}")
+ public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
+ String consumergroup,
+ @PathParam("consumerid") String consumerid) throws CambriaApiException {
+ // log.info("Consuming message from topic " + topic );
+ DMaaPContext dMaaPContext = getDmaapContext();
+ dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+
+ try {
+
+ eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
+ }
+ catch (TopicExistsException e) {
+ log.error("Error while reading data from topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
+ consumerid,
+ request.getRemoteHost());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+ log.error("Error while reading data from topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
+ consumerid,
+ request.getRemoteHost());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+
+ catch (ConfigDbException | UnavailableException | IOException e) {
+ log.error("Error while reading data from topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
+ consumerid,
+ request.getRemoteHost());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * This method is used to publish messages.Taking two parameter topic and
+ * partition.Publisher decide to which topic they want to publish message
+ * and kafka decide to which partition of topic message will send,
+ *
+ * @param topic
+ * @param msg
+ * @param partitionKey
+ *
+ * handles CambriaApiException | ConfigDbException |
+ * TopicExistsException | AccessDeniedException | IOException in
+ * try catch block
+ * @throws CambriaApiException
+ */
+
+ @POST
+ @Produces("application/json")
+ @Path("/{topic}")
+ public void pushEvents(@PathParam("topic") String topic, InputStream msg,
+ @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
+ log.info("Publishing message to topic " + topic);
+
+ try {
+ eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
+ }
+ catch ( TopicExistsException e) {
+ log.error("Error while publishing to topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
+ log.error("Error while publishing to topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+
+ catch (ConfigDbException | IOException | missingReqdSetting e) {
+ log.error("Error while publishing to topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ * This method is used to publish messages by passing an optional header
+ * called 'transactionId'. If the 'transactionId' is not provided in the
+ * input then a new transaction object will be created. Else the existing
+ * transaction object will be updated with the counter details.
+ *
+ * @param topic
+ * @param partitionKey
+ *
+ * handles CambriaApiException | ConfigDbException |
+ * TopicExistsException | AccessDeniedException | IOException in
+ * try catch block
+ * @throws CambriaApiException
+ */
+ @POST
+ @Produces("application/json")
+ @Path("/transaction/{topic}")
+ public void pushEventsWithTransaction(@PathParam("topic") String topic,
+ @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
+ // log.info("Publishing message with transaction id for topic " + topic
+ // );
+
+ try {
+ eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
+ partitionKey,
+ Utils.getFormattedDate(new Date()));
+ }
+
+ catch ( TopicExistsException e) {
+ log.error("Error while publishing to topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
+ log.error("Error while publishing to topic [" + topic + "].", e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+ + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+ catch (ConfigDbException | IOException | missingReqdSetting e) {
+ log.error("Error while publishing to topic : " + topic, e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
+ + errorMessages.getPublishMsgError() + e.getMessage(), null,
+ Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
+ request.getRemoteHost(),
+ null, null);
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * This method is used for taking Configuration Object,HttpServletRequest
+ * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
+ * Object.
+ *
+ * @return DMaaPContext object from where user can get Configuration
+ * Object,HttpServlet Object
+ *
+ */
+ private DMaaPContext getDmaapContext() {
+
+ DMaaPContext dmaapContext = new DMaaPContext();
+ dmaapContext.setRequest(request);
+ dmaapContext.setResponse(response);
+ dmaapContext.setConfigReader(configReader);
+
+ return dmaapContext;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/dmaap/service/MMRestService.java b/src/main/java/com/att/nsa/dmaap/service/MMRestService.java
new file mode 100644
index 0000000..0fa396f
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/service/MMRestService.java
@@ -0,0 +1,1238 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import org.json.JSONObject;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.cambria.utils.Utils;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.dmaap.mmagent.*;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
+import edu.emory.mathcs.backport.java.util.Arrays;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.exception.DMaaPErrorMessages;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
+import com.att.nsa.cambria.service.MMService;
+
+/**
+ * Rest Service class for Mirror Maker proxy Rest Services
+ *
+ * @author <a href="mailto:"></a>
+ *
+ * @since May 25, 2016
+ */
+
+@Component
+public class MMRestService {
+
+ //private static final Logger LOGGER = Logger.getLogger(MMRestService.class);
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class);
+ private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission.";
+ private static final String NO_USER_PERMISSION = "No Mirror Maker User permission.";
+ private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission.";
+ private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric";
+ private static final String INVALID_IPPORT = "This is not a valid IP:Port";
+
+ private String topic;
+ private int timeout;
+ private String consumergroup;
+ private String consumerid;
+
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ @Context
+ private HttpServletRequest request;
+
+ @Context
+ private HttpServletResponse response;
+
+ @Autowired
+ private MMService mirrorService;
+
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+ /**
+ * This method is used for taking Configuration Object,HttpServletRequest
+ * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
+ * Object.
+ *
+ * @return DMaaPContext object from where user can get Configuration
+ * Object,HttpServlet Object
+ *
+ */
+ private DMaaPContext getDmaapContext() {
+ DMaaPContext dmaapContext = new DMaaPContext();
+ dmaapContext.setRequest(request);
+ dmaapContext.setResponse(response);
+ dmaapContext.setConfigReader(configReader);
+ dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+
+ return dmaapContext;
+ }
+
+ @POST
+ @Produces("application/json")
+ @Path("/create")
+ public void callCreateMirrorMaker(InputStream msg) {
+
+ DMaaPContext ctx = getDmaapContext();
+ if (checkMirrorMakerPermission(ctx,
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
+
+ loadProperty();
+ String input = null;
+ String randomStr = getRandomNum();
+
+ InputStream inStream = null;
+ Gson gson = new Gson();
+ CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker();
+
+ try {
+ input = IOUtils.toString(msg, "UTF-8");
+
+ if (input != null && input.length() > 0) {
+ input = removeExtraChar(input);
+ }
+
+ // Check if the request has CreateMirrorMaker
+ try {
+ createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class);
+
+ } catch (JsonSyntaxException ex) {
+
+ sendErrResponse(ctx, errorMessages.getIncorrectJson());
+ }
+ String name = createMirrorMaker.getCreateMirrorMaker().getName();
+ // send error message if it is not a CreateMirrorMaker request.
+ if (createMirrorMaker.getCreateMirrorMaker() == null) {
+ sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
+ }
+
+ // MirrorMaker whitelist and status should not be passed
+ else if (createMirrorMaker.getCreateMirrorMaker().getWhitelist() != null
+ || createMirrorMaker.getCreateMirrorMaker().getStatus() != null) {
+ sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again.");
+ }
+
+ // if empty, blank name is entered
+ else if (StringUtils.isBlank(name)) {
+ sendErrResponse(ctx, "Name can not be empty or blank.");
+ }
+
+ // Check if the name contains only Alpha Numeric
+ else if (!isAlphaNumeric(name)) {
+ sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
+
+ }
+
+ // Validate the IP and Port
+ else if (!StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getConsumer())
+ && !StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getProducer())
+ && !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
+ || !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
+ sendErrResponse(ctx, INVALID_IPPORT);
+
+ }
+ // Set a random number as messageID, convert Json Object to
+ // InputStream and finally call publisher and subscriber
+ else if (isAlphaNumeric(name) && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer())
+ && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) {
+
+ createMirrorMaker.setMessageID(randomStr);
+ inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), "UTF-8");
+ callPubSub(randomStr, ctx, inStream);
+ }
+
+ } catch (IOException e) {
+
+ e.printStackTrace();
+ }
+ }
+ // Send error response if user does not provide Authorization
+ else {
+ sendErrResponse(ctx, NO_ADMIN_PERMISSION);
+ }
+ }
+
+ @POST
+ @Produces("application/json")
+ @Path("/listall")
+ public void callListAllMirrorMaker(InputStream msg) throws CambriaApiException {
+ DMaaPContext ctx = getDmaapContext();
+
+ if (checkMirrorMakerPermission(ctx,
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
+
+ loadProperty();
+
+ String input = null;
+
+ try {
+ input = IOUtils.toString(msg, "UTF-8");
+
+ if (input != null && input.length() > 0) {
+ input = removeExtraChar(input);
+ }
+
+ String randomStr = getRandomNum();
+ JSONObject jsonOb = null;
+
+ try {
+ jsonOb = new JSONObject(input);
+
+ } catch (JSONException ex) {
+
+ sendErrResponse(ctx, errorMessages.getIncorrectJson());
+ }
+
+ // Check if request has listAllMirrorMaker and
+ // listAllMirrorMaker is empty
+ if (jsonOb.has("listAllMirrorMaker") && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0) {
+
+ jsonOb.put("messageID", randomStr);
+ InputStream inStream = null;
+
+ try {
+ inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+
+ callPubSub(randomStr, ctx, inStream);
+
+ } else {
+
+ sendErrResponse(ctx, "This is not a ListAllMirrorMaker request. Please try again.");
+ }
+
+ } catch (IOException ioe) {
+
+ ioe.printStackTrace();
+ }
+
+ } else {
+
+ sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
+ }
+ }
+
+ @POST
+ @Produces("application/json")
+ @Path("/update")
+ public void callUpdateMirrorMaker(InputStream msg) throws CambriaApiException {
+
+ DMaaPContext ctx = getDmaapContext();
+ if (checkMirrorMakerPermission(ctx,
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
+
+ loadProperty();
+ String input = null;
+ String randomStr = getRandomNum();
+
+ InputStream inStream = null;
+ Gson gson = new Gson();
+ UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker();
+
+ try {
+ input = IOUtils.toString(msg, "UTF-8");
+
+ if (input != null && input.length() > 0) {
+ input = removeExtraChar(input);
+ }
+
+ // Check if the request has UpdateMirrorMaker
+ try {
+ updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class);
+
+ } catch (JsonSyntaxException ex) {
+
+ sendErrResponse(ctx, errorMessages.getIncorrectJson());
+ }
+ String name = updateMirrorMaker.getUpdateMirrorMaker().getName();
+
+ // send error message if it is not a UpdateMirrorMaker request.
+ if (updateMirrorMaker.getUpdateMirrorMaker() == null) {
+ sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
+ }
+
+ // MirrorMaker whitelist and status should not be passed
+ else if (updateMirrorMaker.getUpdateMirrorMaker().getWhitelist() != null
+ || updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) {
+ sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again.");
+ }
+
+ // if empty, blank name is entered
+ else if (StringUtils.isBlank(name)) {
+ sendErrResponse(ctx, "Name can not be empty or blank.");
+ }
+
+ // Check if the name contains only Alpha Numeric
+ else if (!isAlphaNumeric(name)) {
+ sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT);
+
+ }
+
+ // Validate the IP and Port
+ else if (!StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
+ && !StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getProducer())
+ && !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
+ || !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
+ sendErrResponse(ctx, INVALID_IPPORT);
+
+ }
+ // Set a random number as messageID, convert Json Object to
+ // InputStream and finally call publisher and subscriber
+ else if (isAlphaNumeric(name) && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer())
+ && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) {
+
+ updateMirrorMaker.setMessageID(randomStr);
+ inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), "UTF-8");
+ callPubSub(randomStr, ctx, inStream);
+ }
+
+ } catch (IOException e) {
+
+ e.printStackTrace();
+ }
+ }
+ // Send error response if user does not provide Authorization
+ else {
+ sendErrResponse(ctx, NO_ADMIN_PERMISSION);
+ }
+ }
+
+ @POST
+ @Produces("application/json")
+ @Path("/delete")
+ public void callDeleteMirrorMaker(InputStream msg) throws CambriaApiException {
+ DMaaPContext ctx = getDmaapContext();
+
+ if (checkMirrorMakerPermission(ctx,
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) {
+
+ loadProperty();
+
+ String input = null;
+
+ try {
+ input = IOUtils.toString(msg, "UTF-8");
+
+ if (input != null && input.length() > 0) {
+ input = removeExtraChar(input);
+ }
+
+ String randomStr = getRandomNum();
+ JSONObject jsonOb = null;
+
+ try {
+ jsonOb = new JSONObject(input);
+
+ } catch (JSONException ex) {
+
+ sendErrResponse(ctx, errorMessages.getIncorrectJson());
+ }
+
+ // Check if request has DeleteMirrorMaker and
+ // DeleteMirrorMaker has MirrorMaker object with name variable
+ // and check if the name contain only alpha numeric
+ if (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1
+ && jsonOb.getJSONObject("deleteMirrorMaker").has("name")
+ && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))
+ && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))) {
+
+ jsonOb.put("messageID", randomStr);
+ InputStream inStream = null;
+
+ try {
+ inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8");
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+
+ callPubSub(randomStr, ctx, inStream);
+
+ } else {
+
+ sendErrResponse(ctx, "This is not a DeleteMirrorMaker request. Please try again.");
+ }
+
+ } catch (IOException ioe) {
+
+ ioe.printStackTrace();
+ }
+
+ } else {
+
+ sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION);
+ }
+ }
+
+ private boolean isListMirrorMaker(String msg, String messageID) {
+ String topicmsg = msg;
+ topicmsg = removeExtraChar(topicmsg);
+
+ JSONObject jObj = new JSONObject();
+ JSONArray jArray = null;
+ boolean exist = false;
+
+ if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) {
+ jArray = new JSONArray(topicmsg);
+
+ for (int i = 0; i < jArray.length(); i++) {
+ jObj = jArray.getJSONObject(i);
+
+ JSONObject obj = new JSONObject();
+ if (jObj.has("message")) {
+ obj = jObj.getJSONObject("message");
+ }
+ if (obj.has("messageID") && obj.get("messageID").equals(messageID) && obj.has("listMirrorMaker")) {
+ exist = true;
+ break;
+ }
+ }
+ }
+ return exist;
+ }
+
+ private void loadProperty() {
+
+ this.timeout = Integer.parseInt(
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim());
+ this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim();
+ this.consumergroup = AJSCPropertiesMap
+ .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim();
+ this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid")
+ .trim();
+ }
+
+ private String removeExtraChar(String message) {
+ String str = message;
+ str = checkJsonFormate(str);
+
+ if (str != null && str.length() > 0) {
+ str = str.replace("\\", "");
+ str = str.replace("\"{", "{");
+ str = str.replace("}\"", "}");
+ }
+ return str;
+ }
+
+ private String getRandomNum() {
+ long random = Math.round(Math.random() * 89999) + 10000;
+ String strLong = Long.toString(random);
+ return strLong;
+ }
+
+ private boolean isAlphaNumeric(String name) {
+ String pattern = "^[a-zA-Z0-9]*$";
+ if (name.matches(pattern)) {
+ return true;
+ }
+ return false;
+ }
+
+ // This method validate IPv4
+ private boolean validateIPPort(String ipPort) {
+ String pattern = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\."
+ + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5]):"
+ + "([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$";
+ if (ipPort.matches(pattern)) {
+ return true;
+ }
+ return false;
+ }
+
+ private String checkJsonFormate(String jsonStr) {
+
+ String json = jsonStr;
+ if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) {
+ json = json + "]";
+ }
+ return json;
+ }
+
+ private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) {
+
+ boolean hasPermission = false;
+
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+
+ if (aaf.aafAuthentication(ctx.getRequest(), permission)) {
+ hasPermission = true;
+ }
+ return hasPermission;
+ }
+
+ private void callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream) {
+ try {
+ mirrorService.pushEvents(ctx, topic, inStream, null, null);
+ long startTime = System.currentTimeMillis();
+ String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+
+ while (!isListMirrorMaker(msgFrmSubscribe, randomstr)
+ && (System.currentTimeMillis() - startTime) < timeout) {
+ msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+ }
+
+ JSONObject jsonObj = new JSONObject();
+ JSONObject finalJsonObj = new JSONObject();
+ JSONArray jsonArray = null;
+
+ if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
+ && isListMirrorMaker(msgFrmSubscribe, randomstr)) {
+ msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
+ jsonArray = new JSONArray(msgFrmSubscribe);
+
+ for (int i = 0; i < jsonArray.length(); i++) {
+ jsonObj = jsonArray.getJSONObject(i);
+
+ JSONObject obj = new JSONObject();
+ if (jsonObj.has("message")) {
+ obj = jsonObj.getJSONObject("message");
+ }
+ if (obj.has("messageID") && obj.get("messageID").equals(randomstr) && obj.has("listMirrorMaker")) {
+ finalJsonObj.put("listMirrorMaker", obj.get("listMirrorMaker"));
+ break;
+ }
+ }
+
+ DMaaPResponseBuilder.respondOk(ctx, finalJsonObj);
+
+ } else {
+
+ JSONObject err = new JSONObject();
+ err.append("error", "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
+ DMaaPResponseBuilder.respondOk(ctx, err);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void sendErrResponse(DMaaPContext ctx, String errMsg) {
+ JSONObject err = new JSONObject();
+ err.append("Error", errMsg);
+
+ try {
+ DMaaPResponseBuilder.respondOk(ctx, err);
+ LOGGER.error(errMsg.toString());
+
+ } catch (JSONException | IOException e) {
+ LOGGER.error(errMsg.toString());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @POST
+ @Produces("application/json")
+ @Path("/listallwhitelist")
+ public void listWhiteList(InputStream msg) {
+
+ DMaaPContext ctx = getDmaapContext();
+ if (checkMirrorMakerPermission(ctx,
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
+
+ loadProperty();
+ String input = null;
+
+ try {
+ input = IOUtils.toString(msg, "UTF-8");
+
+ if (input != null && input.length() > 0) {
+ input = removeExtraChar(input);
+ }
+
+ // Check if it is correct Json object
+ JSONObject jsonOb = null;
+
+ try {
+ jsonOb = new JSONObject(input);
+
+ } catch (JSONException ex) {
+
+ sendErrResponse(ctx, errorMessages.getIncorrectJson());
+ }
+
+ // Check if the request has name and name contains only alpha
+ // numeric
+ // and check if the request has namespace and namespace contains
+ // only alpha numeric
+ if (jsonOb.length() == 2 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
+ && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace")
+ && !StringUtils.isBlank(jsonOb.getString("namespace"))) {
+
+ String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
+
+ // Check if the user have create permission for the
+ // namespace
+ if (checkMirrorMakerPermission(ctx, permission)) {
+
+ JSONObject listAll = new JSONObject();
+ JSONObject emptyObject = new JSONObject();
+
+ // Create a listAllMirrorMaker Json object
+ try {
+ listAll.put("listAllMirrorMaker", emptyObject);
+
+ } catch (JSONException e) {
+
+ e.printStackTrace();
+ }
+
+ // set a random number as messageID
+ String randomStr = getRandomNum();
+ listAll.put("messageID", randomStr);
+ InputStream inStream = null;
+
+ // convert listAll Json object to InputStream object
+ try {
+ inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ // call listAllMirrorMaker
+ mirrorService.pushEvents(ctx, topic, inStream, null, null);
+
+ // subscribe for listMirrorMaker
+ long startTime = System.currentTimeMillis();
+ String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+
+ while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
+ && (System.currentTimeMillis() - startTime) < timeout) {
+ msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+ }
+
+ if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
+ && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
+
+ JSONArray listMirrorMaker = new JSONArray();
+ listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
+
+ String whitelist = null;
+ for (int i = 0; i < listMirrorMaker.length(); i++) {
+
+ JSONObject mm = new JSONObject();
+ mm = listMirrorMaker.getJSONObject(i);
+ String name = mm.getString("name");
+
+ if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
+ whitelist = mm.getString("whitelist");
+ break;
+ }
+ }
+
+ if (!StringUtils.isBlank(whitelist)) {
+
+ List<String> topicList = new ArrayList<String>();
+ List<String> finalTopicList = new ArrayList<String>();
+ topicList = Arrays.asList(whitelist.split(","));
+
+ for (String topic : topicList) {
+ if (topic != null && !topic.equals("null")
+ && getNamespace(topic).equals(jsonOb.getString("namespace"))) {
+
+ finalTopicList.add(topic);
+ }
+ }
+
+ String topicNames = "";
+
+ if (finalTopicList.size() > 0) {
+ topicNames = StringUtils.join(finalTopicList, ",");
+ }
+
+ JSONObject listAllWhiteList = new JSONObject();
+ listAllWhiteList.put("name", jsonOb.getString("name"));
+ listAllWhiteList.put("whitelist", topicNames);
+
+ DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList);
+ }
+
+ } else {
+
+ JSONObject err = new JSONObject();
+ err.append("error",
+ "listWhiteList is not available, please make sure MirrorMakerAgent is running");
+ DMaaPResponseBuilder.respondOk(ctx, err);
+ }
+
+ } else {
+ sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
+ }
+
+ } else {
+
+ sendErrResponse(ctx, "This is not a ListAllWhitelist request. Please try again.");
+ }
+
+ } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
+ | TopicExistsException | missingReqdSetting | UnavailableException e) {
+
+ e.printStackTrace();
+ }
+ } else {
+ sendErrResponse(ctx, NO_USER_PERMISSION);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @POST
+ @Produces("application/json")
+ @Path("/createwhitelist")
+ public void createWhiteList(InputStream msg) {
+
+ DMaaPContext ctx = getDmaapContext();
+ if (checkMirrorMakerPermission(ctx,
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
+
+ loadProperty();
+ String input = null;
+
+ try {
+ input = IOUtils.toString(msg, "UTF-8");
+
+ if (input != null && input.length() > 0) {
+ input = removeExtraChar(input);
+ }
+
+ // Check if it is correct Json object
+ JSONObject jsonOb = null;
+
+ try {
+ jsonOb = new JSONObject(input);
+
+ } catch (JSONException ex) {
+
+ sendErrResponse(ctx, errorMessages.getIncorrectJson());
+ }
+
+ // Check if the request has name and name contains only alpha numeric,
+ // check if the request has namespace and
+ // check if the request has whitelistTopicName
+ // check if the topic name contains only alpha numeric
+ if (jsonOb.length() == 3 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name"))
+ && isAlphaNumeric(jsonOb.getString("name"))
+ && jsonOb.has("namespace") && !StringUtils.isBlank(jsonOb.getString("namespace"))
+ && jsonOb.has("whitelistTopicName") && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName"))
+ && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
+ jsonOb.getString("whitelistTopicName").length()))) {
+
+ String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
+
+ // Check if the user have create permission for the
+ // namespace
+ if (checkMirrorMakerPermission(ctx, permission)) {
+
+ JSONObject listAll = new JSONObject();
+ JSONObject emptyObject = new JSONObject();
+
+ // Create a listAllMirrorMaker Json object
+ try {
+ listAll.put("listAllMirrorMaker", emptyObject);
+
+ } catch (JSONException e) {
+
+ e.printStackTrace();
+ }
+
+ // set a random number as messageID
+ String randomStr = getRandomNum();
+ listAll.put("messageID", randomStr);
+ InputStream inStream = null;
+
+ // convert listAll Json object to InputStream object
+ try {
+ inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ // call listAllMirrorMaker
+ mirrorService.pushEvents(ctx, topic, inStream, null, null);
+
+ // subscribe for listMirrorMaker
+ long startTime = System.currentTimeMillis();
+ String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+
+ while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
+ && (System.currentTimeMillis() - startTime) < timeout) {
+ msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+ }
+
+ JSONArray listMirrorMaker = null;
+
+ if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
+ && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
+
+ listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr);
+ String whitelist = null;
+
+ for (int i = 0; i < listMirrorMaker.length(); i++) {
+ JSONObject mm = new JSONObject();
+ mm = listMirrorMaker.getJSONObject(i);
+ String name = mm.getString("name");
+
+ if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
+ whitelist = mm.getString("whitelist");
+ break;
+ }
+ }
+
+ List<String> topicList = new ArrayList<String>();
+ List<String> finalTopicList = new ArrayList<String>();
+
+ if (whitelist != null) {
+ topicList = Arrays.asList(whitelist.split(","));
+ }
+
+ for (String st : topicList) {
+ if (!StringUtils.isBlank(st)) {
+ finalTopicList.add(st);
+ }
+ }
+
+ String newTopic = jsonOb.getString("whitelistTopicName");
+
+ if (!topicList.contains(newTopic)
+ && getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
+
+ UpdateWhiteList updateWhiteList = new UpdateWhiteList();
+ MirrorMaker mirrorMaker = new MirrorMaker();
+ mirrorMaker.setName(jsonOb.getString("name"));
+ finalTopicList.add(newTopic);
+ String newWhitelist = "";
+
+ if (finalTopicList.size() > 0) {
+ newWhitelist = StringUtils.join(finalTopicList, ",");
+ }
+
+ mirrorMaker.setWhitelist(newWhitelist);
+
+ String newRandom = getRandomNum();
+ updateWhiteList.setMessageID(newRandom);
+ updateWhiteList.setUpdateWhiteList(mirrorMaker);
+
+ Gson g = new Gson();
+ g.toJson(updateWhiteList);
+ InputStream inputStream = null;
+ inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
+ // callPubSub(newRandom, ctx, inputStream);
+ callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb.getString("namespace"));
+
+ } else if (topicList.contains(newTopic)) {
+ sendErrResponse(ctx, "The topic already exist.");
+
+ } else if (!getNamespace(newTopic).equals(jsonOb.getString("namespace"))) {
+ sendErrResponse(ctx,
+ "The namespace of the topic does not match with the namespace you provided.");
+ }
+ } else {
+
+ JSONObject err = new JSONObject();
+ err.append("error",
+ "listWhiteList is not available, please make sure MirrorMakerAgent is running");
+ DMaaPResponseBuilder.respondOk(ctx, err);
+ }
+
+ } else {
+ sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
+ }
+
+ } else {
+
+ sendErrResponse(ctx, "This is not a createWhitelist request. Please try again.");
+ }
+
+ } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
+ | TopicExistsException | missingReqdSetting | UnavailableException e) {
+
+ e.printStackTrace();
+ }
+ }
+ // Send error response if user does not provide Authorization
+ else {
+ sendErrResponse(ctx, NO_USER_PERMISSION);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @POST
+ @Produces("application/json")
+ @Path("/deletewhitelist")
+ public void deleteWhiteList(InputStream msg) {
+
+ DMaaPContext ctx = getDmaapContext();
+ if (checkMirrorMakerPermission(ctx,
+ AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) {
+
+ loadProperty();
+ String input = null;
+
+ try {
+ input = IOUtils.toString(msg, "UTF-8");
+
+ if (input != null && input.length() > 0) {
+ input = removeExtraChar(input);
+ }
+
+ // Check if it is correct Json object
+ JSONObject jsonOb = null;
+
+ try {
+ jsonOb = new JSONObject(input);
+
+ } catch (JSONException ex) {
+
+ sendErrResponse(ctx, errorMessages.getIncorrectJson());
+ }
+
+ // Check if the request has name and name contains only alpha numeric,
+ // check if the request has namespace and
+ // check if the request has whitelistTopicName
+ if (jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name"))
+ && jsonOb.has("namespace") && jsonOb.has("whitelistTopicName")
+ && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1,
+ jsonOb.getString("whitelistTopicName").length()))) {
+
+ String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create";
+
+ // Check if the user have create permission for the
+ // namespace
+ if (checkMirrorMakerPermission(ctx, permission)) {
+
+ JSONObject listAll = new JSONObject();
+ JSONObject emptyObject = new JSONObject();
+
+ // Create a listAllMirrorMaker Json object
+ try {
+ listAll.put("listAllMirrorMaker", emptyObject);
+
+ } catch (JSONException e) {
+
+ e.printStackTrace();
+ }
+
+ // set a random number as messageID
+ String randomStr = getRandomNum();
+ listAll.put("messageID", randomStr);
+ InputStream inStream = null;
+
+ // convert listAll Json object to InputStream object
+ try {
+ inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8");
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ }
+ // call listAllMirrorMaker
+ mirrorService.pushEvents(ctx, topic, inStream, null, null);
+
+ // subscribe for listMirrorMaker
+ long startTime = System.currentTimeMillis();
+ String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+
+ while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
+ && (System.currentTimeMillis() - startTime) < timeout) {
+ msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+ }
+
+ JSONObject jsonObj = new JSONObject();
+ JSONArray jsonArray = null;
+ JSONArray listMirrorMaker = null;
+
+ if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
+ && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
+ msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
+ jsonArray = new JSONArray(msgFrmSubscribe);
+
+ for (int i = 0; i < jsonArray.length(); i++) {
+ jsonObj = jsonArray.getJSONObject(i);
+
+ JSONObject obj = new JSONObject();
+ if (jsonObj.has("message")) {
+ obj = jsonObj.getJSONObject("message");
+ }
+ if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has("listMirrorMaker")) {
+ listMirrorMaker = obj.getJSONArray("listMirrorMaker");
+ break;
+ }
+ }
+ String whitelist = null;
+ for (int i = 0; i < listMirrorMaker.length(); i++) {
+
+ JSONObject mm = new JSONObject();
+ mm = listMirrorMaker.getJSONObject(i);
+ String name = mm.getString("name");
+
+ if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) {
+ whitelist = mm.getString("whitelist");
+ break;
+ }
+ }
+
+ List<String> topicList = new ArrayList<String>();
+
+ if (whitelist != null) {
+ topicList = Arrays.asList(whitelist.split(","));
+ }
+ boolean removeTopic = false;
+ String topicToRemove = jsonOb.getString("whitelistTopicName");
+
+ if (topicList.contains(topicToRemove)) {
+ removeTopic = true;
+ } else {
+ sendErrResponse(ctx, "The topic does not exist.");
+ }
+
+
+ if (removeTopic) {
+ UpdateWhiteList updateWhiteList = new UpdateWhiteList();
+ MirrorMaker mirrorMaker = new MirrorMaker();
+
+ mirrorMaker.setName(jsonOb.getString("name"));
+ mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove));
+
+ String newRandom = getRandomNum();
+
+ updateWhiteList.setMessageID(newRandom);
+ updateWhiteList.setUpdateWhiteList(mirrorMaker);
+
+ Gson g = new Gson();
+ g.toJson(updateWhiteList);
+
+ InputStream inputStream = null;
+ inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8");
+ callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove));
+ }
+
+ } else {
+
+ JSONObject err = new JSONObject();
+ err.append("error",
+ "listWhiteList is not available, please make sure MirrorMakerAgent is running");
+ DMaaPResponseBuilder.respondOk(ctx, err);
+ }
+
+ } else {
+ sendErrResponse(ctx, NO_USER_CREATE_PERMISSION);
+ }
+
+ } else {
+
+ sendErrResponse(ctx, "This is not a DeleteAllWhitelist request. Please try again.");
+ }
+
+ } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException
+ | TopicExistsException | missingReqdSetting | UnavailableException e) {
+
+ e.printStackTrace();
+ }
+ }
+ // Send error response if user does not provide Authorization
+ else {
+ sendErrResponse(ctx, NO_USER_PERMISSION);
+ }
+ }
+
+ private String getNamespace(String topic) {
+ return topic.substring(0, topic.lastIndexOf("."));
+ }
+
+ private String removeTopic(String whitelist, String topicToRemove) {
+ List<String> topicList = new ArrayList<String>();
+ List<String> newTopicList = new ArrayList<String>();
+
+ if (whitelist.contains(",")) {
+ topicList = Arrays.asList(whitelist.split(","));
+
+ }
+
+ if (topicList.contains(topicToRemove)) {
+ for (String topic : topicList) {
+ if (!topic.equals(topicToRemove)) {
+ newTopicList.add(topic);
+ }
+ }
+ }
+
+ String newWhitelist = StringUtils.join(newTopicList, ",");
+
+ return newWhitelist;
+ }
+
+ private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) {
+
+ try {
+ mirrorService.pushEvents(ctx, topic, inStream, null, null);
+ long startTime = System.currentTimeMillis();
+ String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+
+ while (!isListMirrorMaker(msgFrmSubscribe, randomStr)
+ && (System.currentTimeMillis() - startTime) < timeout) {
+ msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid);
+ }
+
+ JSONObject jsonObj = new JSONObject();
+ JSONArray jsonArray = null;
+ JSONArray jsonArrayNamespace = null;
+
+ if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0
+ && isListMirrorMaker(msgFrmSubscribe, randomStr)) {
+ msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
+ jsonArray = new JSONArray(msgFrmSubscribe);
+
+ for (int i = 0; i < jsonArray.length(); i++) {
+ jsonObj = jsonArray.getJSONObject(i);
+
+ JSONObject obj = new JSONObject();
+ if (jsonObj.has("message")) {
+ obj = jsonObj.getJSONObject("message");
+ }
+ if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has("listMirrorMaker")) {
+ jsonArrayNamespace = obj.getJSONArray("listMirrorMaker");
+ }
+ }
+ JSONObject finalJasonObj = new JSONObject();
+ JSONArray finalJsonArray = new JSONArray();
+
+ for (int i = 0; i < jsonArrayNamespace.length(); i++) {
+
+ JSONObject mmObj = new JSONObject();
+ mmObj = jsonArrayNamespace.getJSONObject(i);
+ String whitelist = null;
+
+ if (mmObj.has("whitelist")) {
+ whitelist = getWhitelistByNamespace(mmObj.getString("whitelist"), namespace);
+
+ if (whitelist != null) {
+ mmObj.remove("whitelist");
+ mmObj.put("whitelist", whitelist);
+ } else {
+ mmObj.remove("whitelist");
+ }
+ }
+ finalJsonArray.put(mmObj);
+ }
+ finalJasonObj.put("listMirrorMaker", finalJsonArray);
+
+ DMaaPResponseBuilder.respondOk(ctx, finalJasonObj);
+
+ } else {
+
+ JSONObject err = new JSONObject();
+ err.append("error", "listMirrorMaker is not available, please make sure MirrorMakerAgent is running");
+ DMaaPResponseBuilder.respondOk(ctx, err);
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private String getWhitelistByNamespace(String originalWhitelist, String namespace) {
+
+ String whitelist = null;
+ List<String> resultList = new ArrayList<String>();
+ List<String> whitelistList = new ArrayList<String>();
+ whitelistList = Arrays.asList(originalWhitelist.split(","));
+
+ for (String topic : whitelistList) {
+ if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) {
+ resultList.add(topic);
+ }
+ }
+ if (resultList.size() > 0) {
+ whitelist = StringUtils.join(resultList, ",");
+ }
+
+ return whitelist;
+ }
+
+ private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) {
+ JSONObject jsonObj = new JSONObject();
+ JSONArray jsonArray = new JSONArray();
+ JSONArray listMirrorMaker = new JSONArray();
+
+ msgFrmSubscribe = removeExtraChar(msgFrmSubscribe);
+ jsonArray = new JSONArray(msgFrmSubscribe);
+
+ for (int i = 0; i < jsonArray.length(); i++) {
+ jsonObj = jsonArray.getJSONObject(i);
+
+ JSONObject obj = new JSONObject();
+ if (jsonObj.has("message")) {
+ obj = jsonObj.getJSONObject("message");
+ }
+ if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has("listMirrorMaker")) {
+ listMirrorMaker = obj.getJSONArray("listMirrorMaker");
+ break;
+ }
+ }
+ return listMirrorMaker;
+ }
+}
diff --git a/src/main/java/com/att/nsa/dmaap/service/MetricsRestService.java b/src/main/java/com/att/nsa/dmaap/service/MetricsRestService.java
new file mode 100644
index 0000000..8a6240e
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/service/MetricsRestService.java
@@ -0,0 +1,152 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.service;
+
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.service.MetricsService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+
+/**
+ * This class is a CXF REST service which acts
+ * as gateway for MR Metrics Service.
+ * @author author
+ *
+ */
+@Component
+@Path("/")
+public class MetricsRestService {
+
+ /**
+ * Logger obj
+ */
+ //private Logger log = Logger.getLogger(MetricsRestService.class.toString());
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
+ /**
+ * HttpServletRequest obj
+ */
+ @Context
+ private HttpServletRequest request;
+
+ /**
+ * HttpServletResponse obj
+ */
+ @Context
+ private HttpServletResponse response;
+
+ /**
+ * Config Reader
+ */
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ /**
+ * MetricsService obj
+ */
+ @Autowired
+ private MetricsService metricsService;
+
+ /**
+ * Get Metrics method
+ * @throws CambriaApiException ex
+ */
+ @GET
+ @Produces("text/plain")
+ public void getMetrics() throws CambriaApiException {
+ try {
+ log.info("MetricsRestService: getMetrics : START");
+ metricsService.get(getDmaapContext());
+ log.info("MetricsRestService: getMetrics : Completed");
+ } catch (IOException e) {
+ log.error("Error while fetching metrics data : ", e);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.GET_METRICS_ERROR.getResponseCode(),
+ "Error while fetching metrics data"+ e.getMessage());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ * This method is for get the metrics details by the metrics name
+ *
+ * @param metricName
+ * @throws CambriaApiException
+ */
+ @GET
+ @Path("/{metricName}")
+ @Produces("text/plain")
+ public void getMetricsByName(@PathParam("metricName") String metricName)
+ throws CambriaApiException {
+
+ try {
+ log.info("MetricsProducer: getMetricsByName : START");
+ metricsService.getMetricByName(getDmaapContext(), metricName);
+ log.info("MetricsRestService: getMetricsByName : Completed");
+ } catch (IOException | CambriaApiException e) {
+ log.error("Error while fetching metrics data : ", e);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_METRICS_ERROR.getResponseCode(),
+ "Error while fetching metrics data"+ e.getMessage());
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ * This method is used for taking Configuration Object,HttpServletRequest
+ * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
+ * Object.
+ *
+ * @return DMaaPContext object from where user can get Configuration
+ * Object,HttpServlet Object
+ *
+ */
+ private DMaaPContext getDmaapContext() {
+ DMaaPContext dmaapContext = new DMaaPContext();
+ dmaapContext.setConfigReader(configReader);
+ dmaapContext.setRequest(request);
+ dmaapContext.setResponse(response);
+ return dmaapContext;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java b/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java
new file mode 100644
index 0000000..6742cd5
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java
@@ -0,0 +1,688 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.service;
+
+import java.io.IOException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.http.HttpStatus;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import org.json.JSONException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.beans.TopicBean;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
+import com.att.nsa.cambria.exception.DMaaPErrorMessages;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
+import com.att.nsa.cambria.service.TopicService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+
+/**
+ * This class is a CXF REST service which acts
+ * as gateway for MR Topic Service.
+ * @author author
+ *
+ */
+
+@Component
+@Path("/")
+public class TopicRestService {
+
+ /**
+ * Logger obj
+ */
+ //private static final Logger LOGGER = Logger .getLogger(TopicRestService.class);
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);
+ /**
+ * Config Reader
+ */
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ /**
+ * HttpServletRequest obj
+ */
+ @Context
+ private HttpServletRequest request;
+
+ /**
+ * HttpServletResponse obj
+ */
+ @Context
+ private HttpServletResponse response;
+
+ /**
+ * TopicService obj
+ */
+ @Autowired
+ private TopicService topicService;
+
+ /**
+ * DMaaPErrorMessages obj
+ */
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+ /**
+ * mrNamespace
+ */
+ //@Value("${msgRtr.namespace.aaf}")
+// private String mrNamespace;
+
+
+ /**
+ * Fetches a list of topics from the current kafka instance and converted
+ * into json object.
+ *
+ * @return list of the topics in json format
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
+ * @throws IOException
+ * @throws JSONException
+ * */
+ @GET
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getTopics() throws CambriaApiException {
+ try {
+
+ LOGGER.info("Authenticating the user before fetching the topics");
+ //String permission = "com.att.dmaap.mr.topic|*|view";
+ String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
+ String permission =mrNameS+"|"+"*"+"|"+"view";
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+ //Check if client is using AAF CADI Basic Authorization
+ //If yes then check for AAF role authentication else display all topics
+ if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+ {
+ if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+ {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+
+
+ }
+ }
+
+ LOGGER.info("Fetching all Topics");
+
+ topicService.getTopics(getDmaapContext());
+
+ LOGGER.info("Returning List of all Topics");
+
+
+ } catch (JSONException | ConfigDbException | IOException excp) {
+ LOGGER.error(
+ "Failed to retrieve list of all topics: "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
+ errorMessages.getTopicsfailure()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+
+ }
+ }
+
+ /**
+ * Fetches a list of topics from the current kafka instance and converted
+ * into json object.
+ *
+ * @return list of the topics in json format
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
+ * @throws IOException
+ * @throws JSONException
+ * */
+ @GET
+ @Path("/listAll")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getAllTopics() throws CambriaApiException {
+ try {
+
+ LOGGER.info("Authenticating the user before fetching the topics");
+ //String permission = "com.att.dmaap.mr.topic|*|view";
+ String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
+ String permission =mrNameS+"|"+"*"+"|"+"view";
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+ //Check if client is using AAF CADI Basic Authorization
+ //If yes then check for AAF role authentication else display all topics
+ if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+ {
+ if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+ {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+
+
+ }
+ }
+
+ LOGGER.info("Fetching all Topics");
+
+ topicService.getAllTopics(getDmaapContext());
+
+ LOGGER.info("Returning List of all Topics");
+
+
+ } catch (JSONException | ConfigDbException | IOException excp) {
+ LOGGER.error(
+ "Failed to retrieve list of all topics: "
+ + excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
+ errorMessages.getTopicsfailure()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+
+ }
+ }
+
+
+ /**
+ * Returns details of the topic whose name is passed as a parameter
+ *
+ * @param topicName
+ * - name of the topic
+ * @return details of a topic whose name is mentioned in the request in json
+ * format.
+ * @throws AccessDeniedException
+ * @throws DMaaPAccessDeniedException
+ * @throws IOException
+ * */
+ @GET
+ @Path("/{topicName}")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
+ try {
+
+ LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName);
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+
+ //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
+
+ //Check if client is using AAF CADI Basic Authorization
+ //If yes then check for AAF role authentication else display all topics
+ if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+ {
+ String permission = aaf.aafPermissionString(topicName, "view");
+ if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+ {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+ }
+ }
+
+ LOGGER.info("Fetching Topic: " + topicName);
+
+ topicService.getTopic(getDmaapContext(), topicName);
+
+ LOGGER.info("Fetched details of topic: " + topicName);
+
+ } catch (ConfigDbException | IOException | TopicExistsException excp) {
+ LOGGER.error("Failed to retrieve details of topic: " + topicName,
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
+ errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+
+ }
+ }
+
+
+
+ /**
+ * This method is still not working. Need to check on post call and how to
+ * accept parameters for post call
+ *
+ * @param topicBean
+ * it will have the bean object
+ * @throws TopicExistsException
+ * @throws CambriaApiException
+ * @throws JSONException
+ * @throws IOException
+ * @throws AccessDeniedException
+ *
+ * */
+ @POST
+ @Path("/create")
+ @Consumes({ MediaType.APPLICATION_JSON })
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
+ try {
+ LOGGER.info("Creating Topic."+topicBean.getTopicName());
+
+ topicService.createTopic(getDmaapContext(), topicBean);
+
+ LOGGER.info("Topic created Successfully.");
+ }
+ catch (TopicExistsException ex){
+
+ LOGGER.error("Error while creating a topic: " + ex.getMessage(),
+ ex);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ ex.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+
+
+
+ }catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }catch (CambriaApiException | IOException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * Deletes existing topic whose name is passed as a parameter
+ *
+ * @param topicName
+ * topic
+ * @throws CambriaApiException
+ * @throws IOException
+ * */
+ @DELETE
+ @Path("/{topicName}")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
+ try {
+ LOGGER.info("Deleting Topic: " + topicName);
+
+ topicService.deleteTopic(getDmaapContext(), topicName);
+
+ LOGGER.info("Topic [" + topicName + "] deleted successfully.");
+ } catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }catch (IOException | ConfigDbException
+ | CambriaApiException | TopicExistsException excp) {
+ LOGGER.error("Error while deleting topic: " + topicName, excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ private DMaaPContext getDmaapContext() {
+
+ DMaaPContext dmaapContext = new DMaaPContext();
+ dmaapContext.setRequest(request);
+ dmaapContext.setResponse(response);
+ dmaapContext.setConfigReader(configReader);
+
+ return dmaapContext;
+
+ }
+
+ /**
+ * This method will fetch the details of publisher by giving topic name
+ *
+ * @param topicName
+ * @throws CambriaApiException
+ * @throws AccessDeniedException
+ */
+ @GET
+ @Path("/{topicName}/producers")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getPublishersByTopicName(
+ @PathParam("topicName") String topicName) throws CambriaApiException {
+ try {
+
+// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+// String permission = aaf.aafPermissionString(topicName, "view");
+// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+// {
+ LOGGER.info("Fetching list of all the publishers for topic "
+ + topicName);
+
+ topicService.getPublishersByTopicName(getDmaapContext(), topicName);
+
+ LOGGER.info("Returning list of all the publishers for topic "
+ + topicName);
+// }else{
+// LOGGER.error("Error while fetching list of publishers for topic "+ topicName);
+//
+// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+// errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2());
+// LOGGER.info(errRes);
+// throw new DMaaPAccessDeniedException(errRes);
+//
+// }
+
+ } catch (IOException | ConfigDbException | TopicExistsException excp) {
+ LOGGER.error("Error while fetching list of publishers for topic "
+ + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
+ "Error while fetching list of publishers for topic: "
+ + topicName + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * proving permission for the topic for a particular publisher id
+ *
+ * @param topicName
+ * @param producerId
+ * @throws CambriaApiException
+ */
+ @PUT
+ @Path("/{topicName}/producers/{producerId}")
+ public void permitPublisherForTopic(
+ @PathParam("topicName") String topicName,
+ @PathParam("producerId") String producerId) throws CambriaApiException {
+ try {
+ LOGGER.info("Granting write access to producer [" + producerId
+ + "] for topic " + topicName);
+
+ topicService.permitPublisherForTopic(getDmaapContext(), topicName,
+ producerId);
+
+ LOGGER.info("Write access has been granted to producer ["
+ + producerId + "] for topic " + topicName);
+ } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }catch ( ConfigDbException | IOException
+ | TopicExistsException excp) {
+ LOGGER.error("Error while granting write access to producer ["
+ + producerId + "] for topic " + topicName, excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
+ "Error while granting write access to producer ["
+ + producerId + "] for topic " + topicName + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * Removing access for a publisher id for any particular topic
+ *
+ * @param topicName
+ * @param producerId
+ * @throws CambriaApiException
+ */
+ @DELETE
+ @Path("/{topicName}/producers/{producerId}")
+ public void denyPublisherForTopic(@PathParam("topicName") String topicName,
+ @PathParam("producerId") String producerId) throws CambriaApiException {
+ try {
+ LOGGER.info("Revoking write access to producer [" + producerId
+ + "] for topic " + topicName);
+
+ topicService.denyPublisherForTopic(getDmaapContext(), topicName,
+ producerId);
+
+ LOGGER.info("Write access revoked for producer [" + producerId
+ + "] for topic " + topicName);
+ } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }catch ( ConfigDbException | IOException
+ | TopicExistsException excp) {
+ LOGGER.error("Error while revoking write access for producer ["
+ + producerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
+ "Error while revoking write access to producer ["
+ + producerId + "] for topic " + topicName + excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ * Get the consumer details by the topic name
+ *
+ * @param topicName
+ * @throws AccessDeniedException
+ * @throws CambriaApiException
+ */
+ @GET
+ @Path("/{topicName}/consumers")
+ //@Produces(MediaType.TEXT_PLAIN)
+ public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException,
+ CambriaApiException {
+ try {
+
+
+// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
+// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+// String permission = aaf.aafPermissionString(topicName, "view");
+// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+// {
+ LOGGER.info("Fetching list of all consumers for topic " + topicName);
+
+ topicService.getConsumersByTopicName(getDmaapContext(), topicName);
+
+ LOGGER.info("Returning list of all consumers for topic "
+ + topicName);
+
+// }else{
+// LOGGER.error(
+// "Error while fetching list of all consumers for topic "
+// + topicName);
+// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+// errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2());
+// LOGGER.info(errRes);
+// throw new DMaaPAccessDeniedException(errRes);
+//
+//
+// }
+
+
+
+ } catch (IOException | ConfigDbException | TopicExistsException excp) {
+ LOGGER.error(
+ "Error while fetching list of all consumers for topic "
+ + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
+ "Error while fetching list of all consumers for topic: "
+ + topicName+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * providing access for consumer for any particular topic
+ *
+ * @param topicName
+ * @param consumerId
+ * @throws CambriaApiException
+ */
+ @PUT
+ @Path("/{topicName}/consumers/{consumerId}")
+ public void permitConsumerForTopic(
+ @PathParam("topicName") String topicName,
+ @PathParam("consumerId") String consumerId) throws CambriaApiException {
+ try {
+ LOGGER.info("Granting read access to consumer [" + consumerId
+ + "] for topic " + topicName);
+
+ topicService.permitConsumerForTopic(getDmaapContext(), topicName,
+ consumerId);
+
+ LOGGER.info("Read access granted to consumer [" + consumerId
+ + "] for topic " + topicName);
+ } catch (AccessDeniedException | ConfigDbException | IOException
+ | TopicExistsException excp) {
+ LOGGER.error("Error while granting read access to consumer ["
+ + consumerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
+ "Error while granting read access to consumer ["
+ + consumerId + "] for topic " + topicName+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * Removing access for consumer for any particular topic
+ *
+ * @param topicName
+ * @param consumerId
+ * @throws CambriaApiException
+ */
+ @DELETE
+ @Path("/{topicName}/consumers/{consumerId}")
+ public void denyConsumerForTopic(@PathParam("topicName") String topicName,
+ @PathParam("consumerId") String consumerId) throws CambriaApiException {
+ try {
+ LOGGER.info("Revoking read access to consumer [" + consumerId
+ + "] for topic " + topicName);
+
+ topicService.denyConsumerForTopic(getDmaapContext(), topicName,
+ consumerId);
+
+ LOGGER.info("Read access revoked to consumer [" + consumerId
+ + "] for topic " + topicName);
+ } catch ( ConfigDbException | IOException
+ | TopicExistsException excp) {
+ LOGGER.error("Error while revoking read access to consumer ["
+ + consumerId + "] for topic " + topicName, excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
+ "Error while revoking read access to consumer ["
+ + consumerId + "] for topic " + topicName+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
+ LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+ excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
+ errorMessages.getCreateTopicFail()+ excp.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+
+
+
+}
diff --git a/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java b/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java
new file mode 100644
index 0000000..a44c2ad
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java
@@ -0,0 +1,176 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.service;
+
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.aft.dme2.internal.jettison.json.JSONException;
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.service.TransactionService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.configs.ConfigDbException;
+
+/**
+ * This class is a CXF REST service
+ * which acts as gateway for DMaaP
+ * Transaction Ids.
+ * @author author
+ *
+ */
+@Component
+@Path("/")
+public class TransactionRestService {
+
+ /**
+ * Logger obj
+ */
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TransactionRestService.class);
+
+ /**
+ * HttpServletRequest obj
+ */
+ @Context
+ private HttpServletRequest request;
+
+ /**
+ * HttpServletResponse obj
+ */
+ @Context
+ private HttpServletResponse response;
+
+ /**
+ * Config Reader
+ */
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ @Autowired
+ private TransactionService transactionService;
+
+ /**
+ *
+ * Returns a list of all the existing Transaction Ids
+ * @throws CambriaApiException
+ *
+ * @throws IOException
+ * @exception ConfigDbException
+ * @exception IOException
+ *
+ *
+ */
+ @GET
+ public void getAllTransactionObjs() throws CambriaApiException {
+ try {
+ LOGGER.info("Retrieving list of all transactions.");
+
+ transactionService.getAllTransactionObjs(getDmaapContext());
+
+ LOGGER.info("Returning list of all transactions.");
+ } catch (ConfigDbException | IOException e) {
+ LOGGER.error("Error while retrieving list of all transactions: "
+ + e.getMessage(), e);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED,
+ DMaaPResponseCode.RETRIEVE_TRANSACTIONS.getResponseCode(),
+ "Error while retrieving list of all transactions:"+e.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ *
+ * Returns details of a particular transaction id whose <code>name</code> is
+ * passed as a parameter
+ *
+ * @param transactionId
+ * - id of transaction
+ * @throws CambriaApiException
+ * @throws IOException
+ * @exception ConfigDbException
+ * @exception IOException
+ * @exception JSONException
+ *
+ *
+ */
+ @GET
+ @Path("/{transactionId}")
+ public void getTransactionObj(
+ @PathParam("transactionId") String transactionId) throws CambriaApiException {
+
+ LOGGER.info("Fetching details of Transaction ID : " + transactionId);
+
+ try {
+ transactionService.getTransactionObj(getDmaapContext(),
+ transactionId);
+ } catch (ConfigDbException | JSONException | IOException e) {
+ LOGGER.error("Error while retrieving transaction details for id: "
+ + transactionId, e);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED,
+ DMaaPResponseCode.RETRIEVE_TRANSACTIONS_DETAILS.getResponseCode(),
+ "Error while retrieving transaction details for id: ["
+ + transactionId + "]: " + e.getMessage());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+
+ LOGGER.info("Returning details of transaction " + transactionId);
+
+ }
+
+ /**
+ * This method is used for taking Configuration Object,HttpServletRequest
+ * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
+ * Object.
+ *
+ * @return DMaaPContext object from where user can get Configuration
+ * Object,HttpServlet Object
+ *
+ */
+ private DMaaPContext getDmaapContext() {
+ DMaaPContext dmaapContext = new DMaaPContext();
+ dmaapContext.setConfigReader(configReader);
+ dmaapContext.setRequest(request);
+ dmaapContext.setResponse(response);
+ return dmaapContext;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/dmaap/service/UIRestServices.java b/src/main/java/com/att/nsa/dmaap/service/UIRestServices.java
new file mode 100644
index 0000000..79a39fb
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/service/UIRestServices.java
@@ -0,0 +1,198 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.service;
+
+import java.io.IOException;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+
+import kafka.common.TopicExistsException;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.service.UIService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.configs.ConfigDbException;
+
+/**
+ * UI Rest Service
+ * @author author
+ *
+ */
+@Component
+public class UIRestServices {
+
+ /**
+ * Logger obj
+ */
+ //private static final Logger LOGGER = Logger.getLogger(UIRestServices.class);
+
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIRestServices.class);
+
+ @Autowired
+ private UIService uiService;
+
+ /**
+ * Config Reader
+ */
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ /**
+ * HttpServletRequest obj
+ */
+ @Context
+ private HttpServletRequest request;
+
+ /**
+ * HttpServletResponse obj
+ */
+ @Context
+ private HttpServletResponse response;
+
+ /**
+ * getting the hello
+ */
+ @GET
+ @Path("/")
+ public void hello() {
+ try {
+ LOGGER.info("Calling hello page.");
+
+ uiService.hello(getDmaapContext());
+
+ LOGGER.info("Hello page is returned.");
+ } catch (IOException excp) {
+ LOGGER.error("Error while calling hello page: " + excp.getMessage(), excp);
+ DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND,
+ "Error while calling hello page: " + excp.getMessage());
+ }
+ }
+
+ /**
+ * getApikeysTable
+ */
+ @GET
+ @Path("/ui/apikeys")
+ public void getApiKeysTable() {
+ try {
+ LOGGER.info("Fetching list of all api keys.");
+
+ uiService.getApiKeysTable(getDmaapContext());
+
+ LOGGER.info("Returning list of all api keys.");
+ } catch (ConfigDbException | IOException excp) {
+ LOGGER.error("Error while fetching list of all api keys: " + excp.getMessage(), excp);
+ DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND,
+ "Error while fetching list of all api keys: " + excp.getMessage());
+ }
+ }
+
+ /**
+ * getApiKey
+ *
+ * @param apiKey
+ * @exception Exception
+ */
+ @GET
+ @Path("/ui/apikeys/{apiKey}")
+ public void getApiKey(@PathParam("apiKey") String apiKey) {
+ try {
+ LOGGER.info("Fetching details of api key: " + apiKey);
+
+ uiService.getApiKey(getDmaapContext(), apiKey);
+
+ LOGGER.info("Returning details of api key: " + apiKey);
+ } catch (Exception excp) {
+ LOGGER.error("Error while fetching details of api key: " + apiKey, excp);
+ DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND,
+ "Error while fetching details of api key: " + apiKey);
+ }
+ }
+
+ @GET
+ @Path("/ui/topics")
+ public void getTopicsTable() {
+ try {
+ LOGGER.info("Fetching list of all topics.");
+
+ uiService.getTopicsTable(getDmaapContext());
+
+ LOGGER.info("Returning list of all topics.");
+ } catch (ConfigDbException | IOException excp) {
+ LOGGER.error("Error while fetching list of all topics: " + excp, excp);
+ DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND,
+ "Error while fetching list of all topics: " + excp.getMessage());
+ }
+ }
+
+ /**
+ *
+ * @param topic
+ */
+ @GET
+ @Path("/ui/topics/{topic}")
+ public void getTopic(@PathParam("topic") String topic) {
+ try {
+ LOGGER.info("Fetching details of topic: " + topic);
+
+ uiService.getTopic(getDmaapContext(), topic);
+
+ LOGGER.info("Returning details of topic: " + topic);
+ } catch (ConfigDbException | IOException | TopicExistsException excp) {
+ LOGGER.error("Error while fetching details of topic: " + topic, excp);
+ DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND,
+ "Error while fetching details of topic: " + topic);
+ }
+ }
+
+ /**
+ * This method is used for taking Configuration Object,HttpServletRequest
+ * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
+ * Object.
+ *
+ * @return DMaaPContext object from where user can get Configuration
+ * Object,HttpServlet Object
+ *
+ */
+ private DMaaPContext getDmaapContext() {
+ DMaaPContext dmaapContext = new DMaaPContext();
+ dmaapContext.setConfigReader(configReader);
+ dmaapContext.setRequest(request);
+ dmaapContext.setResponse(response);
+ return dmaapContext;
+ }
+}
diff --git a/src/main/java/com/att/nsa/dmaap/tools/ConfigTool.java b/src/main/java/com/att/nsa/dmaap/tools/ConfigTool.java
new file mode 100644
index 0000000..4424840
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/tools/ConfigTool.java
@@ -0,0 +1,818 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.tools;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.security.NoSuchAlgorithmException;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
+import org.json.JSONException;
+
+import com.att.nsa.apiServer.CommonServlet;
+import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
+import com.att.nsa.cambria.metabroker.Topic;
+import com.att.nsa.cmdtool.Command;
+import com.att.nsa.cmdtool.CommandLineTool;
+import com.att.nsa.cmdtool.CommandNotReadyException;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.configs.ConfigPath;
+import com.att.nsa.configs.confimpl.EncryptingLayer;
+import com.att.nsa.configs.confimpl.ZkConfigDb;
+import com.att.nsa.drumlin.till.data.rrConvertor;
+import com.att.nsa.drumlin.till.data.uniqueStringGenerator;
+import com.att.nsa.drumlin.till.nv.impl.nvWriteableTable;
+import com.att.nsa.security.db.BaseNsaApiDbImpl;
+import com.att.nsa.security.db.EncryptingApiDbImpl;
+import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory;
+import com.att.nsa.util.NsaClock;
+
+public class ConfigTool extends CommandLineTool<ConfigToolContext>
+{
+ protected ConfigTool ()
+ {
+ super ( "Cambria API Config Tool", "cambriaConfig> " );
+
+ super.registerCommand ( new ListTopicCommand () );
+ super.registerCommand ( new WriteTopicCommand () );
+ super.registerCommand ( new ReadTopicCommand () );
+ super.registerCommand ( new SetTopicOwnerCommand () );
+ super.registerCommand ( new InitSecureTopicCommand () );
+ super.registerCommand ( new ListApiKeysCommand () );
+ super.registerCommand ( new PutApiCommand () );
+ super.registerCommand ( new writeApiKeyCommand () );
+ super.registerCommand ( new EncryptApiKeysCommand () );
+ super.registerCommand ( new DecryptApiKeysCommand () );
+ super.registerCommand ( new NodeFetchCommand () );
+ super.registerCommand ( new DropOldConsumerGroupsCommand () );
+ }
+
+ public static void main ( String[] args ) throws IOException
+ {
+ final String connStr = args.length>0 ? args[0] : "localhost:2181";
+ final ConfigDb db = new ZkConfigDb (
+ connStr,
+ args.length>1 ? args[1] : CommonServlet.getDefaultZkRoot ( "cambria" )
+ );
+
+ final ConfigToolContext context = new ConfigToolContext ( db, connStr, new nvWriteableTable() );
+ final ConfigTool ct = new ConfigTool ();
+ ct.runFromMain ( args, context );
+ }
+
+ private static class ListTopicCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "topics", "list (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final ConfigDb db = context.getDb();
+ final ConfigPath base = db.parse ( "/topics" );
+
+ if ( parts.length > 0 )
+ {
+ final ConfigPath myTopic = base.getChild ( parts[0] );
+ final String data = db.load ( myTopic );
+ if ( data != null )
+ {
+ out.println ( data );
+ }
+ else
+ {
+ out.println ( "No topic [" + parts[0] + "]" );
+ }
+ }
+ else
+ {
+ for ( ConfigPath child : db.loadChildrenNames ( base ) )
+ {
+ out.println ( child.getName () );
+ }
+ }
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "topics" );
+ out.println ( "list <topic>" );
+ }
+ }
+
+ private static class WriteTopicCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "write (\\S*) (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final ConfigDb db = context.getDb();
+ final ConfigPath base = db.parse ( "/topics" );
+ final ConfigPath myTopic = base.getChild ( parts[0] );
+ db.store ( myTopic, parts[1] );
+ out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "write <topic> <string>" );
+ out.println ( "\tBe careful with this. You can write data that's not compatible with Cambria's config db." );
+ }
+ }
+
+ private static class ReadTopicCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "read (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final ConfigDb db = context.getDb();
+ final ConfigPath base = db.parse ( "/topics" );
+ final ConfigPath myTopic = base.getChild ( parts[0] );
+ db.store ( myTopic, parts[1] );
+ out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" );
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "read <topic>" );
+ out.println ( "\tRead config data for a topic." );
+ }
+ }
+
+ private static class InitSecureTopicCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "initTopic (\\S*) (\\S*) (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
+ context.getDb ().parse("/topics"), parts[0], parts[2], parts[1],true );
+ out.println ( "Topic [" + parts[0] + "] updated." );
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage () );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "initTopic <topic> <ownerApiKey> <description>" );
+ }
+ }
+
+ private static class SetTopicOwnerCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "setOwner (\\S*) (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final Topic kt = DMaaPKafkaMetaBroker.getKafkaTopicConfig ( context.getDb(),
+ context.getDb().parse ( "/topics" ), parts[0] );
+ if ( kt != null )
+ {
+ final String desc = kt.getDescription ();
+
+ DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (),
+ context.getDb ().parse("/topics"), parts[0], desc, parts[1], true );
+ out.println ( "Topic [" + parts[0] + "] updated." );
+ }
+ else
+ {
+ out.println ( "Topic [" + parts[0] + "] doesn't exist." );
+ }
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage () );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "setOwner <topic> <ownerApiKey>" );
+ }
+ }
+
+ private static class ListApiKeysCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "listApiKeys", "listApiKey (\\S*) (\\S*) (\\S*)", "listApiKey (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final ConfigDb db = context.getDb ();
+ if ( parts.length == 0 )
+ {
+ final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
+ int count = 0;
+ for ( String key : readFrom.loadAllKeys () )
+ {
+ out.println ( key );
+ count++;
+ }
+ out.println ( "" + count + " records." );
+ }
+ else
+ {
+ BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
+ if ( parts.length == 3 )
+ {
+ readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
+ EncryptingLayer.readSecretKey ( parts[1] ), rrConvertor.base64Decode ( parts[2] ) );
+ }
+ final NsaSimpleApiKey apikey = readFrom.loadApiKey ( parts[0] );
+ if ( apikey == null )
+ {
+ out.println ( "Key '" + parts[0] + "' not found." );
+ }
+ else
+ {
+ out.println ( apikey.asJsonObject ().toString () );
+ }
+ }
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ catch ( JSONException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "listApiKeys" );
+ out.println ( "listApiKey <key>" );
+ out.println ( "listApiKey <key> <dbKey> <dbIv>" );
+ }
+ }
+
+ private static class PutApiCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[]
+ {
+ // these are <key> <enckey> <encinit> <value>
+ "putApiKey (secret) (\\S*) (\\S*) (\\S*) (\\S*)",
+ "putApiKey (email) (\\S*) (\\S*) (\\S*) (\\S*)",
+ "putApiKey (description) (\\S*) (\\S*) (\\S*) (\\S*)"
+ };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final ConfigDb db = context.getDb ();
+ if ( parts.length == 5 )
+ {
+ final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
+ new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
+ EncryptingLayer.readSecretKey ( parts[2] ), rrConvertor.base64Decode ( parts[3] ) );
+
+ final NsaSimpleApiKey apikey = apiKeyDb.loadApiKey ( parts[1] );
+ if ( apikey == null )
+ {
+ out.println ( "Key '" + parts[1] + "' not found." );
+ }
+ else
+ {
+ if ( parts[0].equalsIgnoreCase ( "secret" ) )
+ {
+ apikey.resetSecret ( parts[4] );
+ }
+ else if ( parts[0].equalsIgnoreCase ( "email" ) )
+ {
+ apikey.setContactEmail ( parts[4] );
+ }
+ else if ( parts[0].equalsIgnoreCase ( "description" ) )
+ {
+ apikey.setDescription ( parts[4] );
+ }
+
+ apiKeyDb.saveApiKey ( apikey );
+ out.println ( apikey.asJsonObject ().toString () );
+ }
+ }
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ catch ( JSONException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "putApiKey secret <apiKey> <dbKey> <dbIv> <newSecret>" );
+ out.println ( "putApiKey email <apiKey> <dbKey> <dbIv> <newEmail>" );
+ out.println ( "putApiKey description <apiKey> <dbKey> <dbIv> <newDescription>" );
+ }
+ }
+
+ private static class writeApiKeyCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[]
+ {
+ // <enckey> <encinit> <key> <secret>
+ "writeApiKey (\\S*) (\\S*) (\\S*) (\\S*)",
+ };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final ConfigDb db = context.getDb ();
+ if ( parts.length == 4 )
+ {
+ final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb =
+ new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
+ EncryptingLayer.readSecretKey ( parts[0] ), rrConvertor.base64Decode ( parts[1] ) );
+
+ apiKeyDb.deleteApiKey ( parts[2] );
+ final NsaSimpleApiKey apikey = apiKeyDb.createApiKey ( parts[2], parts[3] );
+ out.println ( apikey.asJsonObject ().toString () );
+ }
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ catch ( JSONException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ catch ( KeyExistsException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "writeApiKey <dbKey> <dbIv> <newApiKey> <newSecret>" );
+ }
+ }
+
+ private static class EncryptApiKeysCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "convertApiKeyDb", "convertApiKeyDb (\\S*) (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final String key = parts.length == 2 ? parts[0] : EncryptingLayer.createSecretKey ();
+ final String iv = parts.length == 2 ? parts[1] : rrConvertor.base64Encode ( uniqueStringGenerator.createValue ( 16 ) );
+
+ // This doesn't do well when the number of API keys is giant...
+ if ( parts.length == 0 )
+ {
+ out.println ( "YOU MUST RECORD THESE VALUES AND USE THEM IN THE SERVER CONFIG" );
+ out.println ( "Key: " + key );
+ out.println ( " IV: " + iv );
+ out.println ( "\n" );
+ out.println ( "Call again with key and IV on command line." );
+ out.println ( "\n" );
+ return; // because otherwise the values get lost
+ }
+
+ final ConfigDb db = context.getDb ();
+ final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
+ final EncryptingApiDbImpl<NsaSimpleApiKey> writeTo = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
+ EncryptingLayer.readSecretKey ( key ), rrConvertor.base64Decode ( iv ) );
+
+ int count = 0;
+ for ( Entry<String, NsaSimpleApiKey> e : readFrom.loadAllKeyRecords ().entrySet () )
+ {
+ out.println ( "-------------------------------" );
+ out.println ( "Converting " + e.getKey () );
+ final String was = e.getValue ().asJsonObject ().toString ();
+ out.println ( was );
+
+ writeTo.saveApiKey ( e.getValue () );
+ count++;
+ }
+
+ out.println ( "Conversion complete, converted " + count + " records." );
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ catch ( NoSuchAlgorithmException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "convertApiKeyDb" );
+ out.println ( "\tconvert an API key DB to an encrypted DB and output the cipher details" );
+ }
+ }
+
+ private static class DecryptApiKeysCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "revertApiKeyDb (\\S*) (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final String keyStr = parts[0];
+ final String iv = parts[1];
+ final byte[] ivBytes = rrConvertor.base64Decode ( iv );
+
+ final ConfigDb db = context.getDb ();
+ final EncryptingApiDbImpl<NsaSimpleApiKey> readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (),
+ EncryptingLayer.readSecretKey ( keyStr ), ivBytes );
+ final BaseNsaApiDbImpl<NsaSimpleApiKey> writeTo = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () );
+
+ int count = 0;
+ for ( String apiKey : readFrom.loadAllKeys () )
+ {
+ out.println ( "Converting " + apiKey );
+ final NsaSimpleApiKey record = readFrom.loadApiKey ( apiKey );
+ if ( record == null )
+ {
+ out.println ( "Couldn't load " + apiKey );
+ }
+ else
+ {
+ writeTo.saveApiKey ( record );
+ count++;
+ }
+ }
+ out.println ( "Conversion complete, converted " + count + " records." );
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "revertApiKeyDb <keyBase64> <ivBase64>" );
+ out.println ( "\trevert an API key DB to a deencrypted DB" );
+ }
+ }
+
+ private static class NodeFetchCommand implements Command<ConfigToolContext>
+ {
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "node (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final String node = parts[0];
+
+ final ConfigDb db = context.getDb ();
+ final ConfigPath cp = db.parse ( node );
+
+ boolean doneOne = false;
+ for ( ConfigPath child : db.loadChildrenNames ( cp ) )
+ {
+ out.println ( "\t- " + child.getName () );
+ doneOne = true;
+ }
+ if ( doneOne )
+ {
+ out.println ();
+ }
+ else
+ {
+ out.println ( "(No child nodes of '" + node + "')" );
+ }
+
+ final String val = db.load ( cp );
+ if ( val == null )
+ {
+ out.println ( "(No data at '" + node + "')" );
+ }
+ else
+ {
+ out.println ( val );
+ }
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ catch ( IllegalArgumentException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "node <nodeName>" );
+ out.println ( "\tread a config db node" );
+ }
+ }
+
+ private static class DropOldConsumerGroupsCommand implements Command<ConfigToolContext>
+ {
+ private final long kMaxRemovals = 500;
+
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[] { "(dropOldConsumers) (\\S*)", "(showOldConsumers) (\\S*)" };
+ }
+
+ @Override
+ public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException
+ {
+ }
+
+ @Override
+ public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ try
+ {
+ final boolean runDrops = parts[0].equalsIgnoreCase ( "dropOldConsumers" );
+ final String maxAgeInDaysStr = parts[1];
+ final int maxAgeInDays = Integer.parseInt ( maxAgeInDaysStr );
+ final long oldestEpochSecs = ( NsaClock.now () / 1000 ) - ( 24 * 60 * 60 * maxAgeInDays );
+
+ out.println ( "Dropping consumer groups older than " + new Date ( oldestEpochSecs * 1000 ) );
+
+ final ConfigDb db = context.getDb ();
+
+ // kafka updates consumer partition records in ZK each time a message
+ // is served. we can determine which consumers are old based on a lack
+ // of update to the partition entries
+ // (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper)
+
+ // kafka only works with ZK, and our configDb was constructed with a non-kafka
+ // root node. We have to switch it to get to the right content...
+ if ( ! ( db instanceof ZkConfigDb ) )
+ {
+ throw new ConfigDbException ( "You can only show/drop old consumers against a ZK config db." );
+ }
+
+ final ZkConfigDb newZkDb = new ZkConfigDb ( context.getConnectionString (), "" );
+ long cgCount = 0;
+
+ final LinkedList<ConfigPath> removals = new LinkedList<ConfigPath> ();
+ for ( ConfigPath consumerGroupName : newZkDb.loadChildrenNames ( newZkDb.parse ( "/consumers" ) ) )
+ {
+ cgCount++;
+ if ( cgCount % 500 == 0 )
+ {
+ out.println ( "" + cgCount + " groups examined" );
+ }
+
+ boolean foundAnything = false;
+ boolean foundRecentUse = false;
+ long mostRecent = -1;
+
+ // each consumer group has an "offsets" entry, which contains 0 or more topic entries.
+ // each topic contains partition nodes.
+ for ( ConfigPath topic : newZkDb.loadChildrenNames ( consumerGroupName.getChild ( "offsets" ) ) )
+ {
+ for ( ConfigPath offset : newZkDb.loadChildrenNames ( topic ) )
+ {
+ foundAnything = true;
+
+ final long modTime = newZkDb.getLastModificationTime ( offset );
+ mostRecent = Math.max ( mostRecent, modTime );
+
+ foundRecentUse = ( modTime > oldestEpochSecs );
+ if ( foundRecentUse ) break;
+ }
+ if ( foundRecentUse ) break;
+ }
+
+ // decide if this consumer group is old
+ out.println ( "Group " + consumerGroupName.getName () + " was most recently used " + new Date ( mostRecent*1000 ) );
+ if ( foundAnything && !foundRecentUse )
+ {
+ removals.add ( consumerGroupName );
+ }
+
+ if ( removals.size () >= kMaxRemovals )
+ {
+ break;
+ }
+ }
+
+ // removals
+ for ( ConfigPath consumerGroupName : removals )
+ {
+ out.println ( "Group " + consumerGroupName.getName () + " has no recent activity." );
+ if ( runDrops )
+ {
+ out.println ( "Removing group " + consumerGroupName.getName () + "..." );
+ newZkDb.clear ( consumerGroupName );
+ }
+ }
+ }
+ catch ( ConfigDbException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ catch ( NumberFormatException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ catch ( JSONException e )
+ {
+ out.println ( "Command failed: " + e.getMessage() );
+ }
+ }
+
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "showOldConsumers <minAgeInDays>" );
+ out.println ( "dropOldConsumers <minAgeInDays>" );
+ out.println ( "\tDrop (or just show) any consumer group that has been inactive longer than <minAgeInDays> days." );
+ out.println ();
+ out.println ( "\tTo be safe, <minAgeInDays> should be much higher than the maximum storage time on the Kafka topics." );
+ out.println ( "\tA very old consumer will potentially miss messages, but will resume at the oldest message, while a" );
+ out.println ( "\tdeleted consumer will start at the current message if it ever comes back." );
+ out.println ();
+ out.println ( "\tNote that show/drops are limited to " + kMaxRemovals + " records per invocation." );
+ }
+ }
+}
diff --git a/src/main/java/com/att/nsa/dmaap/tools/ConfigToolContext.java b/src/main/java/com/att/nsa/dmaap/tools/ConfigToolContext.java
new file mode 100644
index 0000000..bb44d1f
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/tools/ConfigToolContext.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.tools;
+
+import com.att.nsa.cambria.beans.DMaaPMetricsSet;
+import com.att.nsa.cmdtool.CommandContext;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+
+public class ConfigToolContext implements CommandContext
+{
+ public ConfigToolContext ( ConfigDb db, String connStr, rrNvReadable cs )
+ {
+ fDb = db;
+ fConnStr = connStr;
+ fMetrics = new DMaaPMetricsSet( cs );
+ }
+
+ @Override
+ public void requestShutdown ()
+ {
+ fQuit = true;
+ }
+
+ @Override
+ public boolean shouldContinue ()
+ {
+ return !fQuit;
+ }
+
+ public ConfigDb getDb ()
+ {
+ return fDb;
+ }
+
+ public String getConnectionString ()
+ {
+ return fConnStr;
+ }
+
+ public DMaaPMetricsSet getMetrics ()
+ {
+ return fMetrics;
+ }
+
+ private final ConfigDb fDb;
+ private final String fConnStr;
+ private boolean fQuit = false;
+ private DMaaPMetricsSet fMetrics;
+}
diff --git a/src/main/java/com/att/nsa/dmaap/util/ContentLengthInterceptor.java b/src/main/java/com/att/nsa/dmaap/util/ContentLengthInterceptor.java
new file mode 100644
index 0000000..fe1c768
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/util/ContentLengthInterceptor.java
@@ -0,0 +1,132 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.util;
+
+import java.util.Map;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.stereotype.Component;
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import ajsc.beans.interceptors.AjscInterceptor;
+
+/**
+ * AJSC Intercepter implementation of ContentLengthFilter
+ */
+@Component
+public class ContentLengthInterceptor implements AjscInterceptor{
+
+
+ private String defLength;
+ //private Logger log = Logger.getLogger(ContentLengthInterceptor.class.toString());
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(ContentLengthInterceptor.class);
+
+
+ /**
+ * Intercepter method to intercept requests before processing
+ */
+ @Override
+ public boolean allowOrReject(HttpServletRequest httpservletrequest, HttpServletResponse httpservletresponse,
+ Map map) throws Exception {
+
+ log.info("inside Interceptor allowOrReject content length checking before pub/sub");
+
+ JSONObject jsonObj = null;
+ int requestLength = 0;
+ setDefLength(System.getProperty("maxcontentlength"));
+ try {
+ // retrieving content length from message header
+
+ if (null != httpservletrequest.getHeader("Content-Length")) {
+ requestLength = Integer.parseInt(httpservletrequest.getHeader("Content-Length"));
+ }
+ // retrieving encoding from message header
+ String transferEncoding = httpservletrequest.getHeader("Transfer-Encoding");
+ // checking for no encoding, chunked and requestLength greater then
+ // default length
+ if (null != transferEncoding && !(transferEncoding.contains("chunked"))
+ && (requestLength > Integer.parseInt(getDefLength()))) {
+ jsonObj = new JSONObject().append("defaultlength", getDefLength())
+ .append("requestlength", requestLength);
+ log.error("message length is greater than default");
+ throw new CambriaApiException(jsonObj);
+ }
+ else if (null == transferEncoding && (requestLength > Integer.parseInt(getDefLength())))
+ {
+ jsonObj = new JSONObject().append("defaultlength", getDefLength()).append(
+ "requestlength", requestLength);
+ log.error("Request message is not chunked or request length is greater than default length");
+ throw new CambriaApiException(jsonObj);
+
+
+ }
+ else
+ {
+ //chain.doFilter(req, res);
+ return true;
+ }
+
+ } catch (CambriaApiException | NumberFormatException | JSONException e) {
+
+ log.info("Exception obj--"+e);
+ log.error("message size is greater then default"+e.getMessage());
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_REQUEST_TOO_LONG,
+ DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), System.getProperty("msg_size_exceeds")
+ + jsonObj.toString());
+ log.info(errRes.toString());
+
+
+ map.put(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,"test");
+ httpservletresponse.setStatus(HttpStatus.SC_REQUEST_TOO_LONG);
+ httpservletresponse.getOutputStream().write(errRes.toString().getBytes());
+ return false;
+ }
+
+
+
+ }
+
+
+ /**
+ * Get Default Content Length
+ * @return defLength
+ */
+ public String getDefLength() {
+ return defLength;
+ }
+ /**
+ * Set Default Content Length
+ * @param defLength
+ */
+ public void setDefLength(String defLength) {
+ this.defLength = defLength;
+ }
+
+
+
+}
diff --git a/src/main/java/com/att/nsa/dmaap/util/DMaaPAuthFilter.java b/src/main/java/com/att/nsa/dmaap/util/DMaaPAuthFilter.java
new file mode 100644
index 0000000..ae79938
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/util/DMaaPAuthFilter.java
@@ -0,0 +1,164 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.util;
+
+import java.io.IOException;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+
+//import com.att.eelf.configuration.EELFLogger;
+//import com.att.eelf.configuration.EELFManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.att.cadi.filter.CadiFilter;
+import javax.servlet.FilterConfig;
+
+/**
+ * This is a Servlet Filter class overriding the AjscCadiFilter
+ */
+@Component
+public class DMaaPAuthFilter extends CadiFilter {
+
+ // private Logger log = Logger.getLogger(DMaaPAuthFilter.class.toString());
+
+ // private static final EELFLogger log =
+ // EELFManager.getInstance().getLogger(DMaaPAuthFilter.class);
+ private Logger log = LoggerFactory.getLogger(DMaaPAuthFilter.class);
+
+ final Boolean enabled = "authentication-scheme-1".equalsIgnoreCase(System.getProperty("CadiAuthN"));
+
+ /**
+ * This method will disable Cadi Authentication if cambria headers are
+ * present in the request else continue with Cadi Authentication
+ */
+ public void init(FilterConfig filterConfig) throws ServletException {
+
+ try {
+
+ super.init(filterConfig);
+
+ } catch (Exception ex) {
+ log.error("Ajsc Cadi Filter Exception:" + ex.getMessage());
+
+ }
+ }
+
+ @Override
+ public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
+ throws IOException, ServletException {
+
+ log.info("inside servlet filter Cambria Auth Headers checking before doing other Authentication");
+ HttpServletRequest request = (HttpServletRequest) req;
+
+ boolean forceAAF = Boolean.valueOf(System.getProperty("forceAAF"));
+ if (forceAAF ||
+ null != request.getHeader("Authorization") ||
+ (null != request.getHeader("AppName") &&
+ request.getHeader("AppName").equalsIgnoreCase("invenio") &&
+ null != request.getHeader("cookie"))) {
+
+ if (!enabled ||
+ request.getMethod().equalsIgnoreCase("head") ||
+ request.getHeader("DME2HealthCheck") != null) {
+
+ chain.doFilter(req, res);
+
+ } else {
+
+ super.doFilter(req, res, chain);
+
+ }
+ } else {
+
+ System.setProperty("CadiAuthN", "authentication-scheme-2");
+ chain.doFilter(req, res);
+
+ }
+
+ }
+
+ @Override
+ public void log(Exception e, Object... elements) {
+ // TODO Auto-generated method stub
+ // super.log(e, elements);
+ // System.out.println(convertArrayToString(elements));
+ log.error(convertArrayToString(elements), e);
+
+ }
+
+ @Override
+ public void log(Level level, Object... elements) {
+
+ // System.out.println(willWrite().compareTo(level) );
+ if (willWrite().compareTo(level) <= 0) {
+ switch (level) {
+ case DEBUG:
+ log.debug(convertArrayToString(elements));
+ break;
+ case INFO:
+ log.info(convertArrayToString(elements));
+ break;
+ case ERROR:
+ log.error(convertArrayToString(elements));
+ break;
+ case AUDIT:
+ log.info(convertArrayToString(elements));
+ break;
+ case INIT:
+ log.info(convertArrayToString(elements));
+ break;
+ case WARN:
+ log.warn(convertArrayToString(elements));
+ break;
+ default:
+
+ log.warn(convertArrayToString(elements));
+
+ }
+
+ }
+
+ }
+
+ private String convertArrayToString(Object[] elements) {
+
+ StringBuilder strBuilder = new StringBuilder();
+ for (int i = 0; i < elements.length; i++) {
+ if (elements[i] instanceof String)
+ strBuilder.append((String) elements[i]);
+ else if (elements[i] instanceof Integer)
+ strBuilder.append((Integer) elements[i]);
+ else
+ strBuilder.append(elements[i]);
+ }
+ String newString = strBuilder.toString();
+ return newString;
+ }
+
+}
diff --git a/src/main/java/com/att/nsa/dmaap/util/ServicePropertiesMapBean.java b/src/main/java/com/att/nsa/dmaap/util/ServicePropertiesMapBean.java
new file mode 100644
index 0000000..c5173c1
--- /dev/null
+++ b/src/main/java/com/att/nsa/dmaap/util/ServicePropertiesMapBean.java
@@ -0,0 +1,41 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.dmaap.util;
+
+import com.att.nsa.dmaap.filemonitor.ServicePropertiesMap;
+
+/**
+ * Class ServicePropertiesMapBean
+ * @author author
+ *
+ */
+public class ServicePropertiesMapBean {
+ /**
+ * get property
+ * @param propFileName propFileName
+ * @param propertyKey propertyKey
+ * @return str
+ */
+ public static String getProperty(String propFileName, String propertyKey) {
+ return ServicePropertiesMap.getProperty(propFileName, propertyKey);
+ }
+}