From 3fc19dc9157f4d05bdbd6fd05a52f0592268c4e7 Mon Sep 17 00:00:00 2001 From: Varun Gudisena Date: Thu, 31 Aug 2017 10:52:33 -0500 Subject: Revert package name changes Reverted package name changes to avoid any potential issues. Renamed maven group id only. Issue-id: DMAAP-74 Change-Id: Ic741b602ade60f108d940c0571a1d94b7be2abc2 Signed-off-by: Varun Gudisena --- .../com/att/nsa/cambria/service/AdminService.java | 83 +++ .../att/nsa/cambria/service/ApiKeysService.java | 105 +++ .../com/att/nsa/cambria/service/EventsService.java | 75 ++ .../com/att/nsa/cambria/service/MMService.java | 68 ++ .../att/nsa/cambria/service/MetricsService.java | 54 ++ .../com/att/nsa/cambria/service/TopicService.java | 176 +++++ .../nsa/cambria/service/TransactionService.java | 61 ++ .../com/att/nsa/cambria/service/UIService.java | 91 +++ .../nsa/cambria/service/impl/AdminServiceImpl.java | 188 +++++ .../cambria/service/impl/ApiKeysServiceImpl.java | 326 +++++++++ .../service/impl/BaseTransactionDbImpl.java | 153 ++++ .../cambria/service/impl/EventsServiceImpl.java | 788 +++++++++++++++++++++ .../nsa/cambria/service/impl/MMServiceImpl.java | 605 ++++++++++++++++ .../cambria/service/impl/MetricsServiceImpl.java | 115 +++ .../nsa/cambria/service/impl/TopicServiceImpl.java | 649 +++++++++++++++++ .../service/impl/TransactionServiceImpl.java | 100 +++ .../nsa/cambria/service/impl/UIServiceImpl.java | 206 ++++++ 17 files changed, 3843 insertions(+) create mode 100644 src/main/java/com/att/nsa/cambria/service/AdminService.java create mode 100644 src/main/java/com/att/nsa/cambria/service/ApiKeysService.java create mode 100644 src/main/java/com/att/nsa/cambria/service/EventsService.java create mode 100644 src/main/java/com/att/nsa/cambria/service/MMService.java create mode 100644 src/main/java/com/att/nsa/cambria/service/MetricsService.java create mode 100644 src/main/java/com/att/nsa/cambria/service/TopicService.java create mode 100644 src/main/java/com/att/nsa/cambria/service/TransactionService.java create mode 100644 src/main/java/com/att/nsa/cambria/service/UIService.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/AdminServiceImpl.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/ApiKeysServiceImpl.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/BaseTransactionDbImpl.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/EventsServiceImpl.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/MMServiceImpl.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/MetricsServiceImpl.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/TopicServiceImpl.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/TransactionServiceImpl.java create mode 100644 src/main/java/com/att/nsa/cambria/service/impl/UIServiceImpl.java (limited to 'src/main/java/com/att/nsa/cambria/service') diff --git a/src/main/java/com/att/nsa/cambria/service/AdminService.java b/src/main/java/com/att/nsa/cambria/service/AdminService.java new file mode 100644 index 0000000..6f0d9cf --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/AdminService.java @@ -0,0 +1,83 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service; + +import java.io.IOException; + +import org.json.JSONException; + +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; + +/** + * @author author + * + */ +public interface AdminService { + /** + * method provide consumerCache + * + * @param dMaaPContext + * @throws IOException + */ + void showConsumerCache(DMaaPContext dMaaPContext) throws IOException,AccessDeniedException; + + /** + * method drops consumer cache + * + * @param dMaaPContext + * @throws JSONException + * @throws IOException + */ + void dropConsumerCache(DMaaPContext dMaaPContext) throws JSONException, IOException,AccessDeniedException; + + + /** + * Get list of blacklisted ips + * @param dMaaPContext context + * @throws IOException ex + * @throws AccessDeniedException ex + */ + void getBlacklist ( DMaaPContext dMaaPContext ) throws IOException, AccessDeniedException; + + /** + * Add ip to blacklist + * @param dMaaPContext context + * @param ip ip + * @throws IOException ex + * @throws ConfigDbException ex + * @throws AccessDeniedException ex + */ + void addToBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException; + + /** + * Remove ip from blacklist + * @param dMaaPContext context + * @param ip ip + * @throws IOException ex + * @throws ConfigDbException ex + * @throws AccessDeniedException ex + */ + void removeFromBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException; + +} diff --git a/src/main/java/com/att/nsa/cambria/service/ApiKeysService.java b/src/main/java/com/att/nsa/cambria/service/ApiKeysService.java new file mode 100644 index 0000000..6fc9c0d --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/ApiKeysService.java @@ -0,0 +1,105 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service; + +import java.io.IOException; + +import com.att.nsa.cambria.beans.ApiKeyBean; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.db.NsaApiDb.KeyExistsException; + +/** + * Declaring all the method in interface that is mainly used for authentication + * purpose. + * + * + */ + +public interface ApiKeysService { + /** + * This method declaration for getting all ApiKey that has generated on + * server. + * + * @param dmaapContext + * @throws ConfigDbException + * @throws IOException + */ + + public void getAllApiKeys(DMaaPContext dmaapContext) + throws ConfigDbException, IOException; + + /** + * Getting information about specific ApiKey + * + * @param dmaapContext + * @param apikey + * @throws ConfigDbException + * @throws IOException + */ + + public void getApiKey(DMaaPContext dmaapContext, String apikey) + throws ConfigDbException, IOException; + + /** + * Thid method is used for create a particular ApiKey + * + * @param dmaapContext + * @param nsaApiKey + * @throws KeyExistsException + * @throws ConfigDbException + * @throws IOException + */ + + public void createApiKey(DMaaPContext dmaapContext, ApiKeyBean nsaApiKey) + throws KeyExistsException, ConfigDbException, IOException; + + /** + * This method is used for update ApiKey that is already generated on + * server. + * + * @param dmaapContext + * @param apikey + * @param nsaApiKey + * @throws ConfigDbException + * @throws IOException + * @throws AccessDeniedException + * @throws com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException + */ + public void updateApiKey(DMaaPContext dmaapContext, String apikey, + ApiKeyBean nsaApiKey) throws ConfigDbException, IOException,AccessDeniedException + ; + + /** + * This method is used for delete specific ApiKey + * + * @param dmaapContext + * @param apikey + * @throws ConfigDbException + * @throws IOException + * @throws AccessDeniedException + */ + + public void deleteApiKey(DMaaPContext dmaapContext, String apikey) + throws ConfigDbException, IOException,AccessDeniedException; +} diff --git a/src/main/java/com/att/nsa/cambria/service/EventsService.java b/src/main/java/com/att/nsa/cambria/service/EventsService.java new file mode 100644 index 0000000..477538d --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/EventsService.java @@ -0,0 +1,75 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service; + +import java.io.IOException; +import java.io.InputStream; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; + +/** + * + * @author author + * + */ +public interface EventsService { + /** + * + * @param ctx + * @param topic + * @param consumerGroup + * @param clientId + * @throws ConfigDbException + * @throws TopicExistsException + * @throws AccessDeniedException + * @throws UnavailableException + * @throws CambriaApiException + * @throws IOException + */ + public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId) + throws ConfigDbException, TopicExistsException,UnavailableException, + CambriaApiException, IOException,AccessDeniedException; + + /** + * + * @param ctx + * @param topic + * @param msg + * @param defaultPartition + * @param requestTime + * @throws ConfigDbException + * @throws AccessDeniedException + * @throws TopicExistsException + * @throws CambriaApiException + * @throws IOException + */ + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException, + CambriaApiException, IOException,missingReqdSetting; + +} diff --git a/src/main/java/com/att/nsa/cambria/service/MMService.java b/src/main/java/com/att/nsa/cambria/service/MMService.java new file mode 100644 index 0000000..5c14674 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/MMService.java @@ -0,0 +1,68 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service; + +import java.io.IOException; +import java.io.InputStream; + +import org.json.JSONException; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; + +/** + * Contains the logic for executing calls to the Mirror Maker agent tool. + * + * @author + * + * @since May 25, 2016 + */ + +public interface MMService { + + /* + * this method calls the add white list method of a Mirror Maker agent API + */ + public void addWhiteList(); + + /* + * this method calls the remove white list method of a Mirror Maker agent API + */ + public void removeWhiteList(); + + /* + * This method calls the list white list method of a Mirror Maker agent API + */ + public void listWhiteList(); + + public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId) throws ConfigDbException, TopicExistsException, + AccessDeniedException, UnavailableException, CambriaApiException, IOException; + + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException, + CambriaApiException, IOException, missingReqdSetting; +} diff --git a/src/main/java/com/att/nsa/cambria/service/MetricsService.java b/src/main/java/com/att/nsa/cambria/service/MetricsService.java new file mode 100644 index 0000000..6b11682 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/MetricsService.java @@ -0,0 +1,54 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service; + +/** + * @author + * + */ +import java.io.IOException; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.beans.DMaaPContext; + +/** + * + * @author author + * + */ +public interface MetricsService { + /** + * + * @param ctx + * @throws IOException + */ + public void get(DMaaPContext ctx) throws IOException; + + /** + * + * @param ctx + * @param name + * @throws IOException + * @throws CambriaApiException + */ + public void getMetricByName(DMaaPContext ctx, String name) throws IOException, CambriaApiException; +} diff --git a/src/main/java/com/att/nsa/cambria/service/TopicService.java b/src/main/java/com/att/nsa/cambria/service/TopicService.java new file mode 100644 index 0000000..9ed39af --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/TopicService.java @@ -0,0 +1,176 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service; + +import java.io.IOException; + +import org.json.JSONException; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.beans.TopicBean; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.configs.ConfigDbException; + +/** + * interface provide all the topic related operations + * + * @author author + * + */ +public interface TopicService { + /** + * method fetch details of all the topics + * + * @param dmaapContext + * @throws JSONException + * @throws ConfigDbException + * @throws IOException + */ + void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException; + void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException; + + /** + * method fetch details of specific topic + * + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + void getTopic(DMaaPContext dmaapContext, String topicName) + throws ConfigDbException, IOException, TopicExistsException; + + /** + * method used to create the topic + * + * @param dmaapContext + * @param topicBean + * @throws CambriaApiException + * @throws TopicExistsException + * @throws IOException + * @throws AccessDeniedException + * @throws JSONException + */ + + void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) + throws CambriaApiException, TopicExistsException, IOException, AccessDeniedException; + + /** + * method used to delete to topic + * + * @param dmaapContext + * @param topicName + * @throws IOException + * @throws AccessDeniedException + * @throws ConfigDbException + * @throws CambriaApiException + * @throws TopicExistsException + */ + + void deleteTopic(DMaaPContext dmaapContext, String topicName) + throws IOException, AccessDeniedException, ConfigDbException, CambriaApiException, TopicExistsException; + + /** + * method provides list of all the publishers associated with a topic + * + * @param dmaapContext + * @param topicName + * @throws IOException + * @throws ConfigDbException + * @throws TopicExistsException + */ + void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName) + throws IOException, ConfigDbException, TopicExistsException; + + /** + * method provides details of all the consumer associated with a specific + * topic + * + * @param dmaapContext + * @param topicName + * @throws IOException + * @throws ConfigDbException + * @throws TopicExistsException + */ + void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName) + throws IOException, ConfigDbException, TopicExistsException; + + /** + * method provides publishing right to a specific topic + * + * @param dmaapContext + * @param topicName + * @param producerId + * @throws AccessDeniedException + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException; + + /** + * method denies any specific publisher from a topic + * + * @param dmaapContext + * @param topicName + * @param producerId + * @throws AccessDeniedException + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException; + + /** + * method provide consuming right to a specific user on a topic + * + * @param dmaapContext + * @param topicName + * @param consumerId + * @throws AccessDeniedException + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException; + + /** + * method denies a particular user's consuming right on a topic + * + * @param dmaapContext + * @param topicName + * @param consumerId + * @throws AccessDeniedException + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException; + +} diff --git a/src/main/java/com/att/nsa/cambria/service/TransactionService.java b/src/main/java/com/att/nsa/cambria/service/TransactionService.java new file mode 100644 index 0000000..109b2c8 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/TransactionService.java @@ -0,0 +1,61 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service; + +import java.io.IOException; + +import com.att.aft.dme2.internal.jettison.json.JSONException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.transaction.TransactionObj; +import com.att.nsa.configs.ConfigDbException; + +/** + * + * @author author + * + */ +public interface TransactionService { + /** + * + * @param trnObj + */ + void checkTransaction(TransactionObj trnObj); + + /** + * + * @param dmaapContext + * @throws ConfigDbException + * @throws IOException + */ + void getAllTransactionObjs(DMaaPContext dmaapContext) throws ConfigDbException, IOException; + + /** + * + * @param dmaapContext + * @param transactionId + * @throws ConfigDbException + * @throws JSONException + * @throws IOException + */ + void getTransactionObj(DMaaPContext dmaapContext, String transactionId) + throws ConfigDbException, JSONException, IOException; +} diff --git a/src/main/java/com/att/nsa/cambria/service/UIService.java b/src/main/java/com/att/nsa/cambria/service/UIService.java new file mode 100644 index 0000000..b6555fe --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/UIService.java @@ -0,0 +1,91 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +/** + * + */ +package com.att.nsa.cambria.service; + +import java.io.IOException; + +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.configs.ConfigDbException; + +import kafka.common.TopicExistsException; + +/** + * @author author + * + */ +public interface UIService { + /** + * Returning template of hello page. + * + * @param dmaapContext + * @throws IOException + */ + void hello(DMaaPContext dmaapContext) throws IOException; + + /** + * Fetching list of all api keys and returning in a templated form for + * display + * + * @param dmaapContext + * @throws ConfigDbException + * @throws IOException + */ + void getApiKeysTable(DMaaPContext dmaapContext) throws ConfigDbException, + IOException; + + /** + * Fetching detials of apikey in a templated form for display + * + * @param dmaapContext + * @param apiKey + * @throws Exception + */ + void getApiKey(DMaaPContext dmaapContext, final String apiKey) + throws Exception; + + /** + * Fetching list of all the topics and returning in a templated form for + * display + * + * @param dmaapContext + * @throws ConfigDbException + * @throws IOException + */ + void getTopicsTable(DMaaPContext dmaapContext) throws ConfigDbException, + IOException; + + /** + * Fetching detials of topic in a templated form for display + * + * @param dmaapContext + * @param topic + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + void getTopic(DMaaPContext dmaapContext, final String topic) + throws ConfigDbException, IOException, TopicExistsException; + +} diff --git a/src/main/java/com/att/nsa/cambria/service/impl/AdminServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/AdminServiceImpl.java new file mode 100644 index 0000000..2585ab5 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/AdminServiceImpl.java @@ -0,0 +1,188 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.springframework.stereotype.Component; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.att.nsa.cambria.backends.Consumer; +import com.att.nsa.cambria.backends.ConsumerFactory; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl; +import com.att.nsa.cambria.service.AdminService; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.limits.Blacklist; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; + +/** + * @author author + * + */ +@Component +public class AdminServiceImpl implements AdminService { + + //private Logger log = Logger.getLogger(AdminServiceImpl.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(AdminServiceImpl.class); + /** + * getConsumerCache returns consumer cache + * @param dMaaPContext context + * @throws IOException ex + * @throws AccessDeniedException + */ + @Override + public void showConsumerCache(DMaaPContext dMaaPContext) throws IOException, AccessDeniedException { + adminAuthenticate(dMaaPContext); + + JSONObject consumers = new JSONObject(); + JSONArray jsonConsumersList = new JSONArray(); + + for (Consumer consumer : getConsumerFactory(dMaaPContext).getConsumers()) { + JSONObject consumerObject = new JSONObject(); + consumerObject.put("name", consumer.getName()); + consumerObject.put("created", consumer.getCreateTimeMs()); + consumerObject.put("accessed", consumer.getLastAccessMs()); + jsonConsumersList.put(consumerObject); + } + + consumers.put("consumers", jsonConsumersList); + log.info("========== AdminServiceImpl: getConsumerCache: " + jsonConsumersList.toString() + "==========="); + DMaaPResponseBuilder.respondOk(dMaaPContext, consumers); + } + + /** + * + * dropConsumerCache() method clears consumer cache + * @param dMaaPContext context + * @throws JSONException ex + * @throws IOException ex + * @throws AccessDeniedException + * + */ + @Override + public void dropConsumerCache(DMaaPContext dMaaPContext) throws JSONException, IOException, AccessDeniedException { + adminAuthenticate(dMaaPContext); + getConsumerFactory(dMaaPContext).dropCache(); + DMaaPResponseBuilder.respondOkWithHtml(dMaaPContext, "Consumer cache cleared successfully"); + // log.info("========== AdminServiceImpl: dropConsumerCache: Consumer + // Cache successfully dropped.==========="); + } + + /** + * getfConsumerFactory returns CosnumerFactory details + * @param dMaaPContext contxt + * @return ConsumerFactory obj + * + */ + private ConsumerFactory getConsumerFactory(DMaaPContext dMaaPContext) { + return dMaaPContext.getConfigReader().getfConsumerFactory(); + } + + /** + * return ipblacklist + * @param dMaaPContext context + * @return blacklist obj + */ + private static Blacklist getIpBlacklist(DMaaPContext dMaaPContext) { + return dMaaPContext.getConfigReader().getfIpBlackList(); + } + + + /** + * Get list of blacklisted ips + */ + @Override + public void getBlacklist ( DMaaPContext dMaaPContext ) throws IOException, AccessDeniedException + { + adminAuthenticate ( dMaaPContext ); + + DMaaPResponseBuilder.respondOk ( dMaaPContext, + new JSONObject().put ( "blacklist", setToJsonArray ( getIpBlacklist (dMaaPContext).asSet() ) ) ); + } + + /** + * Add ip to blacklist + */ + @Override + public void addToBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException + { + adminAuthenticate ( dMaaPContext ); + + getIpBlacklist (dMaaPContext).add ( ip ); + DMaaPResponseBuilder.respondOkNoContent ( dMaaPContext ); + } + + /** + * Remove ip from blacklist + */ + @Override + public void removeFromBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException + { + adminAuthenticate ( dMaaPContext ); + + getIpBlacklist (dMaaPContext).remove ( ip ); + DMaaPResponseBuilder.respondOkNoContent ( dMaaPContext ); + } + + /** + * Authenticate if user is admin + * @param dMaaPContext context + * @throws AccessDeniedException ex + */ + private static void adminAuthenticate ( DMaaPContext dMaaPContext ) throws AccessDeniedException + { + + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dMaaPContext); + if ( user == null || !user.getKey ().equals ( "admin" ) ) + { + throw new AccessDeniedException (); + } + } + + public static JSONArray setToJsonArray ( Set fields ) + { + return collectionToJsonArray ( fields ); + } + + public static JSONArray collectionToJsonArray ( Collection fields ) + { + final JSONArray a = new JSONArray (); + if ( fields != null ) + { + for ( Object o : fields ) + { + a.put ( o ); + } + } + return a; + } + +} diff --git a/src/main/java/com/att/nsa/cambria/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/ApiKeysServiceImpl.java new file mode 100644 index 0000000..637d2fb --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/ApiKeysServiceImpl.java @@ -0,0 +1,326 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service.impl; + +import java.io.IOException; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONArray; +import org.json.JSONObject; +import org.springframework.stereotype.Service; + +//import com.att.nsa.apiServer.util.Emailer; +import com.att.nsa.cambria.utils.Emailer; +import com.att.nsa.cambria.beans.ApiKeyBean; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl; +import com.att.nsa.cambria.service.ApiKeysService; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.standards.HttpStatusCodes; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.security.db.NsaApiDb; +import com.att.nsa.security.db.NsaApiDb.KeyExistsException; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; + +/** + * Implementation of the ApiKeysService, this will provide the below operations, + * getAllApiKeys, getApiKey, createApiKey, updateApiKey, deleteApiKey + * + * @author author + */ +@Service +public class ApiKeysServiceImpl implements ApiKeysService { + + //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString()); + private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString()); + /** + * This method will provide all the ApiKeys present in kafka server. + * + * @param dmaapContext + * @throws ConfigDbException + * @throws IOException + */ + public void getAllApiKeys(DMaaPContext dmaapContext) + throws ConfigDbException, IOException { + + ConfigurationReader configReader = dmaapContext.getConfigReader(); + + log.info("configReader : " + configReader.toString()); + + final JSONObject result = new JSONObject(); + final JSONArray keys = new JSONArray(); + result.put("apiKeys", keys); + + NsaApiDb apiDb = configReader.getfApiKeyDb(); + + for (String key : apiDb.loadAllKeys()) { + keys.put(key); + } + log.info("========== ApiKeysServiceImpl: getAllApiKeys: Api Keys are : " + + keys.toString() + "==========="); + DMaaPResponseBuilder.respondOk(dmaapContext, result); + } + + /** + * @param dmaapContext + * @param apikey + * @throws ConfigDbException + * @throws IOException + */ + @Override + public void getApiKey(DMaaPContext dmaapContext, String apikey) + throws ConfigDbException, IOException { + + String errorMsg = "Api key name is not mentioned."; + int errorCode = HttpStatusCodes.k400_badRequest; + + if (null != apikey) { + NsaSimpleApiKey simpleApiKey = getApiKeyDb(dmaapContext) + .loadApiKey(apikey); + + + if (null != simpleApiKey) { + JSONObject result = simpleApiKey.asJsonObject(); + DMaaPResponseBuilder.respondOk(dmaapContext, result); + log.info("========== ApiKeysServiceImpl: getApiKey : " + + result.toString() + "==========="); + return; + } else { + errorMsg = "Api key [" + apikey + "] does not exist."; + errorCode = HttpStatusCodes.k404_notFound; + log.info("========== ApiKeysServiceImpl: getApiKey: Error : API Key does not exist. " + + "==========="); + DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode, + errorMsg); + throw new IOException(); + } + } + + } + + /** + * @param dmaapContext + * @param nsaApiKey + * @throws KeyExistsException + * @throws ConfigDbException + * @throws IOException + */ + @Override + public void createApiKey(DMaaPContext dmaapContext, ApiKeyBean nsaApiKey) + throws KeyExistsException, ConfigDbException, IOException { + + log.debug("TopicService: : createApiKey...."); + + + String contactEmail = nsaApiKey.getEmail(); + final boolean emailProvided = contactEmail != null && contactEmail.length() > 0 && contactEmail.indexOf("@") > 1 ; + String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous"); + if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false"; + + // if ((contactEmail == null) || (contactEmail.length() == 0)) + if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided ) + { + DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address."); + return; + } + + + + + final NsaApiDb apiKeyDb = getApiKeyDb(dmaapContext); + String apiKey = nsaApiKey.getKey(); + String sharedSecret = nsaApiKey.getSharedSecret(); + final NsaSimpleApiKey key = apiKeyDb.createApiKey(apiKey, + sharedSecret); + + if (null != key) { + + if (null != nsaApiKey.getEmail()) { + key.setContactEmail(nsaApiKey.getEmail()); + } + + if (null != nsaApiKey.getDescription()) { + key.setDescription(nsaApiKey.getDescription()); + } + + log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : " + + key.toString() + "====="); + apiKeyDb.saveApiKey(key); + // email out the secret to validate the email address + if ( emailProvided ) + { + String body = "\n" + "Your email address was provided as the creator of new API key \"" + + apiKey + "\".\n" + "\n" + "If you did not make this request, please let us know." + + " See http://sa2020.it.att.com:8888 for contact information, " + "but don't worry -" + + " the API key is useless without the information below, which has been provided " + + "only to you.\n" + "\n\n" + "For API key \"" + apiKey + "\", use API key secret:\n\n\t" + + sharedSecret + "\n\n" + "Note that it's normal to share the API key" + + " (" + apiKey + "). " + + "This is how you are granted access to resources " + "like a UEB topic or Flatiron scope. " + + "However, you should NOT share the API key's secret. " + "The API key is associated with your" + + " email alone. ALL access to data made with this " + "key will be your responsibility. If you " + + "share the secret, someone else can use the API key " + "to access proprietary data with your " + + "identity.\n" + "\n" + "Enjoy!\n" + "\n" + "The GFP/SA-2020 Team"; + + Emailer em = dmaapContext.getConfigReader().getSystemEmailer(); + em.send(contactEmail, "New API Key", body); + } + log.debug("TopicService: : sending response."); + + JSONObject o = key.asJsonObject(); + + o.put ( NsaSimpleApiKey.kApiSecretField, + emailProvided ? + "Emailed to " + contactEmail + "." : + key.getSecret () + ); + DMaaPResponseBuilder.respondOk(dmaapContext, + o); + /*o.put("secret", "Emailed to " + contactEmail + "."); + DMaaPResponseBuilder.respondOk(dmaapContext, + o); */ + return; + } else { + log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.====="); + DMaaPResponseBuilder.respondWithError(dmaapContext, + HttpStatusCodes.k500_internalServerError, + "Failed to create api key."); + throw new KeyExistsException(apiKey); + } + } + + /** + * @param dmaapContext + * @param apikey + * @param nsaApiKey + * @throws ConfigDbException + * @throws IOException + * @throws AccessDeniedException + */ + @Override + public void updateApiKey(DMaaPContext dmaapContext, String apikey, + ApiKeyBean nsaApiKey) throws ConfigDbException, IOException, AccessDeniedException { + + String errorMsg = "Api key name is not mentioned."; + int errorCode = HttpStatusCodes.k400_badRequest; + + if (null != apikey) { + final NsaApiDb apiKeyDb = getApiKeyDb(dmaapContext); + final NsaSimpleApiKey key = apiKeyDb.loadApiKey(apikey); + boolean shouldUpdate = false; + + if (null != key) { + final NsaApiKey user = DMaaPAuthenticatorImpl + .getAuthenticatedUser(dmaapContext); + + if (user == null || !user.getKey().equals(key.getKey())) { + throw new AccessDeniedException("You must authenticate with the key you'd like to update."); + } + + if (null != nsaApiKey.getEmail()) { + key.setContactEmail(nsaApiKey.getEmail()); + shouldUpdate = true; + } + + if (null != nsaApiKey.getDescription()) { + key.setDescription(nsaApiKey.getDescription()); + shouldUpdate = true; + } + + if (shouldUpdate) { + apiKeyDb.saveApiKey(key); + } + + log.info("======ApiKeysServiceImpl : updateApiKey : Key Updated Successfully :" + + key.toString() + "========="); + DMaaPResponseBuilder.respondOk(dmaapContext, + key.asJsonObject()); + return; + } + } else { + errorMsg = "Api key [" + apikey + "] does not exist."; + errorCode = HttpStatusCodes.k404_notFound; + DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode, + errorMsg); + log.info("======ApiKeysServiceImpl : updateApiKey : Error in Updating Key.============"); + throw new IOException(); + } + } + + /** + * @param dmaapContext + * @param apikey + * @throws ConfigDbException + * @throws IOException + * @throws AccessDeniedException + */ + @Override + public void deleteApiKey(DMaaPContext dmaapContext, String apikey) + throws ConfigDbException, IOException, AccessDeniedException { + + String errorMsg = "Api key name is not mentioned."; + int errorCode = HttpStatusCodes.k400_badRequest; + + if (null != apikey) { + final NsaApiDb apiKeyDb = getApiKeyDb(dmaapContext); + final NsaSimpleApiKey key = apiKeyDb.loadApiKey(apikey); + + if (null != key) { + + final NsaApiKey user = DMaaPAuthenticatorImpl + .getAuthenticatedUser(dmaapContext); + if (user == null || !user.getKey().equals(key.getKey())) { + throw new AccessDeniedException("You don't own the API key."); + } + + apiKeyDb.deleteApiKey(key); + log.info("======ApiKeysServiceImpl : deleteApiKey : Deleted Key successfully.============"); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, + "Api key [" + apikey + "] deleted successfully."); + return; + } + } else { + errorMsg = "Api key [" + apikey + "] does not exist."; + errorCode = HttpStatusCodes.k404_notFound; + DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode, + errorMsg); + log.info("======ApiKeysServiceImpl : deleteApiKey : Error while deleting key.============"); + throw new IOException(); + } + } + + /** + * + * @param dmaapContext + * @return + */ + private NsaApiDb getApiKeyDb(DMaaPContext dmaapContext) { + ConfigurationReader configReader = dmaapContext.getConfigReader(); + return configReader.getfApiKeyDb(); + } + +} diff --git a/src/main/java/com/att/nsa/cambria/service/impl/BaseTransactionDbImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/BaseTransactionDbImpl.java new file mode 100644 index 0000000..cdbf57b --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/BaseTransactionDbImpl.java @@ -0,0 +1,153 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service.impl; + +import java.util.Set; +import java.util.TreeSet; + +import com.att.nsa.cambria.transaction.DMaaPTransactionFactory; +import com.att.nsa.cambria.transaction.DMaaPTransactionObj; +import com.att.nsa.cambria.transaction.DMaaPTransactionObjDB; +import com.att.nsa.cambria.transaction.TransactionObj; +import com.att.nsa.configs.ConfigDb; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.configs.ConfigPath; + +/** + * Persistent storage for Transaction objects built over an abstract config db. + * + * @author author + * + * @param + */ +public class BaseTransactionDbImpl implements DMaaPTransactionObjDB { + + private final ConfigDb fDb; + private final ConfigPath fBasePath; + private final DMaaPTransactionFactory fKeyFactory; + + private static final String kStdRootPath = "/transaction"; + + private ConfigPath makePath(String transactionId) { + return fBasePath.getChild(transactionId); + } + + /** + * Construct an Transaction db over the given config db at the standard + * location + * + * @param db + * @param keyFactory + * @throws ConfigDbException + */ + public BaseTransactionDbImpl(ConfigDb db, DMaaPTransactionFactory keyFactory) throws ConfigDbException { + this(db, kStdRootPath, keyFactory); + } + + /** + * Construct an Transaction db over the given config db using the given root + * location + * + * @param db + * @param rootPath + * @param keyFactory + * @throws ConfigDbException + */ + public BaseTransactionDbImpl(ConfigDb db, String rootPath, DMaaPTransactionFactory keyFactory) + throws ConfigDbException { + fDb = db; + fBasePath = db.parse(rootPath); + fKeyFactory = keyFactory; + + if (!db.exists(fBasePath)) { + db.store(fBasePath, ""); + } + } + + /** + * Create a new Transaction Obj. If one exists, + * + * @param id + * @return the new Transaction record + * @throws ConfigDbException + */ + public synchronized K createTransactionObj(String id) throws KeyExistsException, ConfigDbException { + final ConfigPath path = makePath(id); + if (fDb.exists(path)) { + throw new KeyExistsException(id); + } + + // make one, store it, return it + final K newKey = fKeyFactory.makeNewTransactionId(id); + fDb.store(path, newKey.serialize()); + return newKey; + } + + /** + * Save an Transaction record. This must be used after changing auxiliary + * data on the record. Note that the transaction object must exist (via + * createTransactionObj). + * + * @param transaction + * object + * @throws ConfigDbException + */ + @Override + public synchronized void saveTransactionObj(K trnObj) throws ConfigDbException { + final ConfigPath path = makePath(trnObj.getId()); + if (!fDb.exists(path) || !(trnObj instanceof TransactionObj)) { + throw new IllegalStateException(trnObj.getId() + " is not known to this database"); + } + fDb.store(path, ((TransactionObj) trnObj).serialize()); + } + + /** + * Load an Transaction record based on the Transaction Id value + * + * @param transactionId + * @return an Transaction Object record or null + * @throws ConfigDbException + */ + @Override + public synchronized K loadTransactionObj(String transactionId) throws ConfigDbException { + final String data = fDb.load(makePath(transactionId)); + if (data != null) { + return fKeyFactory.makeNewTransactionObj(data); + } + return null; + } + + /** + * Load all transactions known to this database. (This could be expensive.) + * + * @return a set of all Transaction objects + * @throws ConfigDbException + */ + public synchronized Set loadAllTransactionObjs() throws ConfigDbException { + final TreeSet result = new TreeSet(); + for (ConfigPath cp : fDb.loadChildrenNames(fBasePath)) { + result.add(cp.getName()); + } + return result; + } + +} diff --git a/src/main/java/com/att/nsa/cambria/service/impl/EventsServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/EventsServiceImpl.java new file mode 100644 index 0000000..3386f19 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/EventsServiceImpl.java @@ -0,0 +1,788 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedList; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.MediaType; + +import org.apache.http.HttpStatus; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.Consumer; +import com.att.nsa.cambria.backends.ConsumerFactory; +import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException; +import com.att.nsa.cambria.backends.MetricsSet; +import com.att.nsa.cambria.backends.Publisher; +import com.att.nsa.cambria.backends.Publisher.message; +import com.att.nsa.cambria.beans.DMaaPCambriaLimiter; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.beans.LogDetails; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.exception.DMaaPAccessDeniedException; +import com.att.nsa.cambria.exception.DMaaPErrorMessages; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.cambria.metabroker.Topic; +import com.att.nsa.cambria.resources.CambriaEventSet; +import com.att.nsa.cambria.resources.CambriaOutboundEventStream; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticator; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl; +import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl; +import com.att.nsa.cambria.service.EventsService; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.cambria.utils.Utils; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.util.rrConvertor; + +import kafka.producer.KeyedMessage; + +/** + * This class provides the functinality to publish and subscribe message to + * kafka + * + * @author author + * + */ +@Service +public class EventsServiceImpl implements EventsService { + //private static final Logger LOG = Logger.getLogger(EventsServiceImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class); + + private static final String BATCH_LENGTH = "event.batch.length"; + private static final String TRANSFER_ENCODING = "Transfer-Encoding"; + @Autowired + private DMaaPErrorMessages errorMessages; + + //@Value("${metrics.send.cambria.topic}") + //private String metricsTopic; + + /** + * @param ctx + * @param topic + * @param consumerGroup + * @param clientId + * @throws ConfigDbException, + * TopicExistsException, AccessDeniedException, + * UnavailableException, CambriaApiException, IOException + * + * + */ + @Override + public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId) + throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException, + CambriaApiException, IOException,DMaaPAccessDeniedException { + final long startTime = System.currentTimeMillis(); + final HttpServletRequest req = ctx.getRequest(); + + boolean isAAFTopic=false; + // was this host blacklisted? + final String remoteAddr = Utils.getRemoteAddress(ctx);; + if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) ) + { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr + + "] is blacklisted. Please contact the cluster management team." + ,null,Utils.getFormattedDate(new Date()),topic, + Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(), + null,null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + + int limit = CambriaConstants.kNoLimit; + if (req.getParameter("limit") != null) { + limit = Integer.parseInt(req.getParameter("limit")); + } + + int timeoutMs= CambriaConstants.kNoTimeout; + String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"timeout"); + if(strtimeoutMS!=null)timeoutMs=Integer.parseInt(strtimeoutMS); + //int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", CambriaConstants.kNoTimeout); + if (req.getParameter("timeout") != null) { + timeoutMs = Integer.parseInt(req.getParameter("timeout")); + } + + // By default no filter is applied if filter is not passed as a + // parameter in the request URI + String topicFilter = CambriaConstants.kNoFilter; + if (null != req.getParameter("filter")) { + topicFilter = req.getParameter("filter"); + } + // pretty to print the messaages in new line + String prettyval="0"; + String strPretty=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"pretty"); + if (null!=strPretty)prettyval=strPretty; + + String metaval="0"; + String strmeta=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"meta"); + if (null!=strmeta)metaval=strmeta; + + final boolean pretty = rrConvertor + .convertToBooleanBroad(prettyval); + // withMeta to print offset along with message + final boolean withMeta = rrConvertor + .convertToBooleanBroad(metaval); + + + /*final boolean pretty = rrConvertor + .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("pretty", "0")); + // withMeta to print offset along with message + final boolean withMeta = rrConvertor + .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("meta", "0")); +*/ + final LogWrap logger = new LogWrap ( topic, consumerGroup, clientId); + logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter); + + // is this user allowed to read this topic? + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + + if (metatopic == null) { + // no such topic. + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), + errorMessages.getTopicNotExist()+"-[" + topic + "]",null,Utils.getFormattedDate(new Date()),topic,null,null, + clientId,ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); + if (null==metricTopicname) + metricTopicname="msgrtr.apinode.metrics.dmaap"; + + if(null==ctx.getRequest().getHeader("Authorization")&& !topic.equalsIgnoreCase(metricTopicname)) + { + if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ + // check permissions + metatopic.checkUserRead(user); + } + } + // if headers are not provided then user will be null + if(user == null && null!=ctx.getRequest().getHeader("Authorization")) + { + // the topic name will be sent by the client +// String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + String permission = aaf.aafPermissionString(topic, "sub"); + if(!aaf.aafAuthentication(ctx.getRequest(), permission)) + { + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,null,null, + clientId,ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + } + isAAFTopic = true; + } + Consumer c = null; + try { + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter(); + rl.onCall(topic, consumerGroup, clientId); + + c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs); + + /* final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c, + ctx.getConfigReader().getSettings()).timeout(timeoutMs).limit(limit).filter(topicFilter) + .pretty(pretty).withMeta(withMeta) + // .atOffset(topicOffset) + .build();*/ + final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs).limit(limit).filter(topicFilter) + .pretty(pretty).withMeta(withMeta).build(); + coes.setDmaapContext(ctx); + coes.setTopic(metatopic); + if( isTransEnabled() || isAAFTopic ){ + coes.setTransEnabled(true); + }else{ + coes.setTransEnabled(false); + } + coes.setTopicStyle(isAAFTopic); + + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + + DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes); + + // No IOException thrown during respondOkWithStream, so commit the + // new offsets to all the brokers + c.commitOffsets(); + final int sent = coes.getSentCount(); + + metricsSet.consumeTick(sent); + rl.onSend(topic, consumerGroup, clientId, sent); + + final long elapsedMs = System.currentTimeMillis() - startTime; + logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset()); + + } catch (UnavailableException excp) { + logger.warn(excp.getMessage(), excp); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + errorMessages.getServerUnav()+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null, + clientId,ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (CambriaApiException excp) { + logger.warn(excp.getMessage(), excp); + throw excp; + } catch (Exception excp) { + logger.warn("Couldn't respond to client, closing cambria consumer", excp); + ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + "Couldn't respond to client, closing cambria consumer"+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null, + clientId,ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } finally { + // If no cache, close the consumer now that we're done with it. + boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled; + String strkSetting_EnableCache=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,ConsumerFactory.kSetting_EnableCache); + if(null!=strkSetting_EnableCache) kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache); + //if (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache, ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) { + if (!kSetting_EnableCache && (c != null)) { + c.close(); + + } + } + } + + /** + * @throws missingReqdSetting + * + */ + @Override + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException, + CambriaApiException, IOException, missingReqdSetting,DMaaPAccessDeniedException { + + // is this user allowed to write to this topic? + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + boolean isAAFTopic=false; + + // was this host blacklisted? + final String remoteAddr = Utils.getRemoteAddress(ctx); + + if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) ) + { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr + + "] is blacklisted. Please contact the cluster management team." + ,null,Utils.getFormattedDate(new Date()),topic, + Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(), + null,null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + String topicNameStd = null; + + // topicNameStd= ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF"); + topicNameStd= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF"); + String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic"); + if (null==metricTopicname) + metricTopicname="msgrtr.apinode.metrics.dmaap"; + boolean topicNameEnforced=false; + if (null != topicNameStd && topic.startsWith(topicNameStd) ) + { + topicNameEnforced = true; + } + + //Here check if the user has rights to publish on the topic + //( This will be called when no auth is added or when UEB API Key Authentication is used) + //checkUserWrite(user) method will throw an error when there is no Auth header added or when the + //user has no publish rights + + if(null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) && null==ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) + { + metatopic.checkUserWrite(user); + } + + + + // if headers are not provided then user will be null + if(topicNameEnforced || (user == null && null!=ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname))) + { + // the topic name will be sent by the client + // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + String permission = aaf.aafPermissionString(topic, "pub"); + if(!aaf.aafAuthentication(ctx.getRequest(), permission)) + { + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getNotPermitted1()+" publish "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic, + Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(), + null,null); + LOG.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + } + isAAFTopic=true; + } + + final HttpServletRequest req = ctx.getRequest(); + + // check for chunked input + boolean chunked = false; + if (null != req.getHeader(TRANSFER_ENCODING)) { + chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked"); + } + // get the media type, or set it to a generic value if it wasn't + // provided + String mediaType = req.getContentType(); + if (mediaType == null || mediaType.length() == 0) { + mediaType = MimeTypes.kAppGenericBinary; + } + + if (mediaType.contains("charset=UTF-8")) { + mediaType = mediaType.replace("; charset=UTF-8", "").trim(); + } + + String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd"); + boolean istransidreqd=false; + if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")){ + istransidreqd = true; + } + + if (isAAFTopic || istransidreqd ) { + pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType); + } + else + { + pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType); + } + + + } + + /** + * + * @param ctx + * @param topic + * @param msg + * @param defaultPartition + * @param chunked + * @param mediaType + * @throws ConfigDbException + * @throws AccessDeniedException + * @throws TopicExistsException + * @throws CambriaApiException + * @throws IOException + */ + private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, + boolean chunked, String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException, + CambriaApiException, IOException { + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + // setup the event set + final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition); + + // start processing, building a batch to push to the backend + final long startMs = System.currentTimeMillis(); + long count = 0; + + long maxEventBatch=1024 * 16; + String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH); + if(null!=batchlen)maxEventBatch=Long.parseLong(batchlen); + + // long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16); + final LinkedList batch = new LinkedList(); + final ArrayList> kms = new ArrayList>(); + + try { + // for each message... + Publisher.message m = null; + while ((m = events.next()) != null) { + // add the message to the batch + batch.add(m); + final KeyedMessage data = new KeyedMessage(topic, m.getKey(), + m.getMessage()); + kms.add(data); + // check if the batch is full + final int sizeNow = batch.size(); + if (sizeNow > maxEventBatch) { + ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms); + kms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + } + } + + // send the pending batch + final int sizeNow = batch.size(); + if (sizeNow > 0) { + ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms); + kms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + + LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic); + + // build a responseP + final JSONObject response = new JSONObject(); + response.put("count", count); + response.put("serverTimeMs", totalMs); + DMaaPResponseBuilder.respondOk(ctx, response); + + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg=null; + if(excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + + } + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic, + null,ctx.getRequest().getRemoteHost(), + null,null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + + + } + } + + /** + * + * @param ctx + * @param inputStream + * @param topic + * @param partitionKey + * @param requestTime + * @param chunked + * @param mediaType + * @throws ConfigDbException + * @throws AccessDeniedException + * @throws TopicExistsException + * @throws IOException + * @throws CambriaApiException + */ + private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic, + final String partitionKey, final String requestTime, final boolean chunked, final String mediaType) + throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, + CambriaApiException { + + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + // setup the event set + final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey); + + // start processing, building a batch to push to the backend + final long startMs = System.currentTimeMillis(); + long count = 0; + long maxEventBatch = 1024 * 16; + String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH); + if(null!=evenlen)maxEventBatch=Long.parseLong(evenlen); + //final long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16); + final LinkedList batch = new LinkedList(); + final ArrayList> kms = new ArrayList>(); + + Publisher.message m = null; + int messageSequence = 1; + Long batchId = 1L; + final boolean transactionEnabled = true; + int publishBatchCount=0; + SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS"); + + //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId)); + try { + // for each message... + batchId=DMaaPContext.getBatchID(); + + String responseTransactionId = null; + + while ((m = events.next()) != null) { + + //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId)); + + + addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId, + transactionEnabled); + messageSequence++; + + // add the message to the batch + batch.add(m); + + responseTransactionId = m.getLogDetails().getTransactionId(); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("message", m.getMessage()); + jsonObject.put("transactionId", responseTransactionId); + final KeyedMessage data = new KeyedMessage(topic, m.getKey(), + jsonObject.toString()); + kms.add(data); + + // check if the batch is full + final int sizeNow = batch.size(); + if (sizeNow >= maxEventBatch) { + String startTime = sdf.format(new Date()); + LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]"); + try { + ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms); + //transactionLogs(batch); + for (message msg : batch) { + LogDetails logDetails = msg.getLogDetails(); + LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); + } + } catch (Exception excp) { + + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg=null; + if(excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+ "."+errorMessages.getPublishMsgCount()+count+"."+errorMsg, + null,Utils.getFormattedDate(new Date()),topic, + Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(), + null,null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + kms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + publishBatchCount=sizeNow; + count += sizeNow; + //batchId++; + String endTime = sdf.format(new Date()); + LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId + + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]"); + batchId=DMaaPContext.getBatchID(); + } + } + + // send the pending batch + final int sizeNow = batch.size(); + if (sizeNow > 0) { + String startTime = sdf.format(new Date()); + LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]"); + try { + ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms); + //transactionLogs(batch); + for (message msg : batch) { + LogDetails logDetails = msg.getLogDetails(); + LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); + } + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg=null; + if(excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+ errorMessages.getPublishMsgCount()+count+"."+errorMsg, + null,Utils.getFormattedDate(new Date()),topic, + Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(), + null,null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + kms.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + //batchId++; + String endTime = sdf.format(new Date()); + publishBatchCount=sizeNow; + LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId + + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]"); + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + + LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic); + + if (null != responseTransactionId) { + ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId)); + } + + // build a response + final JSONObject response = new JSONObject(); + response.put("count", count); + response.put("serverTimeMs", totalMs); + DMaaPResponseBuilder.respondOk(ctx, response); + + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg=null; + if(excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + + ErrorResponse errRes = new ErrorResponse( + status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic, + Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(), + null,null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } + + /** + * + * @param msg + * @param topic + * @param request + * @param messageCreationTime + * @param messageSequence + * @param batchId + * @param transactionEnabled + */ + private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request, + final String messageCreationTime, final int messageSequence, final Long batchId, + final boolean transactionEnabled) { + LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId, + transactionEnabled); + logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage())); + msg.setTransactionEnabled(transactionEnabled); + msg.setLogDetails(logDetails); + } + + + + /** + * + * @author author + * + */ + private static class LogWrap { + private final String fId; + + /** + * constructor initialization + * + * @param topic + * @param cgroup + * @param cid + */ + public LogWrap(String topic, String cgroup, String cid) { + fId = "[" + topic + "/" + cgroup + "/" + cid + "] "; + } + + /** + * + * @param msg + */ + public void info(String msg) { + LOG.info(fId + msg); + } + + /** + * + * @param msg + * @param t + */ + public void warn(String msg, Exception t) { + LOG.warn(fId + msg, t); + } + + } + + private boolean isTransEnabled() { + String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd"); + boolean istransidreqd=false; + if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) ){ + istransidreqd = true; + } + + return istransidreqd; + + } + + private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request, + final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) { + LogDetails logDetails = new LogDetails(); + logDetails.setTopicId(topicName); + logDetails.setMessageTimestamp(messageTimestamp); + logDetails.setPublisherId(Utils.getUserApiKey(request)); + logDetails.setPublisherIp(request.getRemoteHost()); + logDetails.setMessageBatchId(batchId); + logDetails.setMessageSequence(String.valueOf(messageSequence)); + logDetails.setTransactionEnabled(transactionEnabled); + logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date())); + logDetails.setServerIp(request.getLocalAddr()); + return logDetails; + } + + /*public String getMetricsTopic() { + return metricsTopic; + } + + public void setMetricsTopic(String metricsTopic) { + this.metricsTopic = metricsTopic; + }*/ + +} \ No newline at end of file diff --git a/src/main/java/com/att/nsa/cambria/service/impl/MMServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/MMServiceImpl.java new file mode 100644 index 0000000..4f6a9a1 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/MMServiceImpl.java @@ -0,0 +1,605 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service.impl; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedList; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.core.Context; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import org.json.JSONObject; +import org.json.JSONTokener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Service; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.Consumer; +import com.att.nsa.cambria.backends.ConsumerFactory; +import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException; +import com.att.nsa.cambria.backends.Publisher.message; +import com.att.nsa.cambria.backends.MetricsSet; +import com.att.nsa.cambria.backends.Publisher; + +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.beans.LogDetails; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.exception.DMaaPAccessDeniedException; +import com.att.nsa.cambria.exception.DMaaPErrorMessages; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.cambria.metabroker.Topic; +import com.att.nsa.cambria.resources.CambriaEventSet; +import com.att.nsa.cambria.resources.CambriaOutboundEventStream; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticator; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl; +import com.att.nsa.cambria.service.MMService; +import com.att.nsa.cambria.utils.ConfigurationReader; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.cambria.utils.Utils; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.drumlin.service.standards.MimeTypes; +import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting; + +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; +import com.att.nsa.util.rrConvertor; + +import kafka.producer.KeyedMessage; + +@Service +public class MMServiceImpl implements MMService { + private static final String BATCH_LENGTH = "event.batch.length"; + private static final String TRANSFER_ENCODING = "Transfer-Encoding"; + //private static final Logger LOG = Logger.getLogger(MMServiceImpl.class); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MMServiceImpl.class); + @Autowired + private DMaaPErrorMessages errorMessages; + + @Autowired + @Qualifier("configurationReader") + private ConfigurationReader configReader; + + // HttpServletRequest object + @Context + private HttpServletRequest request; + + // HttpServletResponse object + @Context + private HttpServletResponse response; + + @Override + public void addWhiteList() { + + } + + @Override + public void removeWhiteList() { + + } + + @Override + public void listWhiteList() { + + } + + @Override + public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId) + throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException, + CambriaApiException, IOException { + + // final long startTime = System.currentTimeMillis(); + final HttpServletRequest req = ctx.getRequest(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + // was this host blacklisted? + final String remoteAddr = Utils.getRemoteAddress(ctx); + + if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + int limit = CambriaConstants.kNoLimit; + + if (req.getParameter("limit") != null) { + limit = Integer.parseInt(req.getParameter("limit")); + } + limit = 1; + // int timeoutMs = 60000; + int timeoutMs = CambriaConstants.kNoTimeout; + String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout"); + if (strtimeoutMS != null) + timeoutMs = Integer.parseInt(strtimeoutMS); + // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", + // CambriaConstants.kNoTimeout); + if (req.getParameter("timeout") != null) { + timeoutMs = Integer.parseInt(req.getParameter("timeout")); + } + + // By default no filter is applied if filter is not passed as a + // parameter in the request URI + String topicFilter = CambriaConstants.kNoFilter; + if (null != req.getParameter("filter")) { + topicFilter = req.getParameter("filter"); + } + // pretty to print the messaages in new line + String prettyval = "0"; + String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty"); + if (null != strPretty) + prettyval = strPretty; + + String metaval = "0"; + String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta"); + if (null != strmeta) + metaval = strmeta; + + final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval); + // withMeta to print offset along with message + final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval); + + // is this user allowed to read this topic? + //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + + if (metatopic == null) { + // no such topic. + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, + DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(), + errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()), + topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + //String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic"); + /* + * if (null==metricTopicname) + * metricTopicname="msgrtr.apinode.metrics.dmaap"; //else if(user!=null) + * if(null==ctx.getRequest().getHeader("Authorization")&& + * !topic.equalsIgnoreCase(metricTopicname)) { if (null != + * metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ // check + * permissions metatopic.checkUserRead(user); } } + */ + + Consumer c = null; + try { + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs); + + final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs) + .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build(); + coes.setDmaapContext(ctx); + coes.setTopic(metatopic); + + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + + try { + coes.write(baos); + } catch (Exception ex) { + + } + + c.commitOffsets(); + final int sent = coes.getSentCount(); + + metricsSet.consumeTick(sent); + + } catch (UnavailableException excp) { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic, + null, null, clientId, ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } catch (CambriaApiException excp) { + + throw excp; + } catch (Exception excp) { + + ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE, + DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(), + "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null, + Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost()); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } finally { + + boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled; + String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + ConsumerFactory.kSetting_EnableCache); + if (null != strkSetting_EnableCache) + kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache); + + if (!kSetting_EnableCache && (c != null)) { + c.close(); + + } + } + return baos.toString(); + } + + @Override + public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition, + final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException, + CambriaApiException, IOException, missingReqdSetting { + + //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx); + //final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic); + + final String remoteAddr = Utils.getRemoteAddress(ctx); + + if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) { + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.", + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + + String topicNameStd = null; + + topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, + "enforced.topic.name.AAF"); + String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, + "metrics.send.cambria.topic"); + if (null == metricTopicname) + metricTopicname = "msgrtr.apinode.metrics.dmaap"; + boolean topicNameEnforced = false; + if (null != topicNameStd && topic.startsWith(topicNameStd)) { + topicNameEnforced = true; + } + + final HttpServletRequest req = ctx.getRequest(); + + boolean chunked = false; + if (null != req.getHeader(TRANSFER_ENCODING)) { + chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked"); + } + + String mediaType = req.getContentType(); + if (mediaType == null || mediaType.length() == 0) { + mediaType = MimeTypes.kAppGenericBinary; + } + + if (mediaType.contains("charset=UTF-8")) { + mediaType = mediaType.replace("; charset=UTF-8", "").trim(); + } + + if (!topic.equalsIgnoreCase(metricTopicname)) { + pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType); + } else { + pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType); + } + } + + private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request, + final String messageCreationTime, final int messageSequence, final Long batchId, + final boolean transactionEnabled) { + LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId, + transactionEnabled); + logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage())); + msg.setTransactionEnabled(transactionEnabled); + msg.setLogDetails(logDetails); + } + + private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request, + final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) { + LogDetails logDetails = new LogDetails(); + logDetails.setTopicId(topicName); + logDetails.setMessageTimestamp(messageTimestamp); + logDetails.setPublisherId(Utils.getUserApiKey(request)); + logDetails.setPublisherIp(request.getRemoteHost()); + logDetails.setMessageBatchId(batchId); + logDetails.setMessageSequence(String.valueOf(messageSequence)); + logDetails.setTransactionEnabled(transactionEnabled); + logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date())); + logDetails.setServerIp(request.getLocalAddr()); + return logDetails; + } + + private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked, + String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException, + CambriaApiException, IOException { + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + // setup the event set + final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition); + + // start processing, building a batch to push to the backend + final long startMs = System.currentTimeMillis(); + long count = 0; + + long maxEventBatch = 1024 * 16; + String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); + if (null != batchlen) + maxEventBatch = Long.parseLong(batchlen); + + // long maxEventBatch = + // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16); + final LinkedList batch = new LinkedList(); + final ArrayList> kms = new ArrayList>(); + + try { + // for each message... + Publisher.message m = null; + while ((m = events.next()) != null) { + // add the message to the batch + batch.add(m); + final KeyedMessage data = new KeyedMessage(topic, m.getKey(), + m.getMessage()); + kms.add(data); + // check if the batch is full + final int sizeNow = batch.size(); + if (sizeNow > maxEventBatch) { + ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms); + kms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + } + } + + // send the pending batch + final int sizeNow = batch.size(); + if (sizeNow > 0) { + ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms); + kms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + + LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic); + + // build a responseP + final JSONObject response = new JSONObject(); + response.put("count", count); + response.put("serverTimeMs", totalMs); + // DMaaPResponseBuilder.respondOk(ctx, response); + + } catch (Exception excp) { + + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + + } + ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count + + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null, + null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic, + final String partitionKey, final String requestTime, final boolean chunked, final String mediaType) + throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, + CambriaApiException { + + final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics(); + + // setup the event set + final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey); + + // start processing, building a batch to push to the backend + final long startMs = System.currentTimeMillis(); + long count = 0; + long maxEventBatch = 1024 * 16; + String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH); + if (null != evenlen) + maxEventBatch = Long.parseLong(evenlen); + + final LinkedList batch = new LinkedList(); + final ArrayList> kms = new ArrayList>(); + + Publisher.message m = null; + int messageSequence = 1; + Long batchId = 1L; + final boolean transactionEnabled = true; + int publishBatchCount = 0; + SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS"); + + // LOG.warn("Batch Start Id: " + + // Utils.getFromattedBatchSequenceId(batchId)); + try { + // for each message... + batchId = DMaaPContext.getBatchID(); + + String responseTransactionId = null; + + while ((m = events.next()) != null) { + + // LOG.warn("Batch Start Id: " + + // Utils.getFromattedBatchSequenceId(batchId)); + + addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId, + transactionEnabled); + messageSequence++; + + // add the message to the batch + batch.add(m); + + responseTransactionId = m.getLogDetails().getTransactionId(); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("message", m.getMessage()); + jsonObject.put("transactionId", responseTransactionId); + final KeyedMessage data = new KeyedMessage(topic, m.getKey(), + jsonObject.toString()); + kms.add(data); + + // check if the batch is full + final int sizeNow = batch.size(); + if (sizeNow >= maxEventBatch) { + String startTime = sdf.format(new Date()); + LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id=" + + batchId + "]"); + try { + ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms); + // transactionLogs(batch); + for (message msg : batch) { + LogDetails logDetails = msg.getLogDetails(); + LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); + } + } catch (Exception excp) { + + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + kms.clear(); + batch.clear(); + metricsSet.publishTick(sizeNow); + publishBatchCount = sizeNow; + count += sizeNow; + // batchId++; + String endTime = sdf.format(new Date()); + LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + + ",Batch End Time=" + endTime + "]"); + batchId = DMaaPContext.getBatchID(); + } + } + + // send the pending batch + final int sizeNow = batch.size(); + if (sizeNow > 0) { + String startTime = sdf.format(new Date()); + LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id=" + + batchId + "]"); + try { + ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms); + // transactionLogs(batch); + for (message msg : batch) { + LogDetails logDetails = msg.getLogDetails(); + LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails()); + } + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + + ErrorResponse errRes = new ErrorResponse(status, + DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + kms.clear(); + metricsSet.publishTick(sizeNow); + count += sizeNow; + // batchId++; + String endTime = sdf.format(new Date()); + publishBatchCount = sizeNow; + LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId + + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time=" + + endTime + "]"); + } + + final long endMs = System.currentTimeMillis(); + final long totalMs = endMs - startMs; + + LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic); + + // build a response + final JSONObject response = new JSONObject(); + response.put("count", count); + response.put("serverTimeMs", totalMs); + + } catch (Exception excp) { + int status = HttpStatus.SC_NOT_FOUND; + String errorMsg = null; + if (excp instanceof CambriaApiException) { + status = ((CambriaApiException) excp).getStatus(); + JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody()); + JSONObject errObject = new JSONObject(jsonTokener); + errorMsg = (String) errObject.get("message"); + } + + ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(), + "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "." + + errorMessages.getPublishMsgCount() + count + "." + errorMsg, + null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()), + ctx.getRequest().getRemoteHost(), null, null); + LOG.info(errRes.toString()); + throw new CambriaApiException(errRes); + } + } +} diff --git a/src/main/java/com/att/nsa/cambria/service/impl/MetricsServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/MetricsServiceImpl.java new file mode 100644 index 0000000..1a1baf5 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/MetricsServiceImpl.java @@ -0,0 +1,115 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service.impl; + +import java.io.IOException; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONObject; +import org.springframework.stereotype.Component; + +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.backends.MetricsSet; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.service.MetricsService; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.metrics.CdmMeasuredItem; + +/** + * + * + * This will provide all the generated metrics details also it can provide the + * get metrics details + * + * + * @author author + * + * + */ +@Component +public class MetricsServiceImpl implements MetricsService { + + //private static final Logger LOG = Logger.getLogger(MetricsService.class.toString()); + private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MetricsService.class); + /** + * + * + * @param ctx + * @throws IOException + * + * + * get Metric details + * + */ + @Override + + public void get(DMaaPContext ctx) throws IOException { + LOG.info("Inside : MetricsServiceImpl : get()"); + final MetricsSet metrics = ctx.getConfigReader().getfMetrics(); + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + final JSONObject result = metrics.toJson(); + DMaaPResponseBuilder.respondOk(ctx, result); + LOG.info("============ Metrics generated : " + result.toString() + "================="); + + } + + + @Override + /** + * + * get Metric by name + * + * + * @param ctx + * @param name + * @throws IOException + * @throws CambriaApiException + * + * + */ + public void getMetricByName(DMaaPContext ctx, String name) throws IOException, CambriaApiException { + LOG.info("Inside : MetricsServiceImpl : getMetricByName()"); + final MetricsSet metrics = ctx.getConfigReader().getfMetrics(); + + final CdmMeasuredItem item = metrics.getItem(name); + /** + * check if item is null + */ + if (item == null) { + throw new CambriaApiException(404, "No metric named [" + name + "]."); + } + + final JSONObject entry = new JSONObject(); + entry.put("summary", item.summarize()); + entry.put("raw", item.getRawValueString()); + + DMaaPResponseBuilder.setNoCacheHeadings(ctx); + + final JSONObject result = new JSONObject(); + result.put(name, entry); + + DMaaPResponseBuilder.respondOk(ctx, result); + LOG.info("============ Metrics generated : " + entry.toString() + "================="); + } + +} diff --git a/src/main/java/com/att/nsa/cambria/service/impl/TopicServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/TopicServiceImpl.java new file mode 100644 index 0000000..658523d --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/TopicServiceImpl.java @@ -0,0 +1,649 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +/** + * + */ +package com.att.nsa.cambria.service.impl; + +import java.io.IOException; + +import org.apache.http.HttpStatus; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.att.ajsc.filemonitor.AJSCPropertiesMap; +import com.att.nsa.cambria.CambriaApiException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker; +import com.att.nsa.cambria.beans.TopicBean; +import com.att.nsa.cambria.constants.CambriaConstants; +import com.att.nsa.cambria.exception.DMaaPAccessDeniedException; +import com.att.nsa.cambria.exception.DMaaPErrorMessages; +import com.att.nsa.cambria.exception.DMaaPResponseCode; +import com.att.nsa.cambria.exception.ErrorResponse; +import com.att.nsa.cambria.metabroker.Broker; +import com.att.nsa.cambria.metabroker.Broker.TopicExistsException; +import com.att.nsa.cambria.metabroker.Topic; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticator; +import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl; +import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl; +import com.att.nsa.cambria.service.TopicService; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.NsaAcl; +import com.att.nsa.security.NsaApiKey; +import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; + +/** + * @author author + * + */ +@Service +public class TopicServiceImpl implements TopicService { + + //private static final Logger LOGGER = Logger.getLogger(TopicServiceImpl.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class); + @Autowired + private DMaaPErrorMessages errorMessages; + + //@Value("${msgRtr.topicfactory.aaf}") + //private String mrFactory; + + + /** + * @param dmaapContext + * @throws JSONException + * @throws ConfigDbException + * @throws IOException + * + */ + @Override + public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException { + + LOGGER.info("Fetching list of all the topics."); + JSONObject json = new JSONObject(); + + JSONArray topicsList = new JSONArray(); + + for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { + topicsList.put(topic.getName()); + } + + json.put("topics", topicsList); + + LOGGER.info("Returning list of all the topics."); + DMaaPResponseBuilder.respondOk(dmaapContext, json); + + } + + /** + * @param dmaapContext + * @throws JSONException + * @throws ConfigDbException + * @throws IOException + * + */ + public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException { + + LOGGER.info("Fetching list of all the topics."); + JSONObject json = new JSONObject(); + + JSONArray topicsList = new JSONArray(); + + for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) { + JSONObject obj = new JSONObject(); + obj.put("topicName", topic.getName()); + //obj.put("description", topic.getDescription()); + obj.put("owner", topic.getOwner()); + obj.put("txenabled", topic.isTransactionEnabled()); + topicsList.put(obj); + } + + json.put("topics", topicsList); + + LOGGER.info("Returning list of all the topics."); + DMaaPResponseBuilder.respondOk(dmaapContext, json); + + } + + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + @Override + public void getTopic(DMaaPContext dmaapContext, String topicName) + throws ConfigDbException, IOException, TopicExistsException { + + LOGGER.info("Fetching details of topic " + topicName); + Topic t = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == t) { + LOGGER.error("Topic [" + topicName + "] does not exist."); + throw new TopicExistsException("Topic [" + topicName + "] does not exist."); + } + + JSONObject o = new JSONObject(); + o.put ( "name", t.getName () ); + o.put ( "description", t.getDescription () ); + + if (null!=t.getOwners ()) + o.put ( "owner", t.getOwners ().iterator ().next () ); + if(null!=t.getReaderAcl ()) + o.put ( "readerAcl", aclToJson ( t.getReaderAcl () ) ); + if(null!=t.getWriterAcl ()) + o.put ( "writerAcl", aclToJson ( t.getWriterAcl () ) ); + + LOGGER.info("Returning details of topic " + topicName); + DMaaPResponseBuilder.respondOk(dmaapContext, o); + + } + + + /** + * @param dmaapContext + * @param topicBean + * @throws CambriaApiException + * @throws AccessDeniedException + * @throws IOException + * @throws TopicExistsException + * @throws JSONException + * + * + * + */ + @Override + public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) + throws CambriaApiException, DMaaPAccessDeniedException,IOException, TopicExistsException { + + LOGGER.info("Creating topic " + topicBean.getTopicName()); + + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + String key = null; + String appName=dmaapContext.getRequest().getHeader("AppName"); + String enfTopicName= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF"); + + if(user != null) + { + key = user.getKey(); + + if( enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >=0 ) { + + LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed."); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + } + } + + //else if (user==null && (null==dmaapContext.getRequest().getHeader("Authorization") && null == dmaapContext.getRequest().getHeader("cookie")) ) { + else if (user == null && null==dmaapContext.getRequest().getHeader("Authorization") && + (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) { + LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed."); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + } + + if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || + null != dmaapContext.getRequest().getHeader("cookie"))) { + //if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || null != dmaapContext.getRequest().getHeader("cookie"))) { + // ACL authentication is not provided so we will use the aaf authentication + LOGGER.info("Authorization the topic"); + + String permission = ""; + String nameSpace=""; + if(topicBean.getTopicName().indexOf(".")>1) + nameSpace = topicBean.getTopicName().substring(0,topicBean.getTopicName().lastIndexOf(".")); + + String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf"); + + //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); + + permission = mrFactoryVal+nameSpace+"|create"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + + if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + { + + LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed."); + + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + + }else{ + // if user is null and aaf authentication is ok then key should be "" + //key = ""; + /** + * Added as part of AAF user it should return username + */ + + key = dmaapContext.getRequest().getUserPrincipal().getName().toString(); + LOGGER.info("key ==================== "+key); + + } + } + + try { + final String topicName = topicBean.getTopicName(); + final String desc = topicBean.getTopicDescription(); + + final int partitions = topicBean.getPartitionCount(); + + final int replicas = topicBean.getReplicationCount(); + boolean transactionEnabled = topicBean.isTransactionEnabled(); + + + final Broker metabroker = getMetaBroker(dmaapContext); + final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, + transactionEnabled); + + LOGGER.info("Topic created successfully. Sending response"); + DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t)); + } catch (JSONException excp) { + + LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, + DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), + errorMessages.getIncorrectJson()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); + + } + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * @throws CambriaApiException + * @throws AccessDeniedException + */ + @Override + public void deleteTopic(DMaaPContext dmaapContext, String topicName) + throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException { + + LOGGER.info("Deleting topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + + if (user == null && null!=dmaapContext.getRequest().getHeader("Authorization")) { + LOGGER.info("Authenticating the user, as ACL authentication is not provided"); +// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; + String permission = ""; + String nameSpace = topicName.substring(0,topicName.lastIndexOf(".")); + String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf"); +// String tokens[] = topicName.split(".mr.topic."); + permission = mrFactoryVal+nameSpace+"|destroy"; + DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) + { + LOGGER.error("Failed to delete topi"+topicName+". Authentication failed."); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" delete "+errorMessages.getNotPermitted2()); + LOGGER.info(errRes.toString()); + throw new DMaaPAccessDeniedException(errRes); + } + + + } + + final Broker metabroker = getMetaBroker(dmaapContext); + final Topic topic = metabroker.getTopic(topicName); + + if (topic == null) { + LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist."); + } + + metabroker.deleteTopic(topicName); + + LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully"); + + } + + /** + * + * @param dmaapContext + * @return + */ + private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) { + return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker(); + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * + */ + @Override + public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName) + throws ConfigDbException, IOException, TopicExistsException { + LOGGER.info("Retrieving list of all the publishers for topic " + topicName); + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (topic == null) { + LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException( + "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist."); + } + + + + final NsaAcl acl = topic.getWriterAcl(); + + LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response."); + DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl)); + + } + + /** + * + * @param acl + * @return + */ + private static JSONObject aclToJson(NsaAcl acl) { + final JSONObject o = new JSONObject(); + if (acl == null) { + o.put("enabled", false); + o.put("users", new JSONArray()); + } else { + o.put("enabled", acl.isActive()); + + final JSONArray a = new JSONArray(); + for (String user : acl.getUsers()) { + a.put(user); + } + o.put("users", a); + } + return o; + } + + /** + * @param dmaapContext + * @param topicName + */ + @Override + public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName) + throws IOException, ConfigDbException, TopicExistsException { + LOGGER.info("Retrieving list of all the consumers for topic " + topicName); + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (topic == null) { + LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist."); + throw new TopicExistsException( + "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist."); + } + + final NsaAcl acl = topic.getReaderAcl(); + + LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response."); + DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl)); + + } + + /** + * + * @param t + * @return + */ + private static JSONObject topicToJson(Topic t) { + final JSONObject o = new JSONObject(); + + o.put("name", t.getName()); + o.put("description", t.getDescription()); + o.put("owner", t.getOwner()); + o.put("readerAcl", aclToJson(t.getReaderAcl())); + o.put("writerAcl", aclToJson(t.getWriterAcl())); + + return o; + } + + /** + * @param dmaapContext + * @param topicName + * @param producerId + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * @throws AccessDeniedException + * @throws + * + */ + @Override + public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException { + + LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + +// if (user == null) { +// +// LOGGER.info("Authenticating the user, as ACL authentication is not provided"); +//// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; +// +// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); +// String permission = aaf.aafPermissionString(topicName, "manage"); +// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) +// { +// LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic " + topicName +// + ". Authentication failed."); +// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, +// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), +// errorMessages.getNotPermitted1()+" "+errorMessages.getNotPermitted2()+ topicName); +// LOGGER.info(errRes); +// throw new DMaaPAccessDeniedException(errRes); +// } +// } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit write access to producer [" + producerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.permitWritesFromUser(producerId, user); + + LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName + + "]. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher."); + + } + + /** + * @param dmaapContext + * @param topicName + * @param producerId + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + * @throws AccessDeniedException + * @throws DMaaPAccessDeniedException + * + */ + @Override + public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException { + + LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); +// if (user == null) { +// +//// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; +// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); +// String permission = aaf.aafPermissionString(topicName, "manage"); +// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) +// { +// LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic " + topicName +// + ". Authentication failed."); +// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, +// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), +// errorMessages.getNotPermitted1()+" "+errorMessages.getNotPermitted2()+ topicName); +// LOGGER.info(errRes); +// throw new DMaaPAccessDeniedException(errRes); +// +// } +// } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to revoke write access to producer [" + producerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.denyWritesFromUser(producerId, user); + + LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName + + "]. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher."); + + } + + /** + * @param dmaapContext + * @param topicName + * @param consumerId + * @throws DMaaPAccessDeniedException + */ + @Override + public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException { + + LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); +// if (user == null) { +// +//// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; +// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); +// String permission = aaf.aafPermissionString(topicName, "manage"); +// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) +// { +// LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic " + topicName +// + ". Authentication failed."); +// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, +// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), +// errorMessages.getNotPermitted1()+" "+errorMessages.getNotPermitted2()+ topicName); +// LOGGER.info(errRes); +// throw new DMaaPAccessDeniedException(errRes); +// } +// } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.permitReadsByUser(consumerId, user); + + LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName + + "]. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, + "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "]."); + } + + /** + * @param dmaapContext + * @param topicName + * @param consumerId + * @throws DMaaPAccessDeniedException + */ + @Override + public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId) + throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException { + + LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName); + final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); +// if (user == null) { +//// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage"; +// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); +// String permission = aaf.aafPermissionString(topicName, "manage"); +// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) +// { +// LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic " + topicName +// + ". Authentication failed."); +// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, +// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), +// errorMessages.getNotPermitted1()+" "+errorMessages.getNotPermitted2()+ topicName); +// LOGGER.info(errRes); +// throw new DMaaPAccessDeniedException(errRes); +// } +// +// +// } + + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName + + "] does not exist."); + throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId + + "] for topic. Topic [" + topicName + "] does not exist."); + } + + topic.denyReadsByUser(consumerId, user); + + LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName + + "]. Sending response."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, + "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "]."); + + } + + + + + +} diff --git a/src/main/java/com/att/nsa/cambria/service/impl/TransactionServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/TransactionServiceImpl.java new file mode 100644 index 0000000..9da2852 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/TransactionServiceImpl.java @@ -0,0 +1,100 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service.impl; + +import java.io.IOException; + +import org.springframework.stereotype.Service; + +import com.att.aft.dme2.internal.jettison.json.JSONException; +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.service.TransactionService; +import com.att.nsa.cambria.transaction.TransactionObj; +import com.att.nsa.configs.ConfigDbException; + +/** + * Once the transaction rest gateway will be using that time it will provide all + * the transaction details like fetching all the transactional objects or get + * any particular transaction object details + * + * @author author + * + */ +@Service +public class TransactionServiceImpl implements TransactionService { + + @Override + public void checkTransaction(TransactionObj trnObj) { + /* Need to implement the method */ + } + + @Override + public void getAllTransactionObjs(DMaaPContext dmaapContext) + throws ConfigDbException, IOException { + + /* + * ConfigurationReader configReader = dmaapContext.getConfigReader(); + * + * LOG.info("configReader : "+configReader.toString()); + * + * final JSONObject result = new JSONObject (); final JSONArray + * transactionIds = new JSONArray (); result.put ( "transactionIds", + * transactionIds ); + * + * DMaaPTransactionObjDB transDb = + * configReader.getfTranDb(); + * + * for (String transactionId : transDb.loadAllTransactionObjs()) { + * transactionIds.put (transactionId); } LOG.info( + * "========== TransactionServiceImpl: getAllTransactionObjs: Transaction objects are : " + * + transactionIds.toString()+"==========="); + * DMaaPResponseBuilder.respondOk(dmaapContext, result); + */ + } + + @Override + public void getTransactionObj(DMaaPContext dmaapContext, + String transactionId) throws ConfigDbException, JSONException, + IOException { + + /* + * if (null != transactionId) { + * + * ConfigurationReader configReader = dmaapContext.getConfigReader(); + * + * DMaaPTransactionObj trnObj; + * + * trnObj = configReader.getfTranDb().loadTransactionObj(transactionId); + * + * + * if (null != trnObj) { trnObj.serialize(); JSONObject result = + * trnObj.asJsonObject(); DMaaPResponseBuilder.respondOk(dmaapContext, + * result); + * LOG.info("========== TransactionServiceImpl: getTransactionObj : "+ + * result.toString()+"==========="); return; } + * + * } LOG.info( + * "========== TransactionServiceImpl: getTransactionObj: Error : Transaction object does not exist. " + * +"==========="); + */ + } +} diff --git a/src/main/java/com/att/nsa/cambria/service/impl/UIServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/UIServiceImpl.java new file mode 100644 index 0000000..0fbf657 --- /dev/null +++ b/src/main/java/com/att/nsa/cambria/service/impl/UIServiceImpl.java @@ -0,0 +1,206 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.cambria.service.impl; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import org.json.JSONArray; +import org.json.JSONObject; +import org.springframework.stereotype.Service; + +import com.att.nsa.cambria.beans.DMaaPContext; +import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker; +import com.att.nsa.cambria.metabroker.Topic; +import com.att.nsa.cambria.service.UIService; +import com.att.nsa.cambria.utils.DMaaPResponseBuilder; +import com.att.nsa.configs.ConfigDbException; +import com.att.nsa.security.db.NsaApiDb; +import com.att.nsa.security.db.simple.NsaSimpleApiKey; + +import kafka.common.TopicExistsException; + +/** + * @author author + * + */ +@Service +public class UIServiceImpl implements UIService { + + //private static final Logger LOGGER = Logger.getLogger(UIServiceImpl.class); + private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIServiceImpl.class); + /** + * Returning template of hello page + * @param dmaapContext + * @throws IOException + */ + @Override + public void hello(DMaaPContext dmaapContext) throws IOException { + LOGGER.info("Returning template of hello page."); + DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "templates/hello.html"); + } + + /** + * Fetching list of all api keys and returning in a templated form for display. + * @param dmaapContext + * @throws ConfigDbException + * @throws IOException + */ + @Override + public void getApiKeysTable(DMaaPContext dmaapContext) throws ConfigDbException, IOException { + // TODO - We need to work on the templates and how data will be set in + // the template + LOGGER.info("Fetching list of all api keys and returning in a templated form for display."); + Map keyMap = getApiKeyDb(dmaapContext).loadAllKeyRecords(); + + LinkedList keyList = new LinkedList(); + + JSONObject jsonList = new JSONObject(); + + for (Entry e : keyMap.entrySet()) { + final NsaSimpleApiKey key = e.getValue(); + final JSONObject jsonObject = new JSONObject(); + jsonObject.put("key", key.getKey()); + jsonObject.put("email", key.getContactEmail()); + jsonObject.put("description", key.getDescription()); + keyList.add(jsonObject); + } + + jsonList.put("apiKeys", keyList); + + LOGGER.info("Returning list of all the api keys in JSON format for the template."); + // "templates/apiKeyList.html" + DMaaPResponseBuilder.respondOk(dmaapContext, jsonList); + + } + + /** + * @param dmaapContext + * @param apiKey + * @throws Exception + */ + @Override + public void getApiKey(DMaaPContext dmaapContext, String apiKey) throws Exception { + // TODO - We need to work on the templates and how data will be set in + // the template + LOGGER.info("Fetching detials of apikey: " + apiKey); + final NsaSimpleApiKey key = getApiKeyDb(dmaapContext).loadApiKey(apiKey); + + if (null != key) { + LOGGER.info("Details of apikey [" + apiKey + "] found. Returning response"); + DMaaPResponseBuilder.respondOk(dmaapContext, key.asJsonObject()); + } else { + LOGGER.info("Details of apikey [" + apiKey + "] not found. Returning response"); + throw new Exception("Key [" + apiKey + "] not found."); + } + + } + + /** + * Fetching list of all the topics + * @param dmaapContext + * @throws ConfigDbException + * @throws IOException + */ + @Override + public void getTopicsTable(DMaaPContext dmaapContext) throws ConfigDbException, IOException { + // TODO - We need to work on the templates and how data will be set in + // the template + LOGGER.info("Fetching list of all the topics and returning in a templated form for display"); + List topicsList = getMetaBroker(dmaapContext).getAllTopics(); + + JSONObject jsonObject = new JSONObject(); + + JSONArray topicsArray = new JSONArray(); + + List topicList = getMetaBroker(dmaapContext).getAllTopics(); + + for (Topic topic : topicList) { + JSONObject obj = new JSONObject(); + obj.put("topicName", topic.getName()); + obj.put("description", topic.getDescription()); + obj.put("owner", topic.getOwner()); + topicsArray.put(obj); + } + + jsonObject.put("topics", topicsList); + + LOGGER.info("Returning the list of topics in templated format for display."); + DMaaPResponseBuilder.respondOk(dmaapContext, jsonObject); + + } + + /** + * @param dmaapContext + * @param topicName + * @throws ConfigDbException + * @throws IOException + * @throws TopicExistsException + */ + @Override + public void getTopic(DMaaPContext dmaapContext, String topicName) + throws ConfigDbException, IOException, TopicExistsException { + // TODO - We need to work on the templates and how data will be set in + // the template + LOGGER.info("Fetching detials of apikey: " + topicName); + Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); + + if (null == topic) { + LOGGER.error("Topic [" + topicName + "] does not exist."); + throw new TopicExistsException("Topic [" + topicName + "] does not exist."); + } + + JSONObject json = new JSONObject(); + json.put("topicName", topic.getName()); + json.put("description", topic.getDescription()); + json.put("owner", topic.getOwner()); + + LOGGER.info("Returning details of topic [" + topicName + "]. Sending response."); + DMaaPResponseBuilder.respondOk(dmaapContext, json); + + } + + /** + * + * @param dmaapContext + * @return + */ + private NsaApiDb getApiKeyDb(DMaaPContext dmaapContext) { + return dmaapContext.getConfigReader().getfApiKeyDb(); + + } + + /** + * + * @param dmaapContext + * @return + */ + private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) { + return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker(); + } + +} -- cgit 1.2.3-korg