diff options
author | Varun Gudisena <vg411h@att.com> | 2017-08-31 10:56:56 -0500 |
---|---|---|
committer | Varun Gudisena <vg411h@att.com> | 2017-08-31 10:57:12 -0500 |
commit | acc3ce02997219825091a2e4ed7fd493f2d440b2 (patch) | |
tree | de1db5979894f0dadd6752ae1841fdeb743a5830 /src/main/java/com/att/nsa | |
parent | 3306e2f9cc17833d5816936e0ea2973d9013b00e (diff) |
Revert package name changes
Reverted package name changes to avoid any potential issues. Renamed maven
group id only.
Issue-id: DMAAP-74
Change-Id: I4ca4537d48e5723b2939148e5bd83645ee20dd30
Signed-off-by: Varun Gudisena <vg411h@att.com>
Diffstat (limited to 'src/main/java/com/att/nsa')
25 files changed, 5605 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/dmaap/DMaaPCambriaExceptionMapper.java b/src/main/java/com/att/nsa/dmaap/DMaaPCambriaExceptionMapper.java new file mode 100644 index 0000000..53c3bed --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/DMaaPCambriaExceptionMapper.java @@ -0,0 +1,143 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap; + + +import javax.inject.Singleton; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Autowired; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.exception.DMaaPErrorMessages; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; + +/** + * Exception Mapper class to handle + * CambriaApiException + * @author author + * + */ +@Provider +@Singleton +public class DMaaPCambriaExceptionMapper implements ExceptionMapper<CambriaApiException>{ + +/** + * Error response obj + */ + private ErrorResponse errRes; + +/** + * Logger obj + */ + + + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPCambriaExceptionMapper.class); + + + /** + * Error msg obj + */ + @Autowired + private DMaaPErrorMessages msgs; + + /** + * HttpServletRequest obj + */ + @Context + private HttpServletRequest req; + + /** + * HttpServletResponse obj + */ + @Context + private HttpServletResponse res; + + /** + * Contructor for DMaaPCambriaExceptionMapper + */ + public DMaaPCambriaExceptionMapper() { + super(); + LOGGER.info("Cambria Exception Mapper Created.."); + } + + /** + * The toResponse method is called when + * an exception of type CambriaApiException + * is thrown.This method will send a custom error + * response to the client. + */ + @Override + public Response toResponse(CambriaApiException ex) { + + LOGGER.info("Reached Cambria Exception Mapper.."); + + /** + * Cambria Generic Exception + */ + if(ex instanceof CambriaApiException) + { + + errRes = ex.getErrRes(); + if(errRes!=null) { + + Response response = Response.status(errRes.getHttpStatusCode()).header("exception", + errRes.getErrMapperStr()).build(); + + return response; + } + else + { + + Response response = Response.status(ex.getStatus()).header("exception", + ex.getMessage()).build(); + + return response; + } + + + } + else + { + errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), msgs.getServerUnav()); + + Response response = Response.status(errRes.getHttpStatusCode()).header("exception", + errRes.getErrMapperStr()).build(); + + return response; + } + + } + + + +} diff --git a/src/main/java/com/att/nsa/dmaap/DMaaPWebExceptionMapper.java b/src/main/java/com/att/nsa/dmaap/DMaaPWebExceptionMapper.java new file mode 100644 index 0000000..7a9d0ba --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/DMaaPWebExceptionMapper.java @@ -0,0 +1,202 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap; + + +import javax.inject.Singleton; +import javax.ws.rs.BadRequestException; +import javax.ws.rs.InternalServerErrorException; +import javax.ws.rs.NotAllowedException; +import javax.ws.rs.NotAuthorizedException; +import javax.ws.rs.NotFoundException; +import javax.ws.rs.ServiceUnavailableException; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; +import javax.ws.rs.ext.Provider; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Autowired; + +import com.att.nsa.cambria.exception.DMaaPErrorMessages; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; + +/** + * Exception Mapper class to handle + * Web Exceptions + * @author author + * + */ +@Provider +@Singleton +public class DMaaPWebExceptionMapper implements ExceptionMapper<WebApplicationException>{ + + /** + * Logger obj + */ + + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(DMaaPWebExceptionMapper.class); + /** + * Error response obj + */ + private ErrorResponse errRes; + /** + * Error msg obj + */ + @Autowired + private DMaaPErrorMessages msgs; + + /** + * Contructor for DMaaPWebExceptionMapper + */ + public DMaaPWebExceptionMapper() { + super(); + LOGGER.info("WebException Mapper Created.."); + } + + /** + * The toResponse method is called when + * an exception of type WebApplicationException + * is thrown.This method will send a custom error + * response to the client + */ + @Override + public Response toResponse(WebApplicationException ex) { + + LOGGER.info("Reached WebException Mapper"); + + /** + * Resource Not Found + */ + if(ex instanceof NotFoundException) + { + errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,DMaaPResponseCode.RESOURCE_NOT_FOUND. + getResponseCode(),msgs.getNotFound()); + + LOGGER.info(errRes.toString()); + Response response = Response.status(errRes.getHttpStatusCode()).header("exception", + errRes.getErrMapperStr()).build(); + + return response; + + } + /** + * Internal Server Error + */ + if(ex instanceof InternalServerErrorException) + { + + int errCode = HttpStatus.SC_INTERNAL_SERVER_ERROR; + int dmaapErrCode = DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(); + String errMsg = msgs.getServerUnav(); + + + if(ex.getCause().toString().contains("Json")) { + errCode = HttpStatus.SC_BAD_REQUEST; + dmaapErrCode = DMaaPResponseCode.INCORRECT_JSON.getResponseCode(); + errMsg = ex.getCause().getMessage().substring(0, ex.getCause().getMessage().indexOf("[Source")-3); + } + else if (ex.getCause().toString().contains("UnrecognizedPropertyException")) { + errCode = HttpStatus.SC_BAD_REQUEST; + dmaapErrCode = DMaaPResponseCode.INCORRECT_JSON.getResponseCode(); + errMsg = ex.getCause().getMessage().substring(0, ex.getCause().getMessage().indexOf("[Source")-3); + } + errRes = new ErrorResponse(errCode,dmaapErrCode,errMsg); + + LOGGER.info(errRes.toString()); + Response response = Response.status(errRes.getHttpStatusCode()).header("exception", + errRes.getErrMapperStr()).build(); + + return response; + + } + /** + * UnAuthorized + */ + if(ex instanceof NotAuthorizedException) + { + errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,DMaaPResponseCode.ACCESS_NOT_PERMITTED. + getResponseCode(),msgs.getAuthFailure()); + + LOGGER.info(errRes.toString()); + Response response = Response.status(errRes.getHttpStatusCode()).header("exception", + errRes.getErrMapperStr()).build(); + + return response; + } + /** + * Malformed request + */ + if(ex instanceof BadRequestException) + { + errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,DMaaPResponseCode.INCORRECT_JSON. + getResponseCode(),msgs.getBadRequest()); + + LOGGER.info(errRes.toString()); + Response response = Response.status(errRes.getHttpStatusCode()).header("exception", + errRes.getErrMapperStr()).build(); + + return response; + } + /** + * HTTP Method not allowed + */ + if(ex instanceof NotAllowedException) + { + errRes = new ErrorResponse(HttpStatus.SC_METHOD_NOT_ALLOWED,DMaaPResponseCode.METHOD_NOT_ALLOWED. + getResponseCode(),msgs.getMethodNotAllowed()); + + LOGGER.info(errRes.toString()); + Response response = Response.status(errRes.getHttpStatusCode()).header("exception", + errRes.getErrMapperStr()).build(); + + return response; + } + + /** + * Server unavailable + */ + if(ex instanceof ServiceUnavailableException) + { + errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,DMaaPResponseCode.SERVER_UNAVAILABLE. + getResponseCode(),msgs.getServerUnav()); + + LOGGER.info(errRes.toString()); + Response response = Response.status(errRes.getHttpStatusCode()).header("exception", + errRes.getErrMapperStr()).build(); + + return response; + } + + + + return Response.serverError().build(); + } + + + + +} + diff --git a/src/main/java/com/att/nsa/dmaap/HelloWorld.java b/src/main/java/com/att/nsa/dmaap/HelloWorld.java new file mode 100644 index 0000000..7dc2e0c --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/HelloWorld.java @@ -0,0 +1,42 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap; + +import org.apache.camel.Exchange; + +/** + * Hello World Sample Camel Service + * @author author + * + */ +public class HelloWorld { + public HelloWorld () { + } + /** + * speak method + * @param e exchange + */ + public final void speak(Exchange e) { + e.setOut(e.getIn()); + e.getOut().setBody("Hello World!"); + } +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaap/JaxrsEchoService.java b/src/main/java/com/att/nsa/dmaap/JaxrsEchoService.java new file mode 100644 index 0000000..9fcef98 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/JaxrsEchoService.java @@ -0,0 +1,91 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + + +import com.att.ajsc.beans.PropertiesMapBean; +import com.att.nsa.dmaap.filemonitor.ServicePropertiesMap; + +/** + * Example JAX-RS Service + * @author author + * + */ +@Path("/jaxrs-services") +public class JaxrsEchoService { + + /** + * Logger obj + */ + /*private static final Logger LOGGER = Logger + .getLogger(JaxrsEchoService.class);*/ + + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(JaxrsEchoService.class); + + /** + * Method ping + * @param input input + * @return str + */ + @GET + @Path("/echo/{input}") + @Produces("text/plain") + public String ping(@PathParam("input") String input) { + return "Hello, " + input + "."; + } + + /** + * Method to fetch property + * @param fileName file + * @param input input + * @return prop + */ + @GET + @Path("/property/{fileName}/{input:.*}") + @Produces("text/plain") + public String getProperty(@PathParam("fileName") String fileName, @PathParam("input") String input) { + String val=null; + try { + val = ServicePropertiesMap.getProperty(fileName, input); + if(val == null || val.isEmpty() || val.length() < 1){ + val = PropertiesMapBean.getProperty(fileName, input); + } + } + catch(Exception ex) { + LOGGER.info("*** Error retrieving property "+input+": "+ex); + + } + if (val ==null) { + return "Property is not available"; + } + return "Property value is, " + val +"."; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaap/JaxrsUserService.java b/src/main/java/com/att/nsa/dmaap/JaxrsUserService.java new file mode 100644 index 0000000..2724a51 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/JaxrsUserService.java @@ -0,0 +1,59 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import java.util.Map; +import java.util.HashMap; + +/** + * Example JAX-RS User Service + * @author author + * + */ +@Path("/user") +public class JaxrsUserService { + + private static final Map<String,String> userIdToNameMap; + static { + userIdToNameMap = new HashMap<String,String>(); + userIdToNameMap.put("user1","User One"); + userIdToNameMap.put("user2","User Two"); + } + + /** + * Method to fetch user details + * @param userId user + * @return userDetails + */ + @GET + @Path("/{userId}") + @Produces("text/plain") + public String lookupUser(@PathParam("userId") String userId) { + String name = userIdToNameMap.get(userId); + return name != null ? name : "unknown id"; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesListener.java b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesListener.java new file mode 100644 index 0000000..8333332 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesListener.java @@ -0,0 +1,42 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.filemonitor; + +import java.io.File; + +//import com.att.ssf.filemonitor.FileChangedListener; +/** + * Class ServicePropertiesListener + * @author author + * + */ +public class ServicePropertiesListener /*implements FileChangedListener*/ { + + /** + * Update method + */ + //@Override + public void update(File file) throws Exception + { + ServicePropertiesMap.refresh(file); + } +} diff --git a/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesMap.java b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesMap.java new file mode 100644 index 0000000..731428d --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertiesMap.java @@ -0,0 +1,126 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.filemonitor; + +import java.io.File; +import java.io.FileInputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * ServicePropertiesMap class + * @author author + * + */ +@SuppressWarnings("squid:S1118") +public class ServicePropertiesMap +{ + private static HashMap<String, HashMap<String, String>> mapOfMaps = + new HashMap<String, HashMap<String, String>>(); +// static final Logger logger = LoggerFactory.getLogger(ServicePropertiesMap.class); + + private static final EELFLogger logger = EELFManager.getInstance().getLogger(ServicePropertiesMap.class); + /** + * refresh method + * @param file file + * @throws Exception ex + */ + public static void refresh(File file) throws Exception + { + try + { + logger.info("Loading properties - " + (file != null?file.getName():"")); + + //Store .json & .properties files into map of maps + String filePath = file.getPath(); + + if(filePath.lastIndexOf(".json")>0){ + + ObjectMapper om = new ObjectMapper(); + TypeReference<HashMap<String, String>> typeRef = + new TypeReference<HashMap<String, String>>() {}; + HashMap<String, String> propMap = om.readValue(file, typeRef); + HashMap<String, String> lcasePropMap = new HashMap<String, String>(); + for (String key : propMap.keySet() ) + { + String lcaseKey = ifNullThenEmpty(key); + lcasePropMap.put(lcaseKey, propMap.get(key)); + } + + mapOfMaps.put(file.getName(), lcasePropMap); + + + }else if(filePath.lastIndexOf(".properties")>0){ + Properties prop = new Properties(); + FileInputStream fis = new FileInputStream(file); + prop.load(fis); + + @SuppressWarnings("unchecked") + HashMap<String, String> propMap = new HashMap<String, String>((Map)prop); + + mapOfMaps.put(file.getName(), propMap); + } + + logger.info("File - " + file.getName() + " is loaded into the map and the " + + "corresponding system properties have been refreshed"); + } + catch (Exception e) + { + logger.error("File " + (file != null?file.getName():"") + " cannot be loaded into the map ", e); + throw new Exception("Error reading map file " + (file != null?file.getName():""), e); + } + } + /** + * Get property + * @param fileName fileName + * @param propertyKey propertyKey + * @return str + */ + public static String getProperty(String fileName, String propertyKey) + { + HashMap<String, String> propMap = mapOfMaps.get(fileName); + return propMap!=null?propMap.get(ifNullThenEmpty(propertyKey)):""; + } + /** + * get properties + * @param fileName fileName + * @return mapProp + */ + public static HashMap<String, String> getProperties(String fileName){ + return mapOfMaps.get(fileName); + } + + private static String ifNullThenEmpty(String key) { + if (key == null) { + return ""; + } else { + return key; + } + } + +} diff --git a/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertyService.java b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertyService.java new file mode 100644 index 0000000..e4f4e03 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/filemonitor/ServicePropertyService.java @@ -0,0 +1,164 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.filemonitor; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import javax.annotation.PostConstruct; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + + +//import com.att.ssf.filemonitor.FileChangedListener; +//import com.att.ssf.filemonitor.FileMonitor; + +/** + * ServicePropertyService class + * @author author + * + */ +public class ServicePropertyService { + private boolean loadOnStartup; + private ServicePropertiesListener fileChangedListener; + private ServicePropertiesMap filePropertiesMap; + private String ssfFileMonitorPollingInterval; + private String ssfFileMonitorThreadpoolSize; + private List<File> fileList; + private static final String FILE_CHANGE_LISTENER_LOC = System + .getProperty("AJSC_CONF_HOME") + "/etc"; + private static final String USER_CONFIG_FILE = "service-file-monitor.properties"; + + private static final EELFLogger logger = EELFManager.getInstance().getLogger(ServicePropertyService.class); + + // do not remove the postConstruct annotation, init method will not be + // called after constructor + /** + * Init method + * @throws Exception ex + */ + @PostConstruct + public void init() throws Exception { + + try { + getFileList(FILE_CHANGE_LISTENER_LOC); + +// for (File file : fileList) { +// FileChangedListener fileChangedListener = this.fileChangedListener; +// Object filePropertiesMap = this.filePropertiesMap; +// Method m = filePropertiesMap.getClass().getMethod( +// "refresh", File.class); +// m.invoke(filePropertiesMap, file); +// FileMonitor fm = FileMonitor.getInstance(); +// fm.addFileChangedListener(file, fileChangedListener, +// loadOnStartup); +// +// } + } catch (Exception ex) { + logger.error("Error creating property map ", ex); + } + + } + + private void getFileList(String dirName) throws IOException { + File directory = new File(dirName); + FileInputStream fis = null; + + if (fileList == null) + fileList = new ArrayList<File>(); + + // get all the files that are ".json" or ".properties", from a directory + // & it's sub-directories + File[] fList = directory.listFiles(); + + for (File file : fList) { + // read service property files from the configuration file + if (file.isFile() && file.getPath().endsWith(USER_CONFIG_FILE)) { + try { + fis = new FileInputStream(file); + Properties prop = new Properties(); + prop.load(fis); + + for (String filePath : prop.stringPropertyNames()) { + fileList.add(new File(prop.getProperty(filePath))); + } + } catch (Exception ioe) { + logger.error("Error reading the file stream ", ioe); + } finally { + fis.close(); + } + } else if (file.isDirectory()) { + getFileList(file.getPath()); + } + } + + } + + public void setLoadOnStartup(boolean loadOnStartup) { + this.loadOnStartup = loadOnStartup; + } + + public void setSsfFileMonitorPollingInterval( + String ssfFileMonitorPollingInterval) { + this.ssfFileMonitorPollingInterval = ssfFileMonitorPollingInterval; + } + + public void setSsfFileMonitorThreadpoolSize( + String ssfFileMonitorThreadpoolSize) { + this.ssfFileMonitorThreadpoolSize = ssfFileMonitorThreadpoolSize; + } + + public boolean isLoadOnStartup() { + return loadOnStartup; + } + + public String getSsfFileMonitorPollingInterval() { + return ssfFileMonitorPollingInterval; + } + + public String getSsfFileMonitorThreadpoolSize() { + return ssfFileMonitorThreadpoolSize; + } + + public ServicePropertiesListener getFileChangedListener() { + return fileChangedListener; + } + + public void setFileChangedListener( + ServicePropertiesListener fileChangedListener) { + this.fileChangedListener = fileChangedListener; + } + + public ServicePropertiesMap getFilePropertiesMap() { + return filePropertiesMap; + } + + public void setFilePropertiesMap(ServicePropertiesMap filePropertiesMap) { + this.filePropertiesMap = filePropertiesMap; + } +} diff --git a/src/main/java/com/att/nsa/dmaap/mmagent/CreateMirrorMaker.java b/src/main/java/com/att/nsa/dmaap/mmagent/CreateMirrorMaker.java new file mode 100644 index 0000000..92aca38 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/mmagent/CreateMirrorMaker.java @@ -0,0 +1,43 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.mmagent; + +public class CreateMirrorMaker { + String messageID; + MirrorMaker createMirrorMaker; + + public MirrorMaker getCreateMirrorMaker() { + return createMirrorMaker; + } + + public void setCreateMirrorMaker(MirrorMaker createMirrorMaker) { + this.createMirrorMaker = createMirrorMaker; + } + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } +} diff --git a/src/main/java/com/att/nsa/dmaap/mmagent/MirrorMaker.java b/src/main/java/com/att/nsa/dmaap/mmagent/MirrorMaker.java new file mode 100644 index 0000000..f9e6d89 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/mmagent/MirrorMaker.java @@ -0,0 +1,70 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.mmagent; + +public class MirrorMaker { + public String name; + public String consumer; + public String producer; + public String whitelist; + public String status; + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getConsumer() { + return consumer; + } + + public void setConsumer(String consumer) { + this.consumer = consumer; + } + + public String getProducer() { + return producer; + } + + public void setProducer(String producer) { + this.producer = producer; + } + + public String getWhitelist() { + return whitelist; + } + + public void setWhitelist(String whitelist) { + this.whitelist = whitelist; + } +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaap/mmagent/UpdateMirrorMaker.java b/src/main/java/com/att/nsa/dmaap/mmagent/UpdateMirrorMaker.java new file mode 100644 index 0000000..4d291f3 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/mmagent/UpdateMirrorMaker.java @@ -0,0 +1,43 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.mmagent; + +public class UpdateMirrorMaker { + String messageID; + MirrorMaker updateMirrorMaker; + + public MirrorMaker getUpdateMirrorMaker() { + return updateMirrorMaker; + } + + public void setUpdateMirrorMaker(MirrorMaker updateMirrorMaker) { + this.updateMirrorMaker = updateMirrorMaker; + } + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } +} diff --git a/src/main/java/com/att/nsa/dmaap/mmagent/UpdateWhiteList.java b/src/main/java/com/att/nsa/dmaap/mmagent/UpdateWhiteList.java new file mode 100644 index 0000000..616dc85 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/mmagent/UpdateWhiteList.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.mmagent; + +public class UpdateWhiteList { + + String messageID; + MirrorMaker updateWhiteList; + + public MirrorMaker getUpdateWhiteList() { + return updateWhiteList; + } + + public void setUpdateWhiteList(MirrorMaker updateWhiteList) { + this.updateWhiteList = updateWhiteList; + } + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } +} diff --git a/src/main/java/com/att/nsa/dmaap/service/AdminRestService.java b/src/main/java/com/att/nsa/dmaap/service/AdminRestService.java new file mode 100644 index 0000000..5201dc8 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/service/AdminRestService.java @@ -0,0 +1,293 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.service; + +import java.io.IOException; +import java.util.Enumeration; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; + +import org.apache.http.HttpStatus; +//import org.apache.log4j.Logger; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.service.AdminService; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; + +/** + * Rest Service class + * for Admin Services + * @author author + * + */ +@Component +@Path("/") +public class AdminRestService { + + /** + * Logger obj + */ + //private static final Logger LOGGER = Logger + // .getLogger(AdminRestService.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(AdminRestService.class); + /** + * Config Reader + */ + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + /** + * HttpServletRequest obj + */ + @Context + private HttpServletRequest request; + /** + * HttpServletResponse obj + */ + @Context + private HttpServletResponse response; + /** + * AdminService obj + */ + @Autowired + private AdminService adminService; + + /** + * Fetches a list of all the registered consumers along with their created + * time and last accessed details + * + * @return consumer list in json string format + * @throws CambriaApiException + * @throws AccessDeniedException + * @throws IOException + * */ + @GET + @Path("/consumerCache") + //@Produces(MediaType.TEXT_PLAIN) + public void getConsumerCache() throws CambriaApiException, AccessDeniedException { + LOGGER.info("Fetching list of registered consumers."); + try { + adminService.showConsumerCache(getDMaaPContext()); + LOGGER.info("Fetching Consumer Cache Successfully"); + } catch (IOException e) { + LOGGER.error("Error while Fetching list of registered consumers : " + + e.getMessage(), e); + + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_CONSUMER_CACHE.getResponseCode(), + "Error while Fetching list of registered consumers " + e.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * Clears consumer cache + * @throws CambriaApiException ex + * @throws AccessDeniedException + * + * @throws IOException ex + * @throws JSONException ex + * */ + @POST + @Path("/dropConsumerCache") + //@Produces(MediaType.TEXT_PLAIN) + public void dropConsumerCache() throws CambriaApiException, AccessDeniedException { + LOGGER.info("Dropping consumer cache"); + try { + adminService.dropConsumerCache(getDMaaPContext()); + LOGGER.info("Dropping Consumer Cache successfully"); + } catch ( AccessDeniedException excp) { + LOGGER.error("Error while dropConsumerCache : " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.GET_BLACKLIST.getResponseCode(), + "Error while Fetching list of blacklist ips " + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (JSONException | IOException e) { + LOGGER.error( + "Error while Dropping consumer cache : " + e.getMessage(), + e); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.DROP_CONSUMER_CACHE.getResponseCode(), + "Error while Dropping consumer cache " + e.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * Get list of blacklisted ips + * @throws CambriaApiException excp + */ + @GET + @Path("/blacklist") + //@Produces(MediaType.TEXT_PLAIN) + public void getBlacklist() throws CambriaApiException { + LOGGER.info("Fetching list of blacklist ips."); + try { + Enumeration headerNames = getDMaaPContext().getRequest().getHeaderNames(); + while (headerNames.hasMoreElements()) { + String key = (String) headerNames.nextElement(); + String value = request.getHeader(key); + + } + + adminService.getBlacklist(getDMaaPContext()); + LOGGER.info("Fetching list of blacklist ips Successfully"); + }catch ( AccessDeniedException excp) { + LOGGER.error("Error while Fetching list of blacklist ips : " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.GET_BLACKLIST.getResponseCode(), + "Error while Fetching list of blacklist ips " + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch ( IOException excp) { + LOGGER.error("Error while Fetching list of blacklist ips : " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_BLACKLIST.getResponseCode(), + "Error while Fetching list of blacklist ips " + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + + } + + /** + * Add ip to list of blacklist ips + * @param ip ip + * @throws CambriaApiException excp + */ + @POST + @Path("/blacklist/{ip}") + //@Produces(MediaType.TEXT_PLAIN) + public void addToBlacklist (@PathParam("ip") String ip ) throws CambriaApiException + { + LOGGER.info("Adding ip to list of blacklist ips."); + try { + adminService.addToBlacklist(getDMaaPContext(), ip); + LOGGER.info("Fetching list of blacklist ips Successfully"); + } catch ( AccessDeniedException excp) { + LOGGER.error("Error while blacklist : " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.GET_BLACKLIST.getResponseCode(), + "Error while Fetching list of blacklist ips " + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (IOException | ConfigDbException excp) { + LOGGER.error("Error while adding ip to list of blacklist ips : " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.ADD_BLACKLIST.getResponseCode(), + "Error while adding ip to list of blacklist ips " + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + + } + /** + * Remove ip from blacklist + * @param ip ip + * @throws CambriaApiException excp + * @throws AccessDeniedException excp + * @throws ConfigDbException excp + */ + @DELETE + @Path("/blacklist/{ip}") + //@Produces(MediaType.TEXT_PLAIN) + public void removeFromBlacklist(@PathParam("ip") String ip) throws CambriaApiException, AccessDeniedException, ConfigDbException { + LOGGER.info("Fetching list of blacklist ips."); + try { + adminService.removeFromBlacklist(getDMaaPContext(), ip); + LOGGER.info("Fetching list of blacklist ips Successfully"); + }catch ( AccessDeniedException excp) { + LOGGER.error("Error while blacklist : " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.GET_BLACKLIST.getResponseCode(), + "Error while removeFromBlacklist list of blacklist ips " + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (IOException | ConfigDbException excp) { + LOGGER.error("Error while removing ip from list of blacklist ips : " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.REMOVE_BLACKLIST.getResponseCode(), + "Error while removing ip from list of blacklist ips " + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * Create a dmaap context + * @return DMaaPContext + */ + private DMaaPContext getDMaaPContext() { + DMaaPContext dmaaPContext = new DMaaPContext(); + dmaaPContext.setConfigReader(configReader); + dmaaPContext.setRequest(request); + dmaaPContext.setResponse(response); + return dmaaPContext; + } + +} diff --git a/src/main/java/com/att/nsa/dmaap/service/ApiKeysRestService.java b/src/main/java/com/att/nsa/dmaap/service/ApiKeysRestService.java new file mode 100644 index 0000000..9f04a1f --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/service/ApiKeysRestService.java @@ -0,0 +1,254 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.service; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.beans.ApiKeyBean; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.service.ApiKeysService; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.db.NsaApiDb.KeyExistsException; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; + +/** + * This class is a CXF REST service + * which acts as gateway for Cambria Api + * Keys. + * @author author + * + */ +@Component +@Path("/") +public class ApiKeysRestService { + + /** + * Logger obj + */ + //private Logger log = Logger.getLogger(ApiKeysRestService.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysRestService.class); + /** + * HttpServletRequest obj + */ + @Context + private HttpServletRequest request; + + /** + * HttpServletResponse obj + */ + @Context + private HttpServletResponse response; + + /** + * Config Reader + */ + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + /** + * ApiKeysService obj + */ + @Autowired + private ApiKeysService apiKeyService; + + /** + * Returns a list of all the existing Api keys + * @throws CambriaApiException + * + * @throws IOException + * */ + @GET + public void getAllApiKeys() throws CambriaApiException { + + log.info("Inside ApiKeysRestService.getAllApiKeys"); + + try { + apiKeyService.getAllApiKeys(getDmaapContext()); + log.info("Fetching all API keys is Successful"); + } catch (ConfigDbException | IOException e) { + log.error("Error while retrieving API keys: " + e); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(), + "Error while retrieving API keys: "+ e.getMessage()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + } + + /** + * Returns details of a particular api key whose <code>name</code> is passed + * as a parameter + * + * @param apiKeyName + * - name of the api key + * @throws CambriaApiException + * @throws IOException + * */ + @GET + @Path("/{apiKey}") + public void getApiKey(@PathParam("apiKey") String apiKeyName) throws CambriaApiException { + log.info("Fetching details of api key: " + apiKeyName); + + try { + apiKeyService.getApiKey(getDmaapContext(), apiKeyName); + log.info("Fetching specific API key is Successful"); + } catch (ConfigDbException | IOException e) { + log.error("Error while retrieving API key details: " + e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(), + "Error while retrieving API key details: "+ e.getMessage()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + + + /** + * Creates api key using the <code>email</code> and <code>description</code> + * + * @param nsaApiKey + * @throws CambriaApiException + * @throws JSONException + * */ + @POST + @Path("/create") + @Consumes(MediaType.APPLICATION_JSON) + public void createApiKey(ApiKeyBean nsaApiKey) throws CambriaApiException, JSONException { + log.info("Creating Api Key."); + + try { + apiKeyService.createApiKey(getDmaapContext(), nsaApiKey); + log.info("Creating API key is Successful"); + } catch (KeyExistsException | ConfigDbException | IOException e) { + log.error("Error while Creating API key : " + e.getMessage(), e); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(), + "Error while Creating API key : "+ e.getMessage()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + } + + /** + * Updates an existing apiKey using the key name passed a parameter and the + * details passed. + * + * @param apiKeyName + * - name of the api key to be updated + * @param nsaApiKey + * @throws CambriaApiException + * @throws JSONException + * @throws IOException + * @throws AccessDeniedException + * */ + @PUT + @Path("/{apiKey}") + public void updateApiKey(@PathParam("apiKey") String apiKeyName, + ApiKeyBean nsaApiKey) throws CambriaApiException, JSONException { + log.info("Updating Api Key."); + + try { + + apiKeyService + .updateApiKey(getDmaapContext(), apiKeyName, nsaApiKey); + log.error("API key updated sucessfully"); + } catch (ConfigDbException | IOException | AccessDeniedException e) { + log.error("Error while Updating API key : " + apiKeyName, e); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(), + "Error while Updating API key : "+ e.getMessage()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * Deletes an existing apiKey using the key name passed as a parameter. + * + * @param apiKeyName + * - name of the api key to be updated + * @throws CambriaApiException + * @throws IOException + * @throws AccessDeniedException + * */ + @DELETE + @Path("/{apiKey}") + public void deleteApiKey(@PathParam("apiKey") String apiKeyName) throws CambriaApiException { + log.info("Deleting Api Key: " + apiKeyName); + try { + apiKeyService.deleteApiKey(getDmaapContext(), apiKeyName); + log.info("Api Key deleted successfully: " + apiKeyName); + } catch (ConfigDbException | IOException | AccessDeniedException e) { + log.error("Error while deleting API key : " + apiKeyName, e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GENERIC_INTERNAL_ERROR.getResponseCode(), + "Error while deleting API key : "+ e.getMessage()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * Create a dmaap context + * @return DMaaPContext + */ + private DMaaPContext getDmaapContext() { + DMaaPContext dmaapContext = new DMaaPContext(); + dmaapContext.setConfigReader(configReader); + dmaapContext.setRequest(request); + dmaapContext.setResponse(response); + return dmaapContext; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java new file mode 100644 index 0000000..cda431c --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/service/EventsRestService.java @@ -0,0 +1,313 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.service; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Date; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.exception.DMaaPErrorMessages; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.cambria.service.EventsService; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.cambria.utils.Utils; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.cambria.exception.DMaaPAccessDeniedException; +/** + * This class is a CXF REST service which acts + * as gateway for MR Event Service. + * @author author + * + */ +@Component +@Path("/") +public class EventsRestService { + + /** + * Logger obj + */ + //private Logger log = Logger.getLogger(EventsRestService.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class); + /** + * HttpServletRequest obj + */ + @Context + private HttpServletRequest request; + + /** + * HttpServletResponse obj + */ + @Context + private HttpServletResponse response; + + + /** + * Config Reader + */ + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + @Autowired + private EventsService eventsService; + + @Autowired + private DMaaPErrorMessages errorMessages; + + /** + * This method is used to consume messages.Taking three parameter + * topic,consumerGroup and consumerId .Consumer decide to which topic they + * want to consume messages.In on consumer Group there might be many + * consumer may be present. + * + * @param topic + * specify- the topic name + * @param consumergroup + * - specify the consumer group + * @param consumerid + * -specify the consumer id + * + * handles CambriaApiException | ConfigDbException | + * TopicExistsException | AccessDeniedException | + * UnavailableException | IOException in try catch block + * @throws CambriaApiException + * + */ + @GET + @Path("/{topic}/{consumergroup}/{consumerid}") + public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") + String consumergroup, + @PathParam("consumerid") String consumerid) throws CambriaApiException { + // log.info("Consuming message from topic " + topic ); + DMaaPContext dMaaPContext = getDmaapContext(); + dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); + + try { + + eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid); + } + catch (TopicExistsException e) { + log.error("Error while reading data from topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, + consumerid, + request.getRemoteHost()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + catch (DMaaPAccessDeniedException | AccessDeniedException e) { + log.error("Error while reading data from topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, + consumerid, + request.getRemoteHost()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + + catch (ConfigDbException | UnavailableException | IOException e) { + log.error("Error while reading data from topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, + consumerid, + request.getRemoteHost()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * This method is used to publish messages.Taking two parameter topic and + * partition.Publisher decide to which topic they want to publish message + * and kafka decide to which partition of topic message will send, + * + * @param topic + * @param msg + * @param partitionKey + * + * handles CambriaApiException | ConfigDbException | + * TopicExistsException | AccessDeniedException | IOException in + * try catch block + * @throws CambriaApiException + */ + + @POST + @Produces("application/json") + @Path("/{topic}") + public void pushEvents(@PathParam("topic") String topic, InputStream msg, + @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { + log.info("Publishing message to topic " + topic); + + try { + eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null); + } + catch ( TopicExistsException e) { + log.error("Error while publishing to topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + catch ( DMaaPAccessDeniedException | AccessDeniedException e) { + log.error("Error while publishing to topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + + catch (ConfigDbException | IOException | missingReqdSetting e) { + log.error("Error while publishing to topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * This method is used to publish messages by passing an optional header + * called 'transactionId'. If the 'transactionId' is not provided in the + * input then a new transaction object will be created. Else the existing + * transaction object will be updated with the counter details. + * + * @param topic + * @param partitionKey + * + * handles CambriaApiException | ConfigDbException | + * TopicExistsException | AccessDeniedException | IOException in + * try catch block + * @throws CambriaApiException + */ + @POST + @Produces("application/json") + @Path("/transaction/{topic}") + public void pushEventsWithTransaction(@PathParam("topic") String topic, + @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { + // log.info("Publishing message with transaction id for topic " + topic + // ); + + try { + eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), + partitionKey, + Utils.getFormattedDate(new Date())); + } + + catch ( TopicExistsException e) { + log.error("Error while publishing to topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + catch ( DMaaPAccessDeniedException| AccessDeniedException e) { + log.error("Error while publishing to topic [" + topic + "].", e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError() + + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + Utils.getUserApiKey(request), request.getRemoteHost(), null, null); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + catch (ConfigDbException | IOException | missingReqdSetting e) { + log.error("Error while publishing to topic : " + topic, e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-" + + errorMessages.getPublishMsgError() + e.getMessage(), null, + Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), + request.getRemoteHost(), + null, null); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * This method is used for taking Configuration Object,HttpServletRequest + * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession + * Object. + * + * @return DMaaPContext object from where user can get Configuration + * Object,HttpServlet Object + * + */ + private DMaaPContext getDmaapContext() { + + DMaaPContext dmaapContext = new DMaaPContext(); + dmaapContext.setRequest(request); + dmaapContext.setResponse(response); + dmaapContext.setConfigReader(configReader); + + return dmaapContext; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaap/service/MMRestService.java b/src/main/java/com/att/nsa/dmaap/service/MMRestService.java new file mode 100644 index 0000000..0fa396f --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/service/MMRestService.java @@ -0,0 +1,1238 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.service; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import org.json.JSONObject; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.cambria.utils.Utils; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.dmaap.mmagent.*; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; + +import edu.emory.mathcs.backport.java.util.Arrays; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException; + +import org.json.JSONArray; +import org.json.JSONException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.exception.DMaaPErrorMessages; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticator; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl; +import com.att.nsa.cambria.service.MMService; + +/** + * Rest Service class for Mirror Maker proxy Rest Services + * + * @author <a href="mailto:"></a> + * + * @since May 25, 2016 + */ + +@Component +public class MMRestService { + + //private static final Logger LOGGER = Logger.getLogger(MMRestService.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(MMRestService.class); + private static final String NO_ADMIN_PERMISSION = "No Mirror Maker Admin permission."; + private static final String NO_USER_PERMISSION = "No Mirror Maker User permission."; + private static final String NO_USER_CREATE_PERMISSION = "No Mirror Maker User Create permission."; + private static final String NAME_DOES_NOT_MEET_REQUIREMENT = "Mirror Maker name can only contain alpha numeric"; + private static final String INVALID_IPPORT = "This is not a valid IP:Port"; + + private String topic; + private int timeout; + private String consumergroup; + private String consumerid; + + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + @Context + private HttpServletRequest request; + + @Context + private HttpServletResponse response; + + @Autowired + private MMService mirrorService; + + @Autowired + private DMaaPErrorMessages errorMessages; + + /** + * This method is used for taking Configuration Object,HttpServletRequest + * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession + * Object. + * + * @return DMaaPContext object from where user can get Configuration + * Object,HttpServlet Object + * + */ + private DMaaPContext getDmaapContext() { + DMaaPContext dmaapContext = new DMaaPContext(); + dmaapContext.setRequest(request); + dmaapContext.setResponse(response); + dmaapContext.setConfigReader(configReader); + dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date())); + + return dmaapContext; + } + + @POST + @Produces("application/json") + @Path("/create") + public void callCreateMirrorMaker(InputStream msg) { + + DMaaPContext ctx = getDmaapContext(); + if (checkMirrorMakerPermission(ctx, + AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) { + + loadProperty(); + String input = null; + String randomStr = getRandomNum(); + + InputStream inStream = null; + Gson gson = new Gson(); + CreateMirrorMaker createMirrorMaker = new CreateMirrorMaker(); + + try { + input = IOUtils.toString(msg, "UTF-8"); + + if (input != null && input.length() > 0) { + input = removeExtraChar(input); + } + + // Check if the request has CreateMirrorMaker + try { + createMirrorMaker = gson.fromJson(input, CreateMirrorMaker.class); + + } catch (JsonSyntaxException ex) { + + sendErrResponse(ctx, errorMessages.getIncorrectJson()); + } + String name = createMirrorMaker.getCreateMirrorMaker().getName(); + // send error message if it is not a CreateMirrorMaker request. + if (createMirrorMaker.getCreateMirrorMaker() == null) { + sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again."); + } + + // MirrorMaker whitelist and status should not be passed + else if (createMirrorMaker.getCreateMirrorMaker().getWhitelist() != null + || createMirrorMaker.getCreateMirrorMaker().getStatus() != null) { + sendErrResponse(ctx, "This is not a CreateMirrorMaker request. Please try again."); + } + + // if empty, blank name is entered + else if (StringUtils.isBlank(name)) { + sendErrResponse(ctx, "Name can not be empty or blank."); + } + + // Check if the name contains only Alpha Numeric + else if (!isAlphaNumeric(name)) { + sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT); + + } + + // Validate the IP and Port + else if (!StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getConsumer()) + && !StringUtils.isBlank(createMirrorMaker.getCreateMirrorMaker().getProducer()) + && !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer()) + || !validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) { + sendErrResponse(ctx, INVALID_IPPORT); + + } + // Set a random number as messageID, convert Json Object to + // InputStream and finally call publisher and subscriber + else if (isAlphaNumeric(name) && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getConsumer()) + && validateIPPort(createMirrorMaker.getCreateMirrorMaker().getProducer())) { + + createMirrorMaker.setMessageID(randomStr); + inStream = IOUtils.toInputStream(gson.toJson(createMirrorMaker), "UTF-8"); + callPubSub(randomStr, ctx, inStream); + } + + } catch (IOException e) { + + e.printStackTrace(); + } + } + // Send error response if user does not provide Authorization + else { + sendErrResponse(ctx, NO_ADMIN_PERMISSION); + } + } + + @POST + @Produces("application/json") + @Path("/listall") + public void callListAllMirrorMaker(InputStream msg) throws CambriaApiException { + DMaaPContext ctx = getDmaapContext(); + + if (checkMirrorMakerPermission(ctx, + AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) { + + loadProperty(); + + String input = null; + + try { + input = IOUtils.toString(msg, "UTF-8"); + + if (input != null && input.length() > 0) { + input = removeExtraChar(input); + } + + String randomStr = getRandomNum(); + JSONObject jsonOb = null; + + try { + jsonOb = new JSONObject(input); + + } catch (JSONException ex) { + + sendErrResponse(ctx, errorMessages.getIncorrectJson()); + } + + // Check if request has listAllMirrorMaker and + // listAllMirrorMaker is empty + if (jsonOb.has("listAllMirrorMaker") && jsonOb.getJSONObject("listAllMirrorMaker").length() == 0) { + + jsonOb.put("messageID", randomStr); + InputStream inStream = null; + + try { + inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8"); + + } catch (IOException ioe) { + ioe.printStackTrace(); + } + + callPubSub(randomStr, ctx, inStream); + + } else { + + sendErrResponse(ctx, "This is not a ListAllMirrorMaker request. Please try again."); + } + + } catch (IOException ioe) { + + ioe.printStackTrace(); + } + + } else { + + sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION); + } + } + + @POST + @Produces("application/json") + @Path("/update") + public void callUpdateMirrorMaker(InputStream msg) throws CambriaApiException { + + DMaaPContext ctx = getDmaapContext(); + if (checkMirrorMakerPermission(ctx, + AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) { + + loadProperty(); + String input = null; + String randomStr = getRandomNum(); + + InputStream inStream = null; + Gson gson = new Gson(); + UpdateMirrorMaker updateMirrorMaker = new UpdateMirrorMaker(); + + try { + input = IOUtils.toString(msg, "UTF-8"); + + if (input != null && input.length() > 0) { + input = removeExtraChar(input); + } + + // Check if the request has UpdateMirrorMaker + try { + updateMirrorMaker = gson.fromJson(input, UpdateMirrorMaker.class); + + } catch (JsonSyntaxException ex) { + + sendErrResponse(ctx, errorMessages.getIncorrectJson()); + } + String name = updateMirrorMaker.getUpdateMirrorMaker().getName(); + + // send error message if it is not a UpdateMirrorMaker request. + if (updateMirrorMaker.getUpdateMirrorMaker() == null) { + sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again."); + } + + // MirrorMaker whitelist and status should not be passed + else if (updateMirrorMaker.getUpdateMirrorMaker().getWhitelist() != null + || updateMirrorMaker.getUpdateMirrorMaker().getStatus() != null) { + sendErrResponse(ctx, "This is not a UpdateMirrorMaker request. Please try again."); + } + + // if empty, blank name is entered + else if (StringUtils.isBlank(name)) { + sendErrResponse(ctx, "Name can not be empty or blank."); + } + + // Check if the name contains only Alpha Numeric + else if (!isAlphaNumeric(name)) { + sendErrResponse(ctx, NAME_DOES_NOT_MEET_REQUIREMENT); + + } + + // Validate the IP and Port + else if (!StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getConsumer()) + && !StringUtils.isBlank(updateMirrorMaker.getUpdateMirrorMaker().getProducer()) + && !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer()) + || !validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) { + sendErrResponse(ctx, INVALID_IPPORT); + + } + // Set a random number as messageID, convert Json Object to + // InputStream and finally call publisher and subscriber + else if (isAlphaNumeric(name) && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getConsumer()) + && validateIPPort(updateMirrorMaker.getUpdateMirrorMaker().getProducer())) { + + updateMirrorMaker.setMessageID(randomStr); + inStream = IOUtils.toInputStream(gson.toJson(updateMirrorMaker), "UTF-8"); + callPubSub(randomStr, ctx, inStream); + } + + } catch (IOException e) { + + e.printStackTrace(); + } + } + // Send error response if user does not provide Authorization + else { + sendErrResponse(ctx, NO_ADMIN_PERMISSION); + } + } + + @POST + @Produces("application/json") + @Path("/delete") + public void callDeleteMirrorMaker(InputStream msg) throws CambriaApiException { + DMaaPContext ctx = getDmaapContext(); + + if (checkMirrorMakerPermission(ctx, + AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeradmin.aaf"))) { + + loadProperty(); + + String input = null; + + try { + input = IOUtils.toString(msg, "UTF-8"); + + if (input != null && input.length() > 0) { + input = removeExtraChar(input); + } + + String randomStr = getRandomNum(); + JSONObject jsonOb = null; + + try { + jsonOb = new JSONObject(input); + + } catch (JSONException ex) { + + sendErrResponse(ctx, errorMessages.getIncorrectJson()); + } + + // Check if request has DeleteMirrorMaker and + // DeleteMirrorMaker has MirrorMaker object with name variable + // and check if the name contain only alpha numeric + if (jsonOb.has("deleteMirrorMaker") && jsonOb.getJSONObject("deleteMirrorMaker").length() == 1 + && jsonOb.getJSONObject("deleteMirrorMaker").has("name") + && !StringUtils.isBlank(jsonOb.getJSONObject("deleteMirrorMaker").getString("name")) + && isAlphaNumeric(jsonOb.getJSONObject("deleteMirrorMaker").getString("name"))) { + + jsonOb.put("messageID", randomStr); + InputStream inStream = null; + + try { + inStream = IOUtils.toInputStream(jsonOb.toString(), "UTF-8"); + + } catch (IOException ioe) { + ioe.printStackTrace(); + } + + callPubSub(randomStr, ctx, inStream); + + } else { + + sendErrResponse(ctx, "This is not a DeleteMirrorMaker request. Please try again."); + } + + } catch (IOException ioe) { + + ioe.printStackTrace(); + } + + } else { + + sendErrResponse(getDmaapContext(), NO_ADMIN_PERMISSION); + } + } + + private boolean isListMirrorMaker(String msg, String messageID) { + String topicmsg = msg; + topicmsg = removeExtraChar(topicmsg); + + JSONObject jObj = new JSONObject(); + JSONArray jArray = null; + boolean exist = false; + + if (!StringUtils.isBlank(topicmsg) && topicmsg.length() > 2) { + jArray = new JSONArray(topicmsg); + + for (int i = 0; i < jArray.length(); i++) { + jObj = jArray.getJSONObject(i); + + JSONObject obj = new JSONObject(); + if (jObj.has("message")) { + obj = jObj.getJSONObject("message"); + } + if (obj.has("messageID") && obj.get("messageID").equals(messageID) && obj.has("listMirrorMaker")) { + exist = true; + break; + } + } + } + return exist; + } + + private void loadProperty() { + + this.timeout = Integer.parseInt( + AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.timeout").trim()); + this.topic = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.topic").trim(); + this.consumergroup = AJSCPropertiesMap + .getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumergroup").trim(); + this.consumerid = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormaker.consumerid") + .trim(); + } + + private String removeExtraChar(String message) { + String str = message; + str = checkJsonFormate(str); + + if (str != null && str.length() > 0) { + str = str.replace("\\", ""); + str = str.replace("\"{", "{"); + str = str.replace("}\"", "}"); + } + return str; + } + + private String getRandomNum() { + long random = Math.round(Math.random() * 89999) + 10000; + String strLong = Long.toString(random); + return strLong; + } + + private boolean isAlphaNumeric(String name) { + String pattern = "^[a-zA-Z0-9]*$"; + if (name.matches(pattern)) { + return true; + } + return false; + } + + // This method validate IPv4 + private boolean validateIPPort(String ipPort) { + String pattern = "^([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\." + + "([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.([01]?\\d\\d?|2[0-4]\\d|25[0-5]):" + + "([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$"; + if (ipPort.matches(pattern)) { + return true; + } + return false; + } + + private String checkJsonFormate(String jsonStr) { + + String json = jsonStr; + if (jsonStr != null && jsonStr.length() > 0 && jsonStr.startsWith("[") && !jsonStr.endsWith("]")) { + json = json + "]"; + } + return json; + } + + private boolean checkMirrorMakerPermission(DMaaPContext ctx, String permission) { + + boolean hasPermission = false; + + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + + if (aaf.aafAuthentication(ctx.getRequest(), permission)) { + hasPermission = true; + } + return hasPermission; + } + + private void callPubSub(String randomstr, DMaaPContext ctx, InputStream inStream) { + try { + mirrorService.pushEvents(ctx, topic, inStream, null, null); + long startTime = System.currentTimeMillis(); + String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + + while (!isListMirrorMaker(msgFrmSubscribe, randomstr) + && (System.currentTimeMillis() - startTime) < timeout) { + msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + } + + JSONObject jsonObj = new JSONObject(); + JSONObject finalJsonObj = new JSONObject(); + JSONArray jsonArray = null; + + if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 + && isListMirrorMaker(msgFrmSubscribe, randomstr)) { + msgFrmSubscribe = removeExtraChar(msgFrmSubscribe); + jsonArray = new JSONArray(msgFrmSubscribe); + + for (int i = 0; i < jsonArray.length(); i++) { + jsonObj = jsonArray.getJSONObject(i); + + JSONObject obj = new JSONObject(); + if (jsonObj.has("message")) { + obj = jsonObj.getJSONObject("message"); + } + if (obj.has("messageID") && obj.get("messageID").equals(randomstr) && obj.has("listMirrorMaker")) { + finalJsonObj.put("listMirrorMaker", obj.get("listMirrorMaker")); + break; + } + } + + DMaaPResponseBuilder.respondOk(ctx, finalJsonObj); + + } else { + + JSONObject err = new JSONObject(); + err.append("error", "listMirrorMaker is not available, please make sure MirrorMakerAgent is running"); + DMaaPResponseBuilder.respondOk(ctx, err); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void sendErrResponse(DMaaPContext ctx, String errMsg) { + JSONObject err = new JSONObject(); + err.append("Error", errMsg); + + try { + DMaaPResponseBuilder.respondOk(ctx, err); + LOGGER.error(errMsg.toString()); + + } catch (JSONException | IOException e) { + LOGGER.error(errMsg.toString()); + } + } + + @SuppressWarnings("unchecked") + @POST + @Produces("application/json") + @Path("/listallwhitelist") + public void listWhiteList(InputStream msg) { + + DMaaPContext ctx = getDmaapContext(); + if (checkMirrorMakerPermission(ctx, + AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) { + + loadProperty(); + String input = null; + + try { + input = IOUtils.toString(msg, "UTF-8"); + + if (input != null && input.length() > 0) { + input = removeExtraChar(input); + } + + // Check if it is correct Json object + JSONObject jsonOb = null; + + try { + jsonOb = new JSONObject(input); + + } catch (JSONException ex) { + + sendErrResponse(ctx, errorMessages.getIncorrectJson()); + } + + // Check if the request has name and name contains only alpha + // numeric + // and check if the request has namespace and namespace contains + // only alpha numeric + if (jsonOb.length() == 2 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name")) + && isAlphaNumeric(jsonOb.getString("name")) && jsonOb.has("namespace") + && !StringUtils.isBlank(jsonOb.getString("namespace"))) { + + String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create"; + + // Check if the user have create permission for the + // namespace + if (checkMirrorMakerPermission(ctx, permission)) { + + JSONObject listAll = new JSONObject(); + JSONObject emptyObject = new JSONObject(); + + // Create a listAllMirrorMaker Json object + try { + listAll.put("listAllMirrorMaker", emptyObject); + + } catch (JSONException e) { + + e.printStackTrace(); + } + + // set a random number as messageID + String randomStr = getRandomNum(); + listAll.put("messageID", randomStr); + InputStream inStream = null; + + // convert listAll Json object to InputStream object + try { + inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8"); + + } catch (IOException ioe) { + ioe.printStackTrace(); + } + // call listAllMirrorMaker + mirrorService.pushEvents(ctx, topic, inStream, null, null); + + // subscribe for listMirrorMaker + long startTime = System.currentTimeMillis(); + String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + + while (!isListMirrorMaker(msgFrmSubscribe, randomStr) + && (System.currentTimeMillis() - startTime) < timeout) { + msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + } + + if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 + && isListMirrorMaker(msgFrmSubscribe, randomStr)) { + + JSONArray listMirrorMaker = new JSONArray(); + listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr); + + String whitelist = null; + for (int i = 0; i < listMirrorMaker.length(); i++) { + + JSONObject mm = new JSONObject(); + mm = listMirrorMaker.getJSONObject(i); + String name = mm.getString("name"); + + if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) { + whitelist = mm.getString("whitelist"); + break; + } + } + + if (!StringUtils.isBlank(whitelist)) { + + List<String> topicList = new ArrayList<String>(); + List<String> finalTopicList = new ArrayList<String>(); + topicList = Arrays.asList(whitelist.split(",")); + + for (String topic : topicList) { + if (topic != null && !topic.equals("null") + && getNamespace(topic).equals(jsonOb.getString("namespace"))) { + + finalTopicList.add(topic); + } + } + + String topicNames = ""; + + if (finalTopicList.size() > 0) { + topicNames = StringUtils.join(finalTopicList, ","); + } + + JSONObject listAllWhiteList = new JSONObject(); + listAllWhiteList.put("name", jsonOb.getString("name")); + listAllWhiteList.put("whitelist", topicNames); + + DMaaPResponseBuilder.respondOk(ctx, listAllWhiteList); + } + + } else { + + JSONObject err = new JSONObject(); + err.append("error", + "listWhiteList is not available, please make sure MirrorMakerAgent is running"); + DMaaPResponseBuilder.respondOk(ctx, err); + } + + } else { + sendErrResponse(ctx, NO_USER_CREATE_PERMISSION); + } + + } else { + + sendErrResponse(ctx, "This is not a ListAllWhitelist request. Please try again."); + } + + } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException + | TopicExistsException | missingReqdSetting | UnavailableException e) { + + e.printStackTrace(); + } + } else { + sendErrResponse(ctx, NO_USER_PERMISSION); + } + } + + @SuppressWarnings("unchecked") + @POST + @Produces("application/json") + @Path("/createwhitelist") + public void createWhiteList(InputStream msg) { + + DMaaPContext ctx = getDmaapContext(); + if (checkMirrorMakerPermission(ctx, + AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) { + + loadProperty(); + String input = null; + + try { + input = IOUtils.toString(msg, "UTF-8"); + + if (input != null && input.length() > 0) { + input = removeExtraChar(input); + } + + // Check if it is correct Json object + JSONObject jsonOb = null; + + try { + jsonOb = new JSONObject(input); + + } catch (JSONException ex) { + + sendErrResponse(ctx, errorMessages.getIncorrectJson()); + } + + // Check if the request has name and name contains only alpha numeric, + // check if the request has namespace and + // check if the request has whitelistTopicName + // check if the topic name contains only alpha numeric + if (jsonOb.length() == 3 && jsonOb.has("name") && !StringUtils.isBlank(jsonOb.getString("name")) + && isAlphaNumeric(jsonOb.getString("name")) + && jsonOb.has("namespace") && !StringUtils.isBlank(jsonOb.getString("namespace")) + && jsonOb.has("whitelistTopicName") && !StringUtils.isBlank(jsonOb.getString("whitelistTopicName")) + && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1, + jsonOb.getString("whitelistTopicName").length()))) { + + String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create"; + + // Check if the user have create permission for the + // namespace + if (checkMirrorMakerPermission(ctx, permission)) { + + JSONObject listAll = new JSONObject(); + JSONObject emptyObject = new JSONObject(); + + // Create a listAllMirrorMaker Json object + try { + listAll.put("listAllMirrorMaker", emptyObject); + + } catch (JSONException e) { + + e.printStackTrace(); + } + + // set a random number as messageID + String randomStr = getRandomNum(); + listAll.put("messageID", randomStr); + InputStream inStream = null; + + // convert listAll Json object to InputStream object + try { + inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8"); + + } catch (IOException ioe) { + ioe.printStackTrace(); + } + // call listAllMirrorMaker + mirrorService.pushEvents(ctx, topic, inStream, null, null); + + // subscribe for listMirrorMaker + long startTime = System.currentTimeMillis(); + String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + + while (!isListMirrorMaker(msgFrmSubscribe, randomStr) + && (System.currentTimeMillis() - startTime) < timeout) { + msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + } + + JSONArray listMirrorMaker = null; + + if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 + && isListMirrorMaker(msgFrmSubscribe, randomStr)) { + + listMirrorMaker = getListMirrorMaker(msgFrmSubscribe, randomStr); + String whitelist = null; + + for (int i = 0; i < listMirrorMaker.length(); i++) { + JSONObject mm = new JSONObject(); + mm = listMirrorMaker.getJSONObject(i); + String name = mm.getString("name"); + + if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) { + whitelist = mm.getString("whitelist"); + break; + } + } + + List<String> topicList = new ArrayList<String>(); + List<String> finalTopicList = new ArrayList<String>(); + + if (whitelist != null) { + topicList = Arrays.asList(whitelist.split(",")); + } + + for (String st : topicList) { + if (!StringUtils.isBlank(st)) { + finalTopicList.add(st); + } + } + + String newTopic = jsonOb.getString("whitelistTopicName"); + + if (!topicList.contains(newTopic) + && getNamespace(newTopic).equals(jsonOb.getString("namespace"))) { + + UpdateWhiteList updateWhiteList = new UpdateWhiteList(); + MirrorMaker mirrorMaker = new MirrorMaker(); + mirrorMaker.setName(jsonOb.getString("name")); + finalTopicList.add(newTopic); + String newWhitelist = ""; + + if (finalTopicList.size() > 0) { + newWhitelist = StringUtils.join(finalTopicList, ","); + } + + mirrorMaker.setWhitelist(newWhitelist); + + String newRandom = getRandomNum(); + updateWhiteList.setMessageID(newRandom); + updateWhiteList.setUpdateWhiteList(mirrorMaker); + + Gson g = new Gson(); + g.toJson(updateWhiteList); + InputStream inputStream = null; + inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8"); + // callPubSub(newRandom, ctx, inputStream); + callPubSubForWhitelist(newRandom, ctx, inputStream, jsonOb.getString("namespace")); + + } else if (topicList.contains(newTopic)) { + sendErrResponse(ctx, "The topic already exist."); + + } else if (!getNamespace(newTopic).equals(jsonOb.getString("namespace"))) { + sendErrResponse(ctx, + "The namespace of the topic does not match with the namespace you provided."); + } + } else { + + JSONObject err = new JSONObject(); + err.append("error", + "listWhiteList is not available, please make sure MirrorMakerAgent is running"); + DMaaPResponseBuilder.respondOk(ctx, err); + } + + } else { + sendErrResponse(ctx, NO_USER_CREATE_PERMISSION); + } + + } else { + + sendErrResponse(ctx, "This is not a createWhitelist request. Please try again."); + } + + } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException + | TopicExistsException | missingReqdSetting | UnavailableException e) { + + e.printStackTrace(); + } + } + // Send error response if user does not provide Authorization + else { + sendErrResponse(ctx, NO_USER_PERMISSION); + } + } + + @SuppressWarnings("unchecked") + @POST + @Produces("application/json") + @Path("/deletewhitelist") + public void deleteWhiteList(InputStream msg) { + + DMaaPContext ctx = getDmaapContext(); + if (checkMirrorMakerPermission(ctx, + AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.mirrormakeruser.aaf"))) { + + loadProperty(); + String input = null; + + try { + input = IOUtils.toString(msg, "UTF-8"); + + if (input != null && input.length() > 0) { + input = removeExtraChar(input); + } + + // Check if it is correct Json object + JSONObject jsonOb = null; + + try { + jsonOb = new JSONObject(input); + + } catch (JSONException ex) { + + sendErrResponse(ctx, errorMessages.getIncorrectJson()); + } + + // Check if the request has name and name contains only alpha numeric, + // check if the request has namespace and + // check if the request has whitelistTopicName + if (jsonOb.length() == 3 && jsonOb.has("name") && isAlphaNumeric(jsonOb.getString("name")) + && jsonOb.has("namespace") && jsonOb.has("whitelistTopicName") + && isAlphaNumeric(jsonOb.getString("whitelistTopicName").substring(jsonOb.getString("whitelistTopicName").lastIndexOf(".")+1, + jsonOb.getString("whitelistTopicName").length()))) { + + String permission = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "msgRtr.mirrormakeruser.aaf.create") + jsonOb.getString("namespace") + "|create"; + + // Check if the user have create permission for the + // namespace + if (checkMirrorMakerPermission(ctx, permission)) { + + JSONObject listAll = new JSONObject(); + JSONObject emptyObject = new JSONObject(); + + // Create a listAllMirrorMaker Json object + try { + listAll.put("listAllMirrorMaker", emptyObject); + + } catch (JSONException e) { + + e.printStackTrace(); + } + + // set a random number as messageID + String randomStr = getRandomNum(); + listAll.put("messageID", randomStr); + InputStream inStream = null; + + // convert listAll Json object to InputStream object + try { + inStream = IOUtils.toInputStream(listAll.toString(), "UTF-8"); + + } catch (IOException ioe) { + ioe.printStackTrace(); + } + // call listAllMirrorMaker + mirrorService.pushEvents(ctx, topic, inStream, null, null); + + // subscribe for listMirrorMaker + long startTime = System.currentTimeMillis(); + String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + + while (!isListMirrorMaker(msgFrmSubscribe, randomStr) + && (System.currentTimeMillis() - startTime) < timeout) { + msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + } + + JSONObject jsonObj = new JSONObject(); + JSONArray jsonArray = null; + JSONArray listMirrorMaker = null; + + if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 + && isListMirrorMaker(msgFrmSubscribe, randomStr)) { + msgFrmSubscribe = removeExtraChar(msgFrmSubscribe); + jsonArray = new JSONArray(msgFrmSubscribe); + + for (int i = 0; i < jsonArray.length(); i++) { + jsonObj = jsonArray.getJSONObject(i); + + JSONObject obj = new JSONObject(); + if (jsonObj.has("message")) { + obj = jsonObj.getJSONObject("message"); + } + if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has("listMirrorMaker")) { + listMirrorMaker = obj.getJSONArray("listMirrorMaker"); + break; + } + } + String whitelist = null; + for (int i = 0; i < listMirrorMaker.length(); i++) { + + JSONObject mm = new JSONObject(); + mm = listMirrorMaker.getJSONObject(i); + String name = mm.getString("name"); + + if (name.equals(jsonOb.getString("name")) && mm.has("whitelist")) { + whitelist = mm.getString("whitelist"); + break; + } + } + + List<String> topicList = new ArrayList<String>(); + + if (whitelist != null) { + topicList = Arrays.asList(whitelist.split(",")); + } + boolean removeTopic = false; + String topicToRemove = jsonOb.getString("whitelistTopicName"); + + if (topicList.contains(topicToRemove)) { + removeTopic = true; + } else { + sendErrResponse(ctx, "The topic does not exist."); + } + + + if (removeTopic) { + UpdateWhiteList updateWhiteList = new UpdateWhiteList(); + MirrorMaker mirrorMaker = new MirrorMaker(); + + mirrorMaker.setName(jsonOb.getString("name")); + mirrorMaker.setWhitelist(removeTopic(whitelist, topicToRemove)); + + String newRandom = getRandomNum(); + + updateWhiteList.setMessageID(newRandom); + updateWhiteList.setUpdateWhiteList(mirrorMaker); + + Gson g = new Gson(); + g.toJson(updateWhiteList); + + InputStream inputStream = null; + inputStream = IOUtils.toInputStream(g.toJson(updateWhiteList), "UTF-8"); + callPubSubForWhitelist(newRandom, ctx, inputStream, getNamespace(topicToRemove)); + } + + } else { + + JSONObject err = new JSONObject(); + err.append("error", + "listWhiteList is not available, please make sure MirrorMakerAgent is running"); + DMaaPResponseBuilder.respondOk(ctx, err); + } + + } else { + sendErrResponse(ctx, NO_USER_CREATE_PERMISSION); + } + + } else { + + sendErrResponse(ctx, "This is not a DeleteAllWhitelist request. Please try again."); + } + + } catch (IOException | CambriaApiException | ConfigDbException | AccessDeniedException + | TopicExistsException | missingReqdSetting | UnavailableException e) { + + e.printStackTrace(); + } + } + // Send error response if user does not provide Authorization + else { + sendErrResponse(ctx, NO_USER_PERMISSION); + } + } + + private String getNamespace(String topic) { + return topic.substring(0, topic.lastIndexOf(".")); + } + + private String removeTopic(String whitelist, String topicToRemove) { + List<String> topicList = new ArrayList<String>(); + List<String> newTopicList = new ArrayList<String>(); + + if (whitelist.contains(",")) { + topicList = Arrays.asList(whitelist.split(",")); + + } + + if (topicList.contains(topicToRemove)) { + for (String topic : topicList) { + if (!topic.equals(topicToRemove)) { + newTopicList.add(topic); + } + } + } + + String newWhitelist = StringUtils.join(newTopicList, ","); + + return newWhitelist; + } + + private void callPubSubForWhitelist(String randomStr, DMaaPContext ctx, InputStream inStream, String namespace) { + + try { + mirrorService.pushEvents(ctx, topic, inStream, null, null); + long startTime = System.currentTimeMillis(); + String msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + + while (!isListMirrorMaker(msgFrmSubscribe, randomStr) + && (System.currentTimeMillis() - startTime) < timeout) { + msgFrmSubscribe = mirrorService.subscribe(ctx, topic, consumergroup, consumerid); + } + + JSONObject jsonObj = new JSONObject(); + JSONArray jsonArray = null; + JSONArray jsonArrayNamespace = null; + + if (msgFrmSubscribe != null && msgFrmSubscribe.length() > 0 + && isListMirrorMaker(msgFrmSubscribe, randomStr)) { + msgFrmSubscribe = removeExtraChar(msgFrmSubscribe); + jsonArray = new JSONArray(msgFrmSubscribe); + + for (int i = 0; i < jsonArray.length(); i++) { + jsonObj = jsonArray.getJSONObject(i); + + JSONObject obj = new JSONObject(); + if (jsonObj.has("message")) { + obj = jsonObj.getJSONObject("message"); + } + if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has("listMirrorMaker")) { + jsonArrayNamespace = obj.getJSONArray("listMirrorMaker"); + } + } + JSONObject finalJasonObj = new JSONObject(); + JSONArray finalJsonArray = new JSONArray(); + + for (int i = 0; i < jsonArrayNamespace.length(); i++) { + + JSONObject mmObj = new JSONObject(); + mmObj = jsonArrayNamespace.getJSONObject(i); + String whitelist = null; + + if (mmObj.has("whitelist")) { + whitelist = getWhitelistByNamespace(mmObj.getString("whitelist"), namespace); + + if (whitelist != null) { + mmObj.remove("whitelist"); + mmObj.put("whitelist", whitelist); + } else { + mmObj.remove("whitelist"); + } + } + finalJsonArray.put(mmObj); + } + finalJasonObj.put("listMirrorMaker", finalJsonArray); + + DMaaPResponseBuilder.respondOk(ctx, finalJasonObj); + + } else { + + JSONObject err = new JSONObject(); + err.append("error", "listMirrorMaker is not available, please make sure MirrorMakerAgent is running"); + DMaaPResponseBuilder.respondOk(ctx, err); + } + + } catch (Exception e) { + e.printStackTrace(); + } + } + + private String getWhitelistByNamespace(String originalWhitelist, String namespace) { + + String whitelist = null; + List<String> resultList = new ArrayList<String>(); + List<String> whitelistList = new ArrayList<String>(); + whitelistList = Arrays.asList(originalWhitelist.split(",")); + + for (String topic : whitelistList) { + if (StringUtils.isNotBlank(originalWhitelist) && getNamespace(topic).equals(namespace)) { + resultList.add(topic); + } + } + if (resultList.size() > 0) { + whitelist = StringUtils.join(resultList, ","); + } + + return whitelist; + } + + private JSONArray getListMirrorMaker(String msgFrmSubscribe, String randomStr) { + JSONObject jsonObj = new JSONObject(); + JSONArray jsonArray = new JSONArray(); + JSONArray listMirrorMaker = new JSONArray(); + + msgFrmSubscribe = removeExtraChar(msgFrmSubscribe); + jsonArray = new JSONArray(msgFrmSubscribe); + + for (int i = 0; i < jsonArray.length(); i++) { + jsonObj = jsonArray.getJSONObject(i); + + JSONObject obj = new JSONObject(); + if (jsonObj.has("message")) { + obj = jsonObj.getJSONObject("message"); + } + if (obj.has("messageID") && obj.get("messageID").equals(randomStr) && obj.has("listMirrorMaker")) { + listMirrorMaker = obj.getJSONArray("listMirrorMaker"); + break; + } + } + return listMirrorMaker; + } +} diff --git a/src/main/java/com/att/nsa/dmaap/service/MetricsRestService.java b/src/main/java/com/att/nsa/dmaap/service/MetricsRestService.java new file mode 100644 index 0000000..8a6240e --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/service/MetricsRestService.java @@ -0,0 +1,152 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.service; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.service.MetricsService; +import com.att.nsa.cambria.utils.ConfigurationReader; + +/** + * This class is a CXF REST service which acts + * as gateway for MR Metrics Service. + * @author author + * + */ +@Component +@Path("/") +public class MetricsRestService { + + /** + * Logger obj + */ + //private Logger log = Logger.getLogger(MetricsRestService.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class); + /** + * HttpServletRequest obj + */ + @Context + private HttpServletRequest request; + + /** + * HttpServletResponse obj + */ + @Context + private HttpServletResponse response; + + /** + * Config Reader + */ + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + /** + * MetricsService obj + */ + @Autowired + private MetricsService metricsService; + + /** + * Get Metrics method + * @throws CambriaApiException ex + */ + @GET + @Produces("text/plain") + public void getMetrics() throws CambriaApiException { + try { + log.info("MetricsRestService: getMetrics : START"); + metricsService.get(getDmaapContext()); + log.info("MetricsRestService: getMetrics : Completed"); + } catch (IOException e) { + log.error("Error while fetching metrics data : ", e); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.GET_METRICS_ERROR.getResponseCode(), + "Error while fetching metrics data"+ e.getMessage()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * This method is for get the metrics details by the metrics name + * + * @param metricName + * @throws CambriaApiException + */ + @GET + @Path("/{metricName}") + @Produces("text/plain") + public void getMetricsByName(@PathParam("metricName") String metricName) + throws CambriaApiException { + + try { + log.info("MetricsProducer: getMetricsByName : START"); + metricsService.getMetricByName(getDmaapContext(), metricName); + log.info("MetricsRestService: getMetricsByName : Completed"); + } catch (IOException | CambriaApiException e) { + log.error("Error while fetching metrics data : ", e); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_METRICS_ERROR.getResponseCode(), + "Error while fetching metrics data"+ e.getMessage()); + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * This method is used for taking Configuration Object,HttpServletRequest + * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession + * Object. + * + * @return DMaaPContext object from where user can get Configuration + * Object,HttpServlet Object + * + */ + private DMaaPContext getDmaapContext() { + DMaaPContext dmaapContext = new DMaaPContext(); + dmaapContext.setConfigReader(configReader); + dmaapContext.setRequest(request); + dmaapContext.setResponse(response); + return dmaapContext; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java b/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java new file mode 100644 index 0000000..6742cd5 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/service/TopicRestService.java @@ -0,0 +1,688 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.service; + +import java.io.IOException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; + +import org.apache.http.HttpStatus; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.json.JSONException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.beans.TopicBean; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.exception.DMaaPAccessDeniedException; +import com.att.nsa.cambria.exception.DMaaPErrorMessages; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticator; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl; +import com.att.nsa.cambria.service.TopicService; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; + +/** + * This class is a CXF REST service which acts + * as gateway for MR Topic Service. + * @author author + * + */ + +@Component +@Path("/") +public class TopicRestService { + + /** + * Logger obj + */ + //private static final Logger LOGGER = Logger .getLogger(TopicRestService.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class); + /** + * Config Reader + */ + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + /** + * HttpServletRequest obj + */ + @Context + private HttpServletRequest request; + + /** + * HttpServletResponse obj + */ + @Context + private HttpServletResponse response; + + /** + * TopicService obj + */ + @Autowired + private TopicService topicService; + + /** + * DMaaPErrorMessages obj + */ + @Autowired + private DMaaPErrorMessages errorMessages; + + /** + * mrNamespace + */ + //@Value("${msgRtr.namespace.aaf}") +// private String mrNamespace; + + + /** + * Fetches a list of topics from the current kafka instance and converted + * into json object. + * + * @return list of the topics in json format + * @throws AccessDeniedException + * @throws CambriaApiException + * @throws IOException + * @throws JSONException + * */ + @GET + //@Produces(MediaType.TEXT_PLAIN) + public void getTopics() throws CambriaApiException { + try { + + LOGGER.info("Authenticating the user before fetching the topics"); + //String permission = "com.att.dmaap.mr.topic|*|view"; + String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf"); + String permission =mrNameS+"|"+"*"+"|"+"view"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + //Check if client is using AAF CADI Basic Authorization + //If yes then check for AAF role authentication else display all topics + if(null!=getDmaapContext().getRequest().getHeader("Authorization")) + { + if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) + { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + + } + } + + LOGGER.info("Fetching all Topics"); + + topicService.getTopics(getDmaapContext()); + + LOGGER.info("Returning List of all Topics"); + + + } catch (JSONException | ConfigDbException | IOException excp) { + LOGGER.error( + "Failed to retrieve list of all topics: " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), + errorMessages.getTopicsfailure()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + + } + } + + /** + * Fetches a list of topics from the current kafka instance and converted + * into json object. + * + * @return list of the topics in json format + * @throws AccessDeniedException + * @throws CambriaApiException + * @throws IOException + * @throws JSONException + * */ + @GET + @Path("/listAll") + //@Produces(MediaType.TEXT_PLAIN) + public void getAllTopics() throws CambriaApiException { + try { + + LOGGER.info("Authenticating the user before fetching the topics"); + //String permission = "com.att.dmaap.mr.topic|*|view"; + String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf"); + String permission =mrNameS+"|"+"*"+"|"+"view"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + //Check if client is using AAF CADI Basic Authorization + //If yes then check for AAF role authentication else display all topics + if(null!=getDmaapContext().getRequest().getHeader("Authorization")) + { + if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) + { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + + } + } + + LOGGER.info("Fetching all Topics"); + + topicService.getAllTopics(getDmaapContext()); + + LOGGER.info("Returning List of all Topics"); + + + } catch (JSONException | ConfigDbException | IOException excp) { + LOGGER.error( + "Failed to retrieve list of all topics: " + + excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), + errorMessages.getTopicsfailure()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + + } + } + + + /** + * Returns details of the topic whose name is passed as a parameter + * + * @param topicName + * - name of the topic + * @return details of a topic whose name is mentioned in the request in json + * format. + * @throws AccessDeniedException + * @throws DMaaPAccessDeniedException + * @throws IOException + * */ + @GET + @Path("/{topicName}") + //@Produces(MediaType.TEXT_PLAIN) + public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException { + try { + + LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName); + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + + //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view"; + + //Check if client is using AAF CADI Basic Authorization + //If yes then check for AAF role authentication else display all topics + if(null!=getDmaapContext().getRequest().getHeader("Authorization")) + { + String permission = aaf.aafPermissionString(topicName, "view"); + if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) + { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + } + } + + LOGGER.info("Fetching Topic: " + topicName); + + topicService.getTopic(getDmaapContext(), topicName); + + LOGGER.info("Fetched details of topic: " + topicName); + + } catch (ConfigDbException | IOException | TopicExistsException excp) { + LOGGER.error("Failed to retrieve details of topic: " + topicName, + excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(), + errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + + } + } + + + + /** + * This method is still not working. Need to check on post call and how to + * accept parameters for post call + * + * @param topicBean + * it will have the bean object + * @throws TopicExistsException + * @throws CambriaApiException + * @throws JSONException + * @throws IOException + * @throws AccessDeniedException + * + * */ + @POST + @Path("/create") + @Consumes({ MediaType.APPLICATION_JSON }) + //@Produces(MediaType.TEXT_PLAIN) + public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException { + try { + LOGGER.info("Creating Topic."+topicBean.getTopicName()); + + topicService.createTopic(getDmaapContext(), topicBean); + + LOGGER.info("Topic created Successfully."); + } + catch (TopicExistsException ex){ + + LOGGER.error("Error while creating a topic: " + ex.getMessage(), + ex); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ ex.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + + + + }catch (AccessDeniedException | DMaaPAccessDeniedException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), + excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + }catch (CambriaApiException | IOException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), + excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * Deletes existing topic whose name is passed as a parameter + * + * @param topicName + * topic + * @throws CambriaApiException + * @throws IOException + * */ + @DELETE + @Path("/{topicName}") + //@Produces(MediaType.TEXT_PLAIN) + public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException { + try { + LOGGER.info("Deleting Topic: " + topicName); + + topicService.deleteTopic(getDmaapContext(), topicName); + + LOGGER.info("Topic [" + topicName + "] deleted successfully."); + } catch (DMaaPAccessDeniedException| AccessDeniedException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), + excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + }catch (IOException | ConfigDbException + | CambriaApiException | TopicExistsException excp) { + LOGGER.error("Error while deleting topic: " + topicName, excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(), + errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + private DMaaPContext getDmaapContext() { + + DMaaPContext dmaapContext = new DMaaPContext(); + dmaapContext.setRequest(request); + dmaapContext.setResponse(response); + dmaapContext.setConfigReader(configReader); + + return dmaapContext; + + } + + /** + * This method will fetch the details of publisher by giving topic name + * + * @param topicName + * @throws CambriaApiException + * @throws AccessDeniedException + */ + @GET + @Path("/{topicName}/producers") + //@Produces(MediaType.TEXT_PLAIN) + public void getPublishersByTopicName( + @PathParam("topicName") String topicName) throws CambriaApiException { + try { + +// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; +// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); +// String permission = aaf.aafPermissionString(topicName, "view"); +// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) +// { + LOGGER.info("Fetching list of all the publishers for topic " + + topicName); + + topicService.getPublishersByTopicName(getDmaapContext(), topicName); + + LOGGER.info("Returning list of all the publishers for topic " + + topicName); +// }else{ +// LOGGER.error("Error while fetching list of publishers for topic "+ topicName); +// +// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, +// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), +// errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2()); +// LOGGER.info(errRes); +// throw new DMaaPAccessDeniedException(errRes); +// +// } + + } catch (IOException | ConfigDbException | TopicExistsException excp) { + LOGGER.error("Error while fetching list of publishers for topic " + + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(), + "Error while fetching list of publishers for topic: " + + topicName + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * proving permission for the topic for a particular publisher id + * + * @param topicName + * @param producerId + * @throws CambriaApiException + */ + @PUT + @Path("/{topicName}/producers/{producerId}") + public void permitPublisherForTopic( + @PathParam("topicName") String topicName, + @PathParam("producerId") String producerId) throws CambriaApiException { + try { + LOGGER.info("Granting write access to producer [" + producerId + + "] for topic " + topicName); + + topicService.permitPublisherForTopic(getDmaapContext(), topicName, + producerId); + + LOGGER.info("Write access has been granted to producer [" + + producerId + "] for topic " + topicName); + } catch (AccessDeniedException | DMaaPAccessDeniedException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), + excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + }catch ( ConfigDbException | IOException + | TopicExistsException excp) { + LOGGER.error("Error while granting write access to producer [" + + producerId + "] for topic " + topicName, excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(), + "Error while granting write access to producer [" + + producerId + "] for topic " + topicName + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * Removing access for a publisher id for any particular topic + * + * @param topicName + * @param producerId + * @throws CambriaApiException + */ + @DELETE + @Path("/{topicName}/producers/{producerId}") + public void denyPublisherForTopic(@PathParam("topicName") String topicName, + @PathParam("producerId") String producerId) throws CambriaApiException { + try { + LOGGER.info("Revoking write access to producer [" + producerId + + "] for topic " + topicName); + + topicService.denyPublisherForTopic(getDmaapContext(), topicName, + producerId); + + LOGGER.info("Write access revoked for producer [" + producerId + + "] for topic " + topicName); + } catch (DMaaPAccessDeniedException | AccessDeniedException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), + excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + }catch ( ConfigDbException | IOException + | TopicExistsException excp) { + LOGGER.error("Error while revoking write access for producer [" + + producerId + "] for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(), + "Error while revoking write access to producer [" + + producerId + "] for topic " + topicName + excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * Get the consumer details by the topic name + * + * @param topicName + * @throws AccessDeniedException + * @throws CambriaApiException + */ + @GET + @Path("/{topicName}/consumers") + //@Produces(MediaType.TEXT_PLAIN) + public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException, + CambriaApiException { + try { + + +// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view"; +// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); +// String permission = aaf.aafPermissionString(topicName, "view"); +// if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) +// { + LOGGER.info("Fetching list of all consumers for topic " + topicName); + + topicService.getConsumersByTopicName(getDmaapContext(), topicName); + + LOGGER.info("Returning list of all consumers for topic " + + topicName); + +// }else{ +// LOGGER.error( +// "Error while fetching list of all consumers for topic " +// + topicName); +// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, +// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), +// errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2()); +// LOGGER.info(errRes); +// throw new DMaaPAccessDeniedException(errRes); +// +// +// } + + + + } catch (IOException | ConfigDbException | TopicExistsException excp) { + LOGGER.error( + "Error while fetching list of all consumers for topic " + + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(), + "Error while fetching list of all consumers for topic: " + + topicName+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * providing access for consumer for any particular topic + * + * @param topicName + * @param consumerId + * @throws CambriaApiException + */ + @PUT + @Path("/{topicName}/consumers/{consumerId}") + public void permitConsumerForTopic( + @PathParam("topicName") String topicName, + @PathParam("consumerId") String consumerId) throws CambriaApiException { + try { + LOGGER.info("Granting read access to consumer [" + consumerId + + "] for topic " + topicName); + + topicService.permitConsumerForTopic(getDmaapContext(), topicName, + consumerId); + + LOGGER.info("Read access granted to consumer [" + consumerId + + "] for topic " + topicName); + } catch (AccessDeniedException | ConfigDbException | IOException + | TopicExistsException excp) { + LOGGER.error("Error while granting read access to consumer [" + + consumerId + "] for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(), + "Error while granting read access to consumer [" + + consumerId + "] for topic " + topicName+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * Removing access for consumer for any particular topic + * + * @param topicName + * @param consumerId + * @throws CambriaApiException + */ + @DELETE + @Path("/{topicName}/consumers/{consumerId}") + public void denyConsumerForTopic(@PathParam("topicName") String topicName, + @PathParam("consumerId") String consumerId) throws CambriaApiException { + try { + LOGGER.info("Revoking read access to consumer [" + consumerId + + "] for topic " + topicName); + + topicService.denyConsumerForTopic(getDmaapContext(), topicName, + consumerId); + + LOGGER.info("Read access revoked to consumer [" + consumerId + + "] for topic " + topicName); + } catch ( ConfigDbException | IOException + | TopicExistsException excp) { + LOGGER.error("Error while revoking read access to consumer [" + + consumerId + "] for topic " + topicName, excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(), + "Error while revoking read access to consumer [" + + consumerId + "] for topic " + topicName+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + }catch (DMaaPAccessDeniedException | AccessDeniedException excp) { + LOGGER.error("Error while creating a topic: " + excp.getMessage(), + excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), + errorMessages.getCreateTopicFail()+ excp.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + + + +} diff --git a/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java b/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java new file mode 100644 index 0000000..a44c2ad --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/service/TransactionRestService.java @@ -0,0 +1,176 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.service; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Context; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.aft.dme2.internal.jettison.json.JSONException; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.service.TransactionService; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.configs.ConfigDbException; + +/** + * This class is a CXF REST service + * which acts as gateway for DMaaP + * Transaction Ids. + * @author author + * + */ +@Component +@Path("/") +public class TransactionRestService { + + /** + * Logger obj + */ + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TransactionRestService.class); + + /** + * HttpServletRequest obj + */ + @Context + private HttpServletRequest request; + + /** + * HttpServletResponse obj + */ + @Context + private HttpServletResponse response; + + /** + * Config Reader + */ + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + @Autowired + private TransactionService transactionService; + + /** + * + * Returns a list of all the existing Transaction Ids + * @throws CambriaApiException + * + * @throws IOException + * @exception ConfigDbException + * @exception IOException + * + * + */ + @GET + public void getAllTransactionObjs() throws CambriaApiException { + try { + LOGGER.info("Retrieving list of all transactions."); + + transactionService.getAllTransactionObjs(getDmaapContext()); + + LOGGER.info("Returning list of all transactions."); + } catch (ConfigDbException | IOException e) { + LOGGER.error("Error while retrieving list of all transactions: " + + e.getMessage(), e); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED, + DMaaPResponseCode.RETRIEVE_TRANSACTIONS.getResponseCode(), + "Error while retrieving list of all transactions:"+e.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * + * Returns details of a particular transaction id whose <code>name</code> is + * passed as a parameter + * + * @param transactionId + * - id of transaction + * @throws CambriaApiException + * @throws IOException + * @exception ConfigDbException + * @exception IOException + * @exception JSONException + * + * + */ + @GET + @Path("/{transactionId}") + public void getTransactionObj( + @PathParam("transactionId") String transactionId) throws CambriaApiException { + + LOGGER.info("Fetching details of Transaction ID : " + transactionId); + + try { + transactionService.getTransactionObj(getDmaapContext(), + transactionId); + } catch (ConfigDbException | JSONException | IOException e) { + LOGGER.error("Error while retrieving transaction details for id: " + + transactionId, e); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_EXPECTATION_FAILED, + DMaaPResponseCode.RETRIEVE_TRANSACTIONS_DETAILS.getResponseCode(), + "Error while retrieving transaction details for id: [" + + transactionId + "]: " + e.getMessage()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + + LOGGER.info("Returning details of transaction " + transactionId); + + } + + /** + * This method is used for taking Configuration Object,HttpServletRequest + * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession + * Object. + * + * @return DMaaPContext object from where user can get Configuration + * Object,HttpServlet Object + * + */ + private DMaaPContext getDmaapContext() { + DMaaPContext dmaapContext = new DMaaPContext(); + dmaapContext.setConfigReader(configReader); + dmaapContext.setRequest(request); + dmaapContext.setResponse(response); + return dmaapContext; + } + +}
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaap/service/UIRestServices.java b/src/main/java/com/att/nsa/dmaap/service/UIRestServices.java new file mode 100644 index 0000000..79a39fb --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/service/UIRestServices.java @@ -0,0 +1,198 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.service; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Context; + +import kafka.common.TopicExistsException; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.service.UIService; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.configs.ConfigDbException; + +/** + * UI Rest Service + * @author author + * + */ +@Component +public class UIRestServices { + + /** + * Logger obj + */ + //private static final Logger LOGGER = Logger.getLogger(UIRestServices.class); + + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIRestServices.class); + + @Autowired + private UIService uiService; + + /** + * Config Reader + */ + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + /** + * HttpServletRequest obj + */ + @Context + private HttpServletRequest request; + + /** + * HttpServletResponse obj + */ + @Context + private HttpServletResponse response; + + /** + * getting the hello + */ + @GET + @Path("/") + public void hello() { + try { + LOGGER.info("Calling hello page."); + + uiService.hello(getDmaapContext()); + + LOGGER.info("Hello page is returned."); + } catch (IOException excp) { + LOGGER.error("Error while calling hello page: " + excp.getMessage(), excp); + DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND, + "Error while calling hello page: " + excp.getMessage()); + } + } + + /** + * getApikeysTable + */ + @GET + @Path("/ui/apikeys") + public void getApiKeysTable() { + try { + LOGGER.info("Fetching list of all api keys."); + + uiService.getApiKeysTable(getDmaapContext()); + + LOGGER.info("Returning list of all api keys."); + } catch (ConfigDbException | IOException excp) { + LOGGER.error("Error while fetching list of all api keys: " + excp.getMessage(), excp); + DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND, + "Error while fetching list of all api keys: " + excp.getMessage()); + } + } + + /** + * getApiKey + * + * @param apiKey + * @exception Exception + */ + @GET + @Path("/ui/apikeys/{apiKey}") + public void getApiKey(@PathParam("apiKey") String apiKey) { + try { + LOGGER.info("Fetching details of api key: " + apiKey); + + uiService.getApiKey(getDmaapContext(), apiKey); + + LOGGER.info("Returning details of api key: " + apiKey); + } catch (Exception excp) { + LOGGER.error("Error while fetching details of api key: " + apiKey, excp); + DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND, + "Error while fetching details of api key: " + apiKey); + } + } + + @GET + @Path("/ui/topics") + public void getTopicsTable() { + try { + LOGGER.info("Fetching list of all topics."); + + uiService.getTopicsTable(getDmaapContext()); + + LOGGER.info("Returning list of all topics."); + } catch (ConfigDbException | IOException excp) { + LOGGER.error("Error while fetching list of all topics: " + excp, excp); + DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND, + "Error while fetching list of all topics: " + excp.getMessage()); + } + } + + /** + * + * @param topic + */ + @GET + @Path("/ui/topics/{topic}") + public void getTopic(@PathParam("topic") String topic) { + try { + LOGGER.info("Fetching details of topic: " + topic); + + uiService.getTopic(getDmaapContext(), topic); + + LOGGER.info("Returning details of topic: " + topic); + } catch (ConfigDbException | IOException | TopicExistsException excp) { + LOGGER.error("Error while fetching details of topic: " + topic, excp); + DMaaPResponseBuilder.respondWithError(getDmaapContext(), HttpStatus.SC_NOT_FOUND, + "Error while fetching details of topic: " + topic); + } + } + + /** + * This method is used for taking Configuration Object,HttpServletRequest + * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession + * Object. + * + * @return DMaaPContext object from where user can get Configuration + * Object,HttpServlet Object + * + */ + private DMaaPContext getDmaapContext() { + DMaaPContext dmaapContext = new DMaaPContext(); + dmaapContext.setConfigReader(configReader); + dmaapContext.setRequest(request); + dmaapContext.setResponse(response); + return dmaapContext; + } +} diff --git a/src/main/java/com/att/nsa/dmaap/tools/ConfigTool.java b/src/main/java/com/att/nsa/dmaap/tools/ConfigTool.java new file mode 100644 index 0000000..4424840 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/tools/ConfigTool.java @@ -0,0 +1,818 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.tools; + +import java.io.IOException; +import java.io.PrintStream; +import java.security.NoSuchAlgorithmException; +import java.util.Date; +import java.util.LinkedList; +import java.util.Map.Entry; + +import org.json.JSONException; + +import com.att.nsa.apiServer.CommonServlet; +import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker; +import com.att.nsa.cambria.metabroker.Topic; +import com.att.nsa.cmdtool.Command; +import com.att.nsa.cmdtool.CommandLineTool; +import com.att.nsa.cmdtool.CommandNotReadyException; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.configs.ConfigPath; +import com.att.nsa.configs.confimpl.EncryptingLayer; +import com.att.nsa.configs.confimpl.ZkConfigDb; +import com.att.nsa.drumlin.till.data.rrConvertor; +import com.att.nsa.drumlin.till.data.uniqueStringGenerator; +import com.att.nsa.drumlin.till.nv.impl.nvWriteableTable; +import com.att.nsa.security.db.BaseNsaApiDbImpl; +import com.att.nsa.security.db.EncryptingApiDbImpl; +import com.att.nsa.security.db.NsaApiDb.KeyExistsException; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; +import com.att.nsa.security.db.simple.NsaSimpleApiKeyFactory; +import com.att.nsa.util.NsaClock; + +public class ConfigTool extends CommandLineTool<ConfigToolContext> +{ + protected ConfigTool () + { + super ( "Cambria API Config Tool", "cambriaConfig> " ); + + super.registerCommand ( new ListTopicCommand () ); + super.registerCommand ( new WriteTopicCommand () ); + super.registerCommand ( new ReadTopicCommand () ); + super.registerCommand ( new SetTopicOwnerCommand () ); + super.registerCommand ( new InitSecureTopicCommand () ); + super.registerCommand ( new ListApiKeysCommand () ); + super.registerCommand ( new PutApiCommand () ); + super.registerCommand ( new writeApiKeyCommand () ); + super.registerCommand ( new EncryptApiKeysCommand () ); + super.registerCommand ( new DecryptApiKeysCommand () ); + super.registerCommand ( new NodeFetchCommand () ); + super.registerCommand ( new DropOldConsumerGroupsCommand () ); + } + + public static void main ( String[] args ) throws IOException + { + final String connStr = args.length>0 ? args[0] : "localhost:2181"; + final ConfigDb db = new ZkConfigDb ( + connStr, + args.length>1 ? args[1] : CommonServlet.getDefaultZkRoot ( "cambria" ) + ); + + final ConfigToolContext context = new ConfigToolContext ( db, connStr, new nvWriteableTable() ); + final ConfigTool ct = new ConfigTool (); + ct.runFromMain ( args, context ); + } + + private static class ListTopicCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "topics", "list (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final ConfigDb db = context.getDb(); + final ConfigPath base = db.parse ( "/topics" ); + + if ( parts.length > 0 ) + { + final ConfigPath myTopic = base.getChild ( parts[0] ); + final String data = db.load ( myTopic ); + if ( data != null ) + { + out.println ( data ); + } + else + { + out.println ( "No topic [" + parts[0] + "]" ); + } + } + else + { + for ( ConfigPath child : db.loadChildrenNames ( base ) ) + { + out.println ( child.getName () ); + } + } + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "topics" ); + out.println ( "list <topic>" ); + } + } + + private static class WriteTopicCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "write (\\S*) (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final ConfigDb db = context.getDb(); + final ConfigPath base = db.parse ( "/topics" ); + final ConfigPath myTopic = base.getChild ( parts[0] ); + db.store ( myTopic, parts[1] ); + out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" ); + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "write <topic> <string>" ); + out.println ( "\tBe careful with this. You can write data that's not compatible with Cambria's config db." ); + } + } + + private static class ReadTopicCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "read (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final ConfigDb db = context.getDb(); + final ConfigPath base = db.parse ( "/topics" ); + final ConfigPath myTopic = base.getChild ( parts[0] ); + db.store ( myTopic, parts[1] ); + out.println ( "wrote [" + parts[1] + "] to topic [" + parts[0] + "]" ); + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "read <topic>" ); + out.println ( "\tRead config data for a topic." ); + } + } + + private static class InitSecureTopicCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "initTopic (\\S*) (\\S*) (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (), + context.getDb ().parse("/topics"), parts[0], parts[2], parts[1],true ); + out.println ( "Topic [" + parts[0] + "] updated." ); + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage () ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "initTopic <topic> <ownerApiKey> <description>" ); + } + } + + private static class SetTopicOwnerCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "setOwner (\\S*) (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final Topic kt = DMaaPKafkaMetaBroker.getKafkaTopicConfig ( context.getDb(), + context.getDb().parse ( "/topics" ), parts[0] ); + if ( kt != null ) + { + final String desc = kt.getDescription (); + + DMaaPKafkaMetaBroker.createTopicEntry ( context.getDb (), + context.getDb ().parse("/topics"), parts[0], desc, parts[1], true ); + out.println ( "Topic [" + parts[0] + "] updated." ); + } + else + { + out.println ( "Topic [" + parts[0] + "] doesn't exist." ); + } + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage () ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "setOwner <topic> <ownerApiKey>" ); + } + } + + private static class ListApiKeysCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "listApiKeys", "listApiKey (\\S*) (\\S*) (\\S*)", "listApiKey (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final ConfigDb db = context.getDb (); + if ( parts.length == 0 ) + { + final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () ); + int count = 0; + for ( String key : readFrom.loadAllKeys () ) + { + out.println ( key ); + count++; + } + out.println ( "" + count + " records." ); + } + else + { + BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () ); + if ( parts.length == 3 ) + { + readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (), + EncryptingLayer.readSecretKey ( parts[1] ), rrConvertor.base64Decode ( parts[2] ) ); + } + final NsaSimpleApiKey apikey = readFrom.loadApiKey ( parts[0] ); + if ( apikey == null ) + { + out.println ( "Key '" + parts[0] + "' not found." ); + } + else + { + out.println ( apikey.asJsonObject ().toString () ); + } + } + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + catch ( JSONException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "listApiKeys" ); + out.println ( "listApiKey <key>" ); + out.println ( "listApiKey <key> <dbKey> <dbIv>" ); + } + } + + private static class PutApiCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] + { + // these are <key> <enckey> <encinit> <value> + "putApiKey (secret) (\\S*) (\\S*) (\\S*) (\\S*)", + "putApiKey (email) (\\S*) (\\S*) (\\S*) (\\S*)", + "putApiKey (description) (\\S*) (\\S*) (\\S*) (\\S*)" + }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final ConfigDb db = context.getDb (); + if ( parts.length == 5 ) + { + final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb = + new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (), + EncryptingLayer.readSecretKey ( parts[2] ), rrConvertor.base64Decode ( parts[3] ) ); + + final NsaSimpleApiKey apikey = apiKeyDb.loadApiKey ( parts[1] ); + if ( apikey == null ) + { + out.println ( "Key '" + parts[1] + "' not found." ); + } + else + { + if ( parts[0].equalsIgnoreCase ( "secret" ) ) + { + apikey.resetSecret ( parts[4] ); + } + else if ( parts[0].equalsIgnoreCase ( "email" ) ) + { + apikey.setContactEmail ( parts[4] ); + } + else if ( parts[0].equalsIgnoreCase ( "description" ) ) + { + apikey.setDescription ( parts[4] ); + } + + apiKeyDb.saveApiKey ( apikey ); + out.println ( apikey.asJsonObject ().toString () ); + } + } + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + catch ( JSONException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "putApiKey secret <apiKey> <dbKey> <dbIv> <newSecret>" ); + out.println ( "putApiKey email <apiKey> <dbKey> <dbIv> <newEmail>" ); + out.println ( "putApiKey description <apiKey> <dbKey> <dbIv> <newDescription>" ); + } + } + + private static class writeApiKeyCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] + { + // <enckey> <encinit> <key> <secret> + "writeApiKey (\\S*) (\\S*) (\\S*) (\\S*)", + }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final ConfigDb db = context.getDb (); + if ( parts.length == 4 ) + { + final BaseNsaApiDbImpl<NsaSimpleApiKey> apiKeyDb = + new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (), + EncryptingLayer.readSecretKey ( parts[0] ), rrConvertor.base64Decode ( parts[1] ) ); + + apiKeyDb.deleteApiKey ( parts[2] ); + final NsaSimpleApiKey apikey = apiKeyDb.createApiKey ( parts[2], parts[3] ); + out.println ( apikey.asJsonObject ().toString () ); + } + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + catch ( JSONException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + catch ( KeyExistsException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "writeApiKey <dbKey> <dbIv> <newApiKey> <newSecret>" ); + } + } + + private static class EncryptApiKeysCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "convertApiKeyDb", "convertApiKeyDb (\\S*) (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final String key = parts.length == 2 ? parts[0] : EncryptingLayer.createSecretKey (); + final String iv = parts.length == 2 ? parts[1] : rrConvertor.base64Encode ( uniqueStringGenerator.createValue ( 16 ) ); + + // This doesn't do well when the number of API keys is giant... + if ( parts.length == 0 ) + { + out.println ( "YOU MUST RECORD THESE VALUES AND USE THEM IN THE SERVER CONFIG" ); + out.println ( "Key: " + key ); + out.println ( " IV: " + iv ); + out.println ( "\n" ); + out.println ( "Call again with key and IV on command line." ); + out.println ( "\n" ); + return; // because otherwise the values get lost + } + + final ConfigDb db = context.getDb (); + final BaseNsaApiDbImpl<NsaSimpleApiKey> readFrom = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () ); + final EncryptingApiDbImpl<NsaSimpleApiKey> writeTo = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (), + EncryptingLayer.readSecretKey ( key ), rrConvertor.base64Decode ( iv ) ); + + int count = 0; + for ( Entry<String, NsaSimpleApiKey> e : readFrom.loadAllKeyRecords ().entrySet () ) + { + out.println ( "-------------------------------" ); + out.println ( "Converting " + e.getKey () ); + final String was = e.getValue ().asJsonObject ().toString (); + out.println ( was ); + + writeTo.saveApiKey ( e.getValue () ); + count++; + } + + out.println ( "Conversion complete, converted " + count + " records." ); + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + catch ( NoSuchAlgorithmException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "convertApiKeyDb" ); + out.println ( "\tconvert an API key DB to an encrypted DB and output the cipher details" ); + } + } + + private static class DecryptApiKeysCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "revertApiKeyDb (\\S*) (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final String keyStr = parts[0]; + final String iv = parts[1]; + final byte[] ivBytes = rrConvertor.base64Decode ( iv ); + + final ConfigDb db = context.getDb (); + final EncryptingApiDbImpl<NsaSimpleApiKey> readFrom = new EncryptingApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory (), + EncryptingLayer.readSecretKey ( keyStr ), ivBytes ); + final BaseNsaApiDbImpl<NsaSimpleApiKey> writeTo = new BaseNsaApiDbImpl<NsaSimpleApiKey> ( db, new NsaSimpleApiKeyFactory () ); + + int count = 0; + for ( String apiKey : readFrom.loadAllKeys () ) + { + out.println ( "Converting " + apiKey ); + final NsaSimpleApiKey record = readFrom.loadApiKey ( apiKey ); + if ( record == null ) + { + out.println ( "Couldn't load " + apiKey ); + } + else + { + writeTo.saveApiKey ( record ); + count++; + } + } + out.println ( "Conversion complete, converted " + count + " records." ); + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "revertApiKeyDb <keyBase64> <ivBase64>" ); + out.println ( "\trevert an API key DB to a deencrypted DB" ); + } + } + + private static class NodeFetchCommand implements Command<ConfigToolContext> + { + @Override + public String[] getMatches () + { + return new String[] { "node (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final String node = parts[0]; + + final ConfigDb db = context.getDb (); + final ConfigPath cp = db.parse ( node ); + + boolean doneOne = false; + for ( ConfigPath child : db.loadChildrenNames ( cp ) ) + { + out.println ( "\t- " + child.getName () ); + doneOne = true; + } + if ( doneOne ) + { + out.println (); + } + else + { + out.println ( "(No child nodes of '" + node + "')" ); + } + + final String val = db.load ( cp ); + if ( val == null ) + { + out.println ( "(No data at '" + node + "')" ); + } + else + { + out.println ( val ); + } + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + catch ( IllegalArgumentException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "node <nodeName>" ); + out.println ( "\tread a config db node" ); + } + } + + private static class DropOldConsumerGroupsCommand implements Command<ConfigToolContext> + { + private final long kMaxRemovals = 500; + + @Override + public String[] getMatches () + { + return new String[] { "(dropOldConsumers) (\\S*)", "(showOldConsumers) (\\S*)" }; + } + + @Override + public void checkReady ( ConfigToolContext context ) throws CommandNotReadyException + { + } + + @Override + public void execute ( String[] parts, ConfigToolContext context, PrintStream out ) throws CommandNotReadyException + { + try + { + final boolean runDrops = parts[0].equalsIgnoreCase ( "dropOldConsumers" ); + final String maxAgeInDaysStr = parts[1]; + final int maxAgeInDays = Integer.parseInt ( maxAgeInDaysStr ); + final long oldestEpochSecs = ( NsaClock.now () / 1000 ) - ( 24 * 60 * 60 * maxAgeInDays ); + + out.println ( "Dropping consumer groups older than " + new Date ( oldestEpochSecs * 1000 ) ); + + final ConfigDb db = context.getDb (); + + // kafka updates consumer partition records in ZK each time a message + // is served. we can determine which consumers are old based on a lack + // of update to the partition entries + // (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper) + + // kafka only works with ZK, and our configDb was constructed with a non-kafka + // root node. We have to switch it to get to the right content... + if ( ! ( db instanceof ZkConfigDb ) ) + { + throw new ConfigDbException ( "You can only show/drop old consumers against a ZK config db." ); + } + + final ZkConfigDb newZkDb = new ZkConfigDb ( context.getConnectionString (), "" ); + long cgCount = 0; + + final LinkedList<ConfigPath> removals = new LinkedList<ConfigPath> (); + for ( ConfigPath consumerGroupName : newZkDb.loadChildrenNames ( newZkDb.parse ( "/consumers" ) ) ) + { + cgCount++; + if ( cgCount % 500 == 0 ) + { + out.println ( "" + cgCount + " groups examined" ); + } + + boolean foundAnything = false; + boolean foundRecentUse = false; + long mostRecent = -1; + + // each consumer group has an "offsets" entry, which contains 0 or more topic entries. + // each topic contains partition nodes. + for ( ConfigPath topic : newZkDb.loadChildrenNames ( consumerGroupName.getChild ( "offsets" ) ) ) + { + for ( ConfigPath offset : newZkDb.loadChildrenNames ( topic ) ) + { + foundAnything = true; + + final long modTime = newZkDb.getLastModificationTime ( offset ); + mostRecent = Math.max ( mostRecent, modTime ); + + foundRecentUse = ( modTime > oldestEpochSecs ); + if ( foundRecentUse ) break; + } + if ( foundRecentUse ) break; + } + + // decide if this consumer group is old + out.println ( "Group " + consumerGroupName.getName () + " was most recently used " + new Date ( mostRecent*1000 ) ); + if ( foundAnything && !foundRecentUse ) + { + removals.add ( consumerGroupName ); + } + + if ( removals.size () >= kMaxRemovals ) + { + break; + } + } + + // removals + for ( ConfigPath consumerGroupName : removals ) + { + out.println ( "Group " + consumerGroupName.getName () + " has no recent activity." ); + if ( runDrops ) + { + out.println ( "Removing group " + consumerGroupName.getName () + "..." ); + newZkDb.clear ( consumerGroupName ); + } + } + } + catch ( ConfigDbException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + catch ( NumberFormatException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + catch ( JSONException e ) + { + out.println ( "Command failed: " + e.getMessage() ); + } + } + + @Override + public void displayHelp ( PrintStream out ) + { + out.println ( "showOldConsumers <minAgeInDays>" ); + out.println ( "dropOldConsumers <minAgeInDays>" ); + out.println ( "\tDrop (or just show) any consumer group that has been inactive longer than <minAgeInDays> days." ); + out.println (); + out.println ( "\tTo be safe, <minAgeInDays> should be much higher than the maximum storage time on the Kafka topics." ); + out.println ( "\tA very old consumer will potentially miss messages, but will resume at the oldest message, while a" ); + out.println ( "\tdeleted consumer will start at the current message if it ever comes back." ); + out.println (); + out.println ( "\tNote that show/drops are limited to " + kMaxRemovals + " records per invocation." ); + } + } +} diff --git a/src/main/java/com/att/nsa/dmaap/tools/ConfigToolContext.java b/src/main/java/com/att/nsa/dmaap/tools/ConfigToolContext.java new file mode 100644 index 0000000..bb44d1f --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/tools/ConfigToolContext.java @@ -0,0 +1,69 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.tools; + +import com.att.nsa.cambria.beans.DMaaPMetricsSet; +import com.att.nsa.cmdtool.CommandContext; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.drumlin.till.nv.rrNvReadable; + +public class ConfigToolContext implements CommandContext +{ + public ConfigToolContext ( ConfigDb db, String connStr, rrNvReadable cs ) + { + fDb = db; + fConnStr = connStr; + fMetrics = new DMaaPMetricsSet( cs ); + } + + @Override + public void requestShutdown () + { + fQuit = true; + } + + @Override + public boolean shouldContinue () + { + return !fQuit; + } + + public ConfigDb getDb () + { + return fDb; + } + + public String getConnectionString () + { + return fConnStr; + } + + public DMaaPMetricsSet getMetrics () + { + return fMetrics; + } + + private final ConfigDb fDb; + private final String fConnStr; + private boolean fQuit = false; + private DMaaPMetricsSet fMetrics; +} diff --git a/src/main/java/com/att/nsa/dmaap/util/ContentLengthInterceptor.java b/src/main/java/com/att/nsa/dmaap/util/ContentLengthInterceptor.java new file mode 100644 index 0000000..fe1c768 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/util/ContentLengthInterceptor.java @@ -0,0 +1,132 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.util; + +import java.util.Map; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONException; +import org.json.JSONObject; +import org.springframework.stereotype.Component; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import ajsc.beans.interceptors.AjscInterceptor; + +/** + * AJSC Intercepter implementation of ContentLengthFilter + */ +@Component +public class ContentLengthInterceptor implements AjscInterceptor{ + + + private String defLength; + //private Logger log = Logger.getLogger(ContentLengthInterceptor.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ContentLengthInterceptor.class); + + + /** + * Intercepter method to intercept requests before processing + */ + @Override + public boolean allowOrReject(HttpServletRequest httpservletrequest, HttpServletResponse httpservletresponse, + Map map) throws Exception { + + log.info("inside Interceptor allowOrReject content length checking before pub/sub"); + + JSONObject jsonObj = null; + int requestLength = 0; + setDefLength(System.getProperty("maxcontentlength")); + try { + // retrieving content length from message header + + if (null != httpservletrequest.getHeader("Content-Length")) { + requestLength = Integer.parseInt(httpservletrequest.getHeader("Content-Length")); + } + // retrieving encoding from message header + String transferEncoding = httpservletrequest.getHeader("Transfer-Encoding"); + // checking for no encoding, chunked and requestLength greater then + // default length + if (null != transferEncoding && !(transferEncoding.contains("chunked")) + && (requestLength > Integer.parseInt(getDefLength()))) { + jsonObj = new JSONObject().append("defaultlength", getDefLength()) + .append("requestlength", requestLength); + log.error("message length is greater than default"); + throw new CambriaApiException(jsonObj); + } + else if (null == transferEncoding && (requestLength > Integer.parseInt(getDefLength()))) + { + jsonObj = new JSONObject().append("defaultlength", getDefLength()).append( + "requestlength", requestLength); + log.error("Request message is not chunked or request length is greater than default length"); + throw new CambriaApiException(jsonObj); + + + } + else + { + //chain.doFilter(req, res); + return true; + } + + } catch (CambriaApiException | NumberFormatException | JSONException e) { + + log.info("Exception obj--"+e); + log.error("message size is greater then default"+e.getMessage()); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_REQUEST_TOO_LONG, + DMaaPResponseCode.MSG_SIZE_EXCEEDS_MSG_LIMIT.getResponseCode(), System.getProperty("msg_size_exceeds") + + jsonObj.toString()); + log.info(errRes.toString()); + + + map.put(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,"test"); + httpservletresponse.setStatus(HttpStatus.SC_REQUEST_TOO_LONG); + httpservletresponse.getOutputStream().write(errRes.toString().getBytes()); + return false; + } + + + + } + + + /** + * Get Default Content Length + * @return defLength + */ + public String getDefLength() { + return defLength; + } + /** + * Set Default Content Length + * @param defLength + */ + public void setDefLength(String defLength) { + this.defLength = defLength; + } + + + +} diff --git a/src/main/java/com/att/nsa/dmaap/util/DMaaPAuthFilter.java b/src/main/java/com/att/nsa/dmaap/util/DMaaPAuthFilter.java new file mode 100644 index 0000000..ae79938 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/util/DMaaPAuthFilter.java @@ -0,0 +1,164 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.util; + +import java.io.IOException; + +import javax.servlet.FilterChain; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; + +//import com.att.eelf.configuration.EELFLogger; +//import com.att.eelf.configuration.EELFManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.att.cadi.filter.CadiFilter; +import javax.servlet.FilterConfig; + +/** + * This is a Servlet Filter class overriding the AjscCadiFilter + */ +@Component +public class DMaaPAuthFilter extends CadiFilter { + + // private Logger log = Logger.getLogger(DMaaPAuthFilter.class.toString()); + + // private static final EELFLogger log = + // EELFManager.getInstance().getLogger(DMaaPAuthFilter.class); + private Logger log = LoggerFactory.getLogger(DMaaPAuthFilter.class); + + final Boolean enabled = "authentication-scheme-1".equalsIgnoreCase(System.getProperty("CadiAuthN")); + + /** + * This method will disable Cadi Authentication if cambria headers are + * present in the request else continue with Cadi Authentication + */ + public void init(FilterConfig filterConfig) throws ServletException { + + try { + + super.init(filterConfig); + + } catch (Exception ex) { + log.error("Ajsc Cadi Filter Exception:" + ex.getMessage()); + + } + } + + @Override + public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) + throws IOException, ServletException { + + log.info("inside servlet filter Cambria Auth Headers checking before doing other Authentication"); + HttpServletRequest request = (HttpServletRequest) req; + + boolean forceAAF = Boolean.valueOf(System.getProperty("forceAAF")); + if (forceAAF || + null != request.getHeader("Authorization") || + (null != request.getHeader("AppName") && + request.getHeader("AppName").equalsIgnoreCase("invenio") && + null != request.getHeader("cookie"))) { + + if (!enabled || + request.getMethod().equalsIgnoreCase("head") || + request.getHeader("DME2HealthCheck") != null) { + + chain.doFilter(req, res); + + } else { + + super.doFilter(req, res, chain); + + } + } else { + + System.setProperty("CadiAuthN", "authentication-scheme-2"); + chain.doFilter(req, res); + + } + + } + + @Override + public void log(Exception e, Object... elements) { + // TODO Auto-generated method stub + // super.log(e, elements); + // System.out.println(convertArrayToString(elements)); + log.error(convertArrayToString(elements), e); + + } + + @Override + public void log(Level level, Object... elements) { + + // System.out.println(willWrite().compareTo(level) ); + if (willWrite().compareTo(level) <= 0) { + switch (level) { + case DEBUG: + log.debug(convertArrayToString(elements)); + break; + case INFO: + log.info(convertArrayToString(elements)); + break; + case ERROR: + log.error(convertArrayToString(elements)); + break; + case AUDIT: + log.info(convertArrayToString(elements)); + break; + case INIT: + log.info(convertArrayToString(elements)); + break; + case WARN: + log.warn(convertArrayToString(elements)); + break; + default: + + log.warn(convertArrayToString(elements)); + + } + + } + + } + + private String convertArrayToString(Object[] elements) { + + StringBuilder strBuilder = new StringBuilder(); + for (int i = 0; i < elements.length; i++) { + if (elements[i] instanceof String) + strBuilder.append((String) elements[i]); + else if (elements[i] instanceof Integer) + strBuilder.append((Integer) elements[i]); + else + strBuilder.append(elements[i]); + } + String newString = strBuilder.toString(); + return newString; + } + +} diff --git a/src/main/java/com/att/nsa/dmaap/util/ServicePropertiesMapBean.java b/src/main/java/com/att/nsa/dmaap/util/ServicePropertiesMapBean.java new file mode 100644 index 0000000..c5173c1 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaap/util/ServicePropertiesMapBean.java @@ -0,0 +1,41 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.dmaap.util; + +import com.att.nsa.dmaap.filemonitor.ServicePropertiesMap; + +/** + * Class ServicePropertiesMapBean + * @author author + * + */ +public class ServicePropertiesMapBean { + /** + * get property + * @param propFileName propFileName + * @param propertyKey propertyKey + * @return str + */ + public static String getProperty(String propFileName, String propertyKey) { + return ServicePropertiesMap.getProperty(propFileName, propertyKey); + } +} |