summaryrefslogtreecommitdiffstats
path: root/src/main/java/com/att/nsa/cambria/service
diff options
context:
space:
mode:
authorVarun Gudisena <vg411h@att.com>2017-08-31 10:52:33 -0500
committerVarun Gudisena <vg411h@att.com>2017-08-31 10:52:50 -0500
commit3fc19dc9157f4d05bdbd6fd05a52f0592268c4e7 (patch)
tree69355ec5a2a03a1867862e6b757b51c45763ef1a /src/main/java/com/att/nsa/cambria/service
parentca63da6e0cb7fb63e231343d0b52a40036f6b6aa (diff)
Revert package name changes
Reverted package name changes to avoid any potential issues. Renamed maven group id only. Issue-id: DMAAP-74 Change-Id: Ic741b602ade60f108d940c0571a1d94b7be2abc2 Signed-off-by: Varun Gudisena <vg411h@att.com>
Diffstat (limited to 'src/main/java/com/att/nsa/cambria/service')
-rw-r--r--src/main/java/com/att/nsa/cambria/service/AdminService.java83
-rw-r--r--src/main/java/com/att/nsa/cambria/service/ApiKeysService.java105
-rw-r--r--src/main/java/com/att/nsa/cambria/service/EventsService.java75
-rw-r--r--src/main/java/com/att/nsa/cambria/service/MMService.java68
-rw-r--r--src/main/java/com/att/nsa/cambria/service/MetricsService.java54
-rw-r--r--src/main/java/com/att/nsa/cambria/service/TopicService.java176
-rw-r--r--src/main/java/com/att/nsa/cambria/service/TransactionService.java61
-rw-r--r--src/main/java/com/att/nsa/cambria/service/UIService.java91
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/AdminServiceImpl.java188
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/ApiKeysServiceImpl.java326
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/BaseTransactionDbImpl.java153
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/EventsServiceImpl.java788
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/MMServiceImpl.java605
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/MetricsServiceImpl.java115
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/TopicServiceImpl.java649
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/TransactionServiceImpl.java100
-rw-r--r--src/main/java/com/att/nsa/cambria/service/impl/UIServiceImpl.java206
17 files changed, 3843 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/cambria/service/AdminService.java b/src/main/java/com/att/nsa/cambria/service/AdminService.java
new file mode 100644
index 0000000..6f0d9cf
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/AdminService.java
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service;
+
+import java.io.IOException;
+
+import org.json.JSONException;
+
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+
+/**
+ * @author author
+ *
+ */
+public interface AdminService {
+ /**
+ * method provide consumerCache
+ *
+ * @param dMaaPContext
+ * @throws IOException
+ */
+ void showConsumerCache(DMaaPContext dMaaPContext) throws IOException,AccessDeniedException;
+
+ /**
+ * method drops consumer cache
+ *
+ * @param dMaaPContext
+ * @throws JSONException
+ * @throws IOException
+ */
+ void dropConsumerCache(DMaaPContext dMaaPContext) throws JSONException, IOException,AccessDeniedException;
+
+
+ /**
+ * Get list of blacklisted ips
+ * @param dMaaPContext context
+ * @throws IOException ex
+ * @throws AccessDeniedException ex
+ */
+ void getBlacklist ( DMaaPContext dMaaPContext ) throws IOException, AccessDeniedException;
+
+ /**
+ * Add ip to blacklist
+ * @param dMaaPContext context
+ * @param ip ip
+ * @throws IOException ex
+ * @throws ConfigDbException ex
+ * @throws AccessDeniedException ex
+ */
+ void addToBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException;
+
+ /**
+ * Remove ip from blacklist
+ * @param dMaaPContext context
+ * @param ip ip
+ * @throws IOException ex
+ * @throws ConfigDbException ex
+ * @throws AccessDeniedException ex
+ */
+ void removeFromBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException;
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/ApiKeysService.java b/src/main/java/com/att/nsa/cambria/service/ApiKeysService.java
new file mode 100644
index 0000000..6fc9c0d
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/ApiKeysService.java
@@ -0,0 +1,105 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service;
+
+import java.io.IOException;
+
+import com.att.nsa.cambria.beans.ApiKeyBean;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
+
+/**
+ * Declaring all the method in interface that is mainly used for authentication
+ * purpose.
+ *
+ *
+ */
+
+public interface ApiKeysService {
+ /**
+ * This method declaration for getting all ApiKey that has generated on
+ * server.
+ *
+ * @param dmaapContext
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+
+ public void getAllApiKeys(DMaaPContext dmaapContext)
+ throws ConfigDbException, IOException;
+
+ /**
+ * Getting information about specific ApiKey
+ *
+ * @param dmaapContext
+ * @param apikey
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+
+ public void getApiKey(DMaaPContext dmaapContext, String apikey)
+ throws ConfigDbException, IOException;
+
+ /**
+ * Thid method is used for create a particular ApiKey
+ *
+ * @param dmaapContext
+ * @param nsaApiKey
+ * @throws KeyExistsException
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+
+ public void createApiKey(DMaaPContext dmaapContext, ApiKeyBean nsaApiKey)
+ throws KeyExistsException, ConfigDbException, IOException;
+
+ /**
+ * This method is used for update ApiKey that is already generated on
+ * server.
+ *
+ * @param dmaapContext
+ * @param apikey
+ * @param nsaApiKey
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws AccessDeniedException
+ * @throws com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException
+ */
+ public void updateApiKey(DMaaPContext dmaapContext, String apikey,
+ ApiKeyBean nsaApiKey) throws ConfigDbException, IOException,AccessDeniedException
+ ;
+
+ /**
+ * This method is used for delete specific ApiKey
+ *
+ * @param dmaapContext
+ * @param apikey
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws AccessDeniedException
+ */
+
+ public void deleteApiKey(DMaaPContext dmaapContext, String apikey)
+ throws ConfigDbException, IOException,AccessDeniedException;
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/EventsService.java b/src/main/java/com/att/nsa/cambria/service/EventsService.java
new file mode 100644
index 0000000..477538d
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/EventsService.java
@@ -0,0 +1,75 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+
+/**
+ *
+ * @author author
+ *
+ */
+public interface EventsService {
+ /**
+ *
+ * @param ctx
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ * @throws ConfigDbException
+ * @throws TopicExistsException
+ * @throws AccessDeniedException
+ * @throws UnavailableException
+ * @throws CambriaApiException
+ * @throws IOException
+ */
+ public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
+ throws ConfigDbException, TopicExistsException,UnavailableException,
+ CambriaApiException, IOException,AccessDeniedException;
+
+ /**
+ *
+ * @param ctx
+ * @param topic
+ * @param msg
+ * @param defaultPartition
+ * @param requestTime
+ * @throws ConfigDbException
+ * @throws AccessDeniedException
+ * @throws TopicExistsException
+ * @throws CambriaApiException
+ * @throws IOException
+ */
+ public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
+ final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
+ CambriaApiException, IOException,missingReqdSetting;
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/MMService.java b/src/main/java/com/att/nsa/cambria/service/MMService.java
new file mode 100644
index 0000000..5c14674
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/MMService.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.json.JSONException;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+
+/**
+ * Contains the logic for executing calls to the Mirror Maker agent tool.
+ *
+ * @author <a href="mailto:"></a>
+ *
+ * @since May 25, 2016
+ */
+
+public interface MMService {
+
+ /*
+ * this method calls the add white list method of a Mirror Maker agent API
+ */
+ public void addWhiteList();
+
+ /*
+ * this method calls the remove white list method of a Mirror Maker agent API
+ */
+ public void removeWhiteList();
+
+ /*
+ * This method calls the list white list method of a Mirror Maker agent API
+ */
+ public void listWhiteList();
+
+ public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId) throws ConfigDbException, TopicExistsException,
+ AccessDeniedException, UnavailableException, CambriaApiException, IOException;
+
+ public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
+ final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
+ CambriaApiException, IOException, missingReqdSetting;
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/MetricsService.java b/src/main/java/com/att/nsa/cambria/service/MetricsService.java
new file mode 100644
index 0000000..6b11682
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/MetricsService.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service;
+
+/**
+ * @author
+ *
+ */
+import java.io.IOException;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+
+/**
+ *
+ * @author author
+ *
+ */
+public interface MetricsService {
+ /**
+ *
+ * @param ctx
+ * @throws IOException
+ */
+ public void get(DMaaPContext ctx) throws IOException;
+
+ /**
+ *
+ * @param ctx
+ * @param name
+ * @throws IOException
+ * @throws CambriaApiException
+ */
+ public void getMetricByName(DMaaPContext ctx, String name) throws IOException, CambriaApiException;
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/TopicService.java b/src/main/java/com/att/nsa/cambria/service/TopicService.java
new file mode 100644
index 0000000..9ed39af
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/TopicService.java
@@ -0,0 +1,176 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service;
+
+import java.io.IOException;
+
+import org.json.JSONException;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.beans.TopicBean;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.configs.ConfigDbException;
+
+/**
+ * interface provide all the topic related operations
+ *
+ * @author author
+ *
+ */
+public interface TopicService {
+ /**
+ * method fetch details of all the topics
+ *
+ * @param dmaapContext
+ * @throws JSONException
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException;
+ void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException;
+
+ /**
+ * method fetch details of specific topic
+ *
+ * @param dmaapContext
+ * @param topicName
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ */
+ void getTopic(DMaaPContext dmaapContext, String topicName)
+ throws ConfigDbException, IOException, TopicExistsException;
+
+ /**
+ * method used to create the topic
+ *
+ * @param dmaapContext
+ * @param topicBean
+ * @throws CambriaApiException
+ * @throws TopicExistsException
+ * @throws IOException
+ * @throws AccessDeniedException
+ * @throws JSONException
+ */
+
+ void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
+ throws CambriaApiException, TopicExistsException, IOException, AccessDeniedException;
+
+ /**
+ * method used to delete to topic
+ *
+ * @param dmaapContext
+ * @param topicName
+ * @throws IOException
+ * @throws AccessDeniedException
+ * @throws ConfigDbException
+ * @throws CambriaApiException
+ * @throws TopicExistsException
+ */
+
+ void deleteTopic(DMaaPContext dmaapContext, String topicName)
+ throws IOException, AccessDeniedException, ConfigDbException, CambriaApiException, TopicExistsException;
+
+ /**
+ * method provides list of all the publishers associated with a topic
+ *
+ * @param dmaapContext
+ * @param topicName
+ * @throws IOException
+ * @throws ConfigDbException
+ * @throws TopicExistsException
+ */
+ void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
+ throws IOException, ConfigDbException, TopicExistsException;
+
+ /**
+ * method provides details of all the consumer associated with a specific
+ * topic
+ *
+ * @param dmaapContext
+ * @param topicName
+ * @throws IOException
+ * @throws ConfigDbException
+ * @throws TopicExistsException
+ */
+ void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
+ throws IOException, ConfigDbException, TopicExistsException;
+
+ /**
+ * method provides publishing right to a specific topic
+ *
+ * @param dmaapContext
+ * @param topicName
+ * @param producerId
+ * @throws AccessDeniedException
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ */
+ void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
+ throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException;
+
+ /**
+ * method denies any specific publisher from a topic
+ *
+ * @param dmaapContext
+ * @param topicName
+ * @param producerId
+ * @throws AccessDeniedException
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ */
+ void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
+ throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException;
+
+ /**
+ * method provide consuming right to a specific user on a topic
+ *
+ * @param dmaapContext
+ * @param topicName
+ * @param consumerId
+ * @throws AccessDeniedException
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ */
+ void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
+ throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException;
+
+ /**
+ * method denies a particular user's consuming right on a topic
+ *
+ * @param dmaapContext
+ * @param topicName
+ * @param consumerId
+ * @throws AccessDeniedException
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ */
+ void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
+ throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException;
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/TransactionService.java b/src/main/java/com/att/nsa/cambria/service/TransactionService.java
new file mode 100644
index 0000000..109b2c8
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/TransactionService.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service;
+
+import java.io.IOException;
+
+import com.att.aft.dme2.internal.jettison.json.JSONException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.transaction.TransactionObj;
+import com.att.nsa.configs.ConfigDbException;
+
+/**
+ *
+ * @author author
+ *
+ */
+public interface TransactionService {
+ /**
+ *
+ * @param trnObj
+ */
+ void checkTransaction(TransactionObj trnObj);
+
+ /**
+ *
+ * @param dmaapContext
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ void getAllTransactionObjs(DMaaPContext dmaapContext) throws ConfigDbException, IOException;
+
+ /**
+ *
+ * @param dmaapContext
+ * @param transactionId
+ * @throws ConfigDbException
+ * @throws JSONException
+ * @throws IOException
+ */
+ void getTransactionObj(DMaaPContext dmaapContext, String transactionId)
+ throws ConfigDbException, JSONException, IOException;
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/UIService.java b/src/main/java/com/att/nsa/cambria/service/UIService.java
new file mode 100644
index 0000000..b6555fe
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/UIService.java
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+/**
+ *
+ */
+package com.att.nsa.cambria.service;
+
+import java.io.IOException;
+
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.configs.ConfigDbException;
+
+import kafka.common.TopicExistsException;
+
+/**
+ * @author author
+ *
+ */
+public interface UIService {
+ /**
+ * Returning template of hello page.
+ *
+ * @param dmaapContext
+ * @throws IOException
+ */
+ void hello(DMaaPContext dmaapContext) throws IOException;
+
+ /**
+ * Fetching list of all api keys and returning in a templated form for
+ * display
+ *
+ * @param dmaapContext
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ void getApiKeysTable(DMaaPContext dmaapContext) throws ConfigDbException,
+ IOException;
+
+ /**
+ * Fetching detials of apikey in a templated form for display
+ *
+ * @param dmaapContext
+ * @param apiKey
+ * @throws Exception
+ */
+ void getApiKey(DMaaPContext dmaapContext, final String apiKey)
+ throws Exception;
+
+ /**
+ * Fetching list of all the topics and returning in a templated form for
+ * display
+ *
+ * @param dmaapContext
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ void getTopicsTable(DMaaPContext dmaapContext) throws ConfigDbException,
+ IOException;
+
+ /**
+ * Fetching detials of topic in a templated form for display
+ *
+ * @param dmaapContext
+ * @param topic
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ */
+ void getTopic(DMaaPContext dmaapContext, final String topic)
+ throws ConfigDbException, IOException, TopicExistsException;
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/AdminServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/AdminServiceImpl.java
new file mode 100644
index 0000000..2585ab5
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/AdminServiceImpl.java
@@ -0,0 +1,188 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.stereotype.Component;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.cambria.backends.Consumer;
+import com.att.nsa.cambria.backends.ConsumerFactory;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
+import com.att.nsa.cambria.service.AdminService;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.limits.Blacklist;
+import com.att.nsa.security.NsaApiKey;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+
+/**
+ * @author author
+ *
+ */
+@Component
+public class AdminServiceImpl implements AdminService {
+
+ //private Logger log = Logger.getLogger(AdminServiceImpl.class.toString());
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(AdminServiceImpl.class);
+ /**
+ * getConsumerCache returns consumer cache
+ * @param dMaaPContext context
+ * @throws IOException ex
+ * @throws AccessDeniedException
+ */
+ @Override
+ public void showConsumerCache(DMaaPContext dMaaPContext) throws IOException, AccessDeniedException {
+ adminAuthenticate(dMaaPContext);
+
+ JSONObject consumers = new JSONObject();
+ JSONArray jsonConsumersList = new JSONArray();
+
+ for (Consumer consumer : getConsumerFactory(dMaaPContext).getConsumers()) {
+ JSONObject consumerObject = new JSONObject();
+ consumerObject.put("name", consumer.getName());
+ consumerObject.put("created", consumer.getCreateTimeMs());
+ consumerObject.put("accessed", consumer.getLastAccessMs());
+ jsonConsumersList.put(consumerObject);
+ }
+
+ consumers.put("consumers", jsonConsumersList);
+ log.info("========== AdminServiceImpl: getConsumerCache: " + jsonConsumersList.toString() + "===========");
+ DMaaPResponseBuilder.respondOk(dMaaPContext, consumers);
+ }
+
+ /**
+ *
+ * dropConsumerCache() method clears consumer cache
+ * @param dMaaPContext context
+ * @throws JSONException ex
+ * @throws IOException ex
+ * @throws AccessDeniedException
+ *
+ */
+ @Override
+ public void dropConsumerCache(DMaaPContext dMaaPContext) throws JSONException, IOException, AccessDeniedException {
+ adminAuthenticate(dMaaPContext);
+ getConsumerFactory(dMaaPContext).dropCache();
+ DMaaPResponseBuilder.respondOkWithHtml(dMaaPContext, "Consumer cache cleared successfully");
+ // log.info("========== AdminServiceImpl: dropConsumerCache: Consumer
+ // Cache successfully dropped.===========");
+ }
+
+ /**
+ * getfConsumerFactory returns CosnumerFactory details
+ * @param dMaaPContext contxt
+ * @return ConsumerFactory obj
+ *
+ */
+ private ConsumerFactory getConsumerFactory(DMaaPContext dMaaPContext) {
+ return dMaaPContext.getConfigReader().getfConsumerFactory();
+ }
+
+ /**
+ * return ipblacklist
+ * @param dMaaPContext context
+ * @return blacklist obj
+ */
+ private static Blacklist getIpBlacklist(DMaaPContext dMaaPContext) {
+ return dMaaPContext.getConfigReader().getfIpBlackList();
+ }
+
+
+ /**
+ * Get list of blacklisted ips
+ */
+ @Override
+ public void getBlacklist ( DMaaPContext dMaaPContext ) throws IOException, AccessDeniedException
+ {
+ adminAuthenticate ( dMaaPContext );
+
+ DMaaPResponseBuilder.respondOk ( dMaaPContext,
+ new JSONObject().put ( "blacklist", setToJsonArray ( getIpBlacklist (dMaaPContext).asSet() ) ) );
+ }
+
+ /**
+ * Add ip to blacklist
+ */
+ @Override
+ public void addToBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException
+ {
+ adminAuthenticate ( dMaaPContext );
+
+ getIpBlacklist (dMaaPContext).add ( ip );
+ DMaaPResponseBuilder.respondOkNoContent ( dMaaPContext );
+ }
+
+ /**
+ * Remove ip from blacklist
+ */
+ @Override
+ public void removeFromBlacklist ( DMaaPContext dMaaPContext, String ip ) throws IOException, ConfigDbException, AccessDeniedException
+ {
+ adminAuthenticate ( dMaaPContext );
+
+ getIpBlacklist (dMaaPContext).remove ( ip );
+ DMaaPResponseBuilder.respondOkNoContent ( dMaaPContext );
+ }
+
+ /**
+ * Authenticate if user is admin
+ * @param dMaaPContext context
+ * @throws AccessDeniedException ex
+ */
+ private static void adminAuthenticate ( DMaaPContext dMaaPContext ) throws AccessDeniedException
+ {
+
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dMaaPContext);
+ if ( user == null || !user.getKey ().equals ( "admin" ) )
+ {
+ throw new AccessDeniedException ();
+ }
+ }
+
+ public static JSONArray setToJsonArray ( Set<?> fields )
+ {
+ return collectionToJsonArray ( fields );
+ }
+
+ public static JSONArray collectionToJsonArray ( Collection<?> fields )
+ {
+ final JSONArray a = new JSONArray ();
+ if ( fields != null )
+ {
+ for ( Object o : fields )
+ {
+ a.put ( o );
+ }
+ }
+ return a;
+ }
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/ApiKeysServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/ApiKeysServiceImpl.java
new file mode 100644
index 0000000..637d2fb
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/ApiKeysServiceImpl.java
@@ -0,0 +1,326 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service.impl;
+
+import java.io.IOException;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.springframework.stereotype.Service;
+
+//import com.att.nsa.apiServer.util.Emailer;
+import com.att.nsa.cambria.utils.Emailer;
+import com.att.nsa.cambria.beans.ApiKeyBean;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
+import com.att.nsa.cambria.service.ApiKeysService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
+import com.att.nsa.security.NsaApiKey;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.security.db.NsaApiDb;
+import com.att.nsa.security.db.NsaApiDb.KeyExistsException;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+
+/**
+ * Implementation of the ApiKeysService, this will provide the below operations,
+ * getAllApiKeys, getApiKey, createApiKey, updateApiKey, deleteApiKey
+ *
+ * @author author
+ */
+@Service
+public class ApiKeysServiceImpl implements ApiKeysService {
+
+ //private Logger log = Logger.getLogger(ApiKeysServiceImpl.class.toString());
+ private static final EELFLogger log = EELFManager.getInstance().getLogger(ApiKeysServiceImpl.class.toString());
+ /**
+ * This method will provide all the ApiKeys present in kafka server.
+ *
+ * @param dmaapContext
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ public void getAllApiKeys(DMaaPContext dmaapContext)
+ throws ConfigDbException, IOException {
+
+ ConfigurationReader configReader = dmaapContext.getConfigReader();
+
+ log.info("configReader : " + configReader.toString());
+
+ final JSONObject result = new JSONObject();
+ final JSONArray keys = new JSONArray();
+ result.put("apiKeys", keys);
+
+ NsaApiDb<NsaSimpleApiKey> apiDb = configReader.getfApiKeyDb();
+
+ for (String key : apiDb.loadAllKeys()) {
+ keys.put(key);
+ }
+ log.info("========== ApiKeysServiceImpl: getAllApiKeys: Api Keys are : "
+ + keys.toString() + "===========");
+ DMaaPResponseBuilder.respondOk(dmaapContext, result);
+ }
+
+ /**
+ * @param dmaapContext
+ * @param apikey
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ @Override
+ public void getApiKey(DMaaPContext dmaapContext, String apikey)
+ throws ConfigDbException, IOException {
+
+ String errorMsg = "Api key name is not mentioned.";
+ int errorCode = HttpStatusCodes.k400_badRequest;
+
+ if (null != apikey) {
+ NsaSimpleApiKey simpleApiKey = getApiKeyDb(dmaapContext)
+ .loadApiKey(apikey);
+
+
+ if (null != simpleApiKey) {
+ JSONObject result = simpleApiKey.asJsonObject();
+ DMaaPResponseBuilder.respondOk(dmaapContext, result);
+ log.info("========== ApiKeysServiceImpl: getApiKey : "
+ + result.toString() + "===========");
+ return;
+ } else {
+ errorMsg = "Api key [" + apikey + "] does not exist.";
+ errorCode = HttpStatusCodes.k404_notFound;
+ log.info("========== ApiKeysServiceImpl: getApiKey: Error : API Key does not exist. "
+ + "===========");
+ DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode,
+ errorMsg);
+ throw new IOException();
+ }
+ }
+
+ }
+
+ /**
+ * @param dmaapContext
+ * @param nsaApiKey
+ * @throws KeyExistsException
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ @Override
+ public void createApiKey(DMaaPContext dmaapContext, ApiKeyBean nsaApiKey)
+ throws KeyExistsException, ConfigDbException, IOException {
+
+ log.debug("TopicService: : createApiKey....");
+
+
+ String contactEmail = nsaApiKey.getEmail();
+ final boolean emailProvided = contactEmail != null && contactEmail.length() > 0 && contactEmail.indexOf("@") > 1 ;
+ String kSetting_AllowAnonymousKeys= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"apiKeys.allowAnonymous");
+ if(null==kSetting_AllowAnonymousKeys) kSetting_AllowAnonymousKeys ="false";
+
+ // if ((contactEmail == null) || (contactEmail.length() == 0))
+ if ( kSetting_AllowAnonymousKeys.equalsIgnoreCase("true") && !emailProvided )
+ {
+ DMaaPResponseBuilder.respondWithErrorInJson(dmaapContext, 400, "You must provide an email address.");
+ return;
+ }
+
+
+
+
+ final NsaApiDb<NsaSimpleApiKey> apiKeyDb = getApiKeyDb(dmaapContext);
+ String apiKey = nsaApiKey.getKey();
+ String sharedSecret = nsaApiKey.getSharedSecret();
+ final NsaSimpleApiKey key = apiKeyDb.createApiKey(apiKey,
+ sharedSecret);
+
+ if (null != key) {
+
+ if (null != nsaApiKey.getEmail()) {
+ key.setContactEmail(nsaApiKey.getEmail());
+ }
+
+ if (null != nsaApiKey.getDescription()) {
+ key.setDescription(nsaApiKey.getDescription());
+ }
+
+ log.debug("=======ApiKeysServiceImpl: createApiKey : saving api key : "
+ + key.toString() + "=====");
+ apiKeyDb.saveApiKey(key);
+ // email out the secret to validate the email address
+ if ( emailProvided )
+ {
+ String body = "\n" + "Your email address was provided as the creator of new API key \""
+ + apiKey + "\".\n" + "\n" + "If you did not make this request, please let us know."
+ + " See http://sa2020.it.att.com:8888 for contact information, " + "but don't worry -"
+ + " the API key is useless without the information below, which has been provided "
+ + "only to you.\n" + "\n\n" + "For API key \"" + apiKey + "\", use API key secret:\n\n\t"
+ + sharedSecret + "\n\n" + "Note that it's normal to share the API key"
+ + " (" + apiKey + "). "
+ + "This is how you are granted access to resources " + "like a UEB topic or Flatiron scope. "
+ + "However, you should NOT share the API key's secret. " + "The API key is associated with your"
+ + " email alone. ALL access to data made with this " + "key will be your responsibility. If you "
+ + "share the secret, someone else can use the API key " + "to access proprietary data with your "
+ + "identity.\n" + "\n" + "Enjoy!\n" + "\n" + "The GFP/SA-2020 Team";
+
+ Emailer em = dmaapContext.getConfigReader().getSystemEmailer();
+ em.send(contactEmail, "New API Key", body);
+ }
+ log.debug("TopicService: : sending response.");
+
+ JSONObject o = key.asJsonObject();
+
+ o.put ( NsaSimpleApiKey.kApiSecretField,
+ emailProvided ?
+ "Emailed to " + contactEmail + "." :
+ key.getSecret ()
+ );
+ DMaaPResponseBuilder.respondOk(dmaapContext,
+ o);
+ /*o.put("secret", "Emailed to " + contactEmail + ".");
+ DMaaPResponseBuilder.respondOk(dmaapContext,
+ o); */
+ return;
+ } else {
+ log.debug("=======ApiKeysServiceImpl: createApiKey : Error in creating API Key.=====");
+ DMaaPResponseBuilder.respondWithError(dmaapContext,
+ HttpStatusCodes.k500_internalServerError,
+ "Failed to create api key.");
+ throw new KeyExistsException(apiKey);
+ }
+ }
+
+ /**
+ * @param dmaapContext
+ * @param apikey
+ * @param nsaApiKey
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws AccessDeniedException
+ */
+ @Override
+ public void updateApiKey(DMaaPContext dmaapContext, String apikey,
+ ApiKeyBean nsaApiKey) throws ConfigDbException, IOException, AccessDeniedException {
+
+ String errorMsg = "Api key name is not mentioned.";
+ int errorCode = HttpStatusCodes.k400_badRequest;
+
+ if (null != apikey) {
+ final NsaApiDb<NsaSimpleApiKey> apiKeyDb = getApiKeyDb(dmaapContext);
+ final NsaSimpleApiKey key = apiKeyDb.loadApiKey(apikey);
+ boolean shouldUpdate = false;
+
+ if (null != key) {
+ final NsaApiKey user = DMaaPAuthenticatorImpl
+ .getAuthenticatedUser(dmaapContext);
+
+ if (user == null || !user.getKey().equals(key.getKey())) {
+ throw new AccessDeniedException("You must authenticate with the key you'd like to update.");
+ }
+
+ if (null != nsaApiKey.getEmail()) {
+ key.setContactEmail(nsaApiKey.getEmail());
+ shouldUpdate = true;
+ }
+
+ if (null != nsaApiKey.getDescription()) {
+ key.setDescription(nsaApiKey.getDescription());
+ shouldUpdate = true;
+ }
+
+ if (shouldUpdate) {
+ apiKeyDb.saveApiKey(key);
+ }
+
+ log.info("======ApiKeysServiceImpl : updateApiKey : Key Updated Successfully :"
+ + key.toString() + "=========");
+ DMaaPResponseBuilder.respondOk(dmaapContext,
+ key.asJsonObject());
+ return;
+ }
+ } else {
+ errorMsg = "Api key [" + apikey + "] does not exist.";
+ errorCode = HttpStatusCodes.k404_notFound;
+ DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode,
+ errorMsg);
+ log.info("======ApiKeysServiceImpl : updateApiKey : Error in Updating Key.============");
+ throw new IOException();
+ }
+ }
+
+ /**
+ * @param dmaapContext
+ * @param apikey
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws AccessDeniedException
+ */
+ @Override
+ public void deleteApiKey(DMaaPContext dmaapContext, String apikey)
+ throws ConfigDbException, IOException, AccessDeniedException {
+
+ String errorMsg = "Api key name is not mentioned.";
+ int errorCode = HttpStatusCodes.k400_badRequest;
+
+ if (null != apikey) {
+ final NsaApiDb<NsaSimpleApiKey> apiKeyDb = getApiKeyDb(dmaapContext);
+ final NsaSimpleApiKey key = apiKeyDb.loadApiKey(apikey);
+
+ if (null != key) {
+
+ final NsaApiKey user = DMaaPAuthenticatorImpl
+ .getAuthenticatedUser(dmaapContext);
+ if (user == null || !user.getKey().equals(key.getKey())) {
+ throw new AccessDeniedException("You don't own the API key.");
+ }
+
+ apiKeyDb.deleteApiKey(key);
+ log.info("======ApiKeysServiceImpl : deleteApiKey : Deleted Key successfully.============");
+ DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
+ "Api key [" + apikey + "] deleted successfully.");
+ return;
+ }
+ } else {
+ errorMsg = "Api key [" + apikey + "] does not exist.";
+ errorCode = HttpStatusCodes.k404_notFound;
+ DMaaPResponseBuilder.respondWithError(dmaapContext, errorCode,
+ errorMsg);
+ log.info("======ApiKeysServiceImpl : deleteApiKey : Error while deleting key.============");
+ throw new IOException();
+ }
+ }
+
+ /**
+ *
+ * @param dmaapContext
+ * @return
+ */
+ private NsaApiDb<NsaSimpleApiKey> getApiKeyDb(DMaaPContext dmaapContext) {
+ ConfigurationReader configReader = dmaapContext.getConfigReader();
+ return configReader.getfApiKeyDb();
+ }
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/BaseTransactionDbImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/BaseTransactionDbImpl.java
new file mode 100644
index 0000000..cdbf57b
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/BaseTransactionDbImpl.java
@@ -0,0 +1,153 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service.impl;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.att.nsa.cambria.transaction.DMaaPTransactionFactory;
+import com.att.nsa.cambria.transaction.DMaaPTransactionObj;
+import com.att.nsa.cambria.transaction.DMaaPTransactionObjDB;
+import com.att.nsa.cambria.transaction.TransactionObj;
+import com.att.nsa.configs.ConfigDb;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.configs.ConfigPath;
+
+/**
+ * Persistent storage for Transaction objects built over an abstract config db.
+ *
+ * @author author
+ *
+ * @param <K>
+ */
+public class BaseTransactionDbImpl<K extends DMaaPTransactionObj> implements DMaaPTransactionObjDB<K> {
+
+ private final ConfigDb fDb;
+ private final ConfigPath fBasePath;
+ private final DMaaPTransactionFactory<K> fKeyFactory;
+
+ private static final String kStdRootPath = "/transaction";
+
+ private ConfigPath makePath(String transactionId) {
+ return fBasePath.getChild(transactionId);
+ }
+
+ /**
+ * Construct an Transaction db over the given config db at the standard
+ * location
+ *
+ * @param db
+ * @param keyFactory
+ * @throws ConfigDbException
+ */
+ public BaseTransactionDbImpl(ConfigDb db, DMaaPTransactionFactory<K> keyFactory) throws ConfigDbException {
+ this(db, kStdRootPath, keyFactory);
+ }
+
+ /**
+ * Construct an Transaction db over the given config db using the given root
+ * location
+ *
+ * @param db
+ * @param rootPath
+ * @param keyFactory
+ * @throws ConfigDbException
+ */
+ public BaseTransactionDbImpl(ConfigDb db, String rootPath, DMaaPTransactionFactory<K> keyFactory)
+ throws ConfigDbException {
+ fDb = db;
+ fBasePath = db.parse(rootPath);
+ fKeyFactory = keyFactory;
+
+ if (!db.exists(fBasePath)) {
+ db.store(fBasePath, "");
+ }
+ }
+
+ /**
+ * Create a new Transaction Obj. If one exists,
+ *
+ * @param id
+ * @return the new Transaction record
+ * @throws ConfigDbException
+ */
+ public synchronized K createTransactionObj(String id) throws KeyExistsException, ConfigDbException {
+ final ConfigPath path = makePath(id);
+ if (fDb.exists(path)) {
+ throw new KeyExistsException(id);
+ }
+
+ // make one, store it, return it
+ final K newKey = fKeyFactory.makeNewTransactionId(id);
+ fDb.store(path, newKey.serialize());
+ return newKey;
+ }
+
+ /**
+ * Save an Transaction record. This must be used after changing auxiliary
+ * data on the record. Note that the transaction object must exist (via
+ * createTransactionObj).
+ *
+ * @param transaction
+ * object
+ * @throws ConfigDbException
+ */
+ @Override
+ public synchronized void saveTransactionObj(K trnObj) throws ConfigDbException {
+ final ConfigPath path = makePath(trnObj.getId());
+ if (!fDb.exists(path) || !(trnObj instanceof TransactionObj)) {
+ throw new IllegalStateException(trnObj.getId() + " is not known to this database");
+ }
+ fDb.store(path, ((TransactionObj) trnObj).serialize());
+ }
+
+ /**
+ * Load an Transaction record based on the Transaction Id value
+ *
+ * @param transactionId
+ * @return an Transaction Object record or null
+ * @throws ConfigDbException
+ */
+ @Override
+ public synchronized K loadTransactionObj(String transactionId) throws ConfigDbException {
+ final String data = fDb.load(makePath(transactionId));
+ if (data != null) {
+ return fKeyFactory.makeNewTransactionObj(data);
+ }
+ return null;
+ }
+
+ /**
+ * Load all transactions known to this database. (This could be expensive.)
+ *
+ * @return a set of all Transaction objects
+ * @throws ConfigDbException
+ */
+ public synchronized Set<String> loadAllTransactionObjs() throws ConfigDbException {
+ final TreeSet<String> result = new TreeSet<String>();
+ for (ConfigPath cp : fDb.loadChildrenNames(fBasePath)) {
+ result.add(cp.getName());
+ }
+ return result;
+ }
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/EventsServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/EventsServiceImpl.java
new file mode 100644
index 0000000..3386f19
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/EventsServiceImpl.java
@@ -0,0 +1,788 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.http.HttpStatus;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.backends.Consumer;
+import com.att.nsa.cambria.backends.ConsumerFactory;
+import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
+import com.att.nsa.cambria.backends.MetricsSet;
+import com.att.nsa.cambria.backends.Publisher;
+import com.att.nsa.cambria.backends.Publisher.message;
+import com.att.nsa.cambria.beans.DMaaPCambriaLimiter;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.beans.LogDetails;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
+import com.att.nsa.cambria.exception.DMaaPErrorMessages;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.cambria.metabroker.Topic;
+import com.att.nsa.cambria.resources.CambriaEventSet;
+import com.att.nsa.cambria.resources.CambriaOutboundEventStream;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
+import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
+import com.att.nsa.cambria.service.EventsService;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.cambria.utils.Utils;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.service.standards.MimeTypes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.NsaApiKey;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.util.rrConvertor;
+
+import kafka.producer.KeyedMessage;
+
+/**
+ * This class provides the functinality to publish and subscribe message to
+ * kafka
+ *
+ * @author author
+ *
+ */
+@Service
+public class EventsServiceImpl implements EventsService {
+ //private static final Logger LOG = Logger.getLogger(EventsServiceImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
+
+ private static final String BATCH_LENGTH = "event.batch.length";
+ private static final String TRANSFER_ENCODING = "Transfer-Encoding";
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+ //@Value("${metrics.send.cambria.topic}")
+ //private String metricsTopic;
+
+ /**
+ * @param ctx
+ * @param topic
+ * @param consumerGroup
+ * @param clientId
+ * @throws ConfigDbException,
+ * TopicExistsException, AccessDeniedException,
+ * UnavailableException, CambriaApiException, IOException
+ *
+ *
+ */
+ @Override
+ public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
+ throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
+ CambriaApiException, IOException,DMaaPAccessDeniedException {
+ final long startTime = System.currentTimeMillis();
+ final HttpServletRequest req = ctx.getRequest();
+
+ boolean isAAFTopic=false;
+ // was this host blacklisted?
+ final String remoteAddr = Utils.getRemoteAddress(ctx);;
+ if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )
+ {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
+ "] is blacklisted. Please contact the cluster management team."
+ ,null,Utils.getFormattedDate(new Date()),topic,
+ Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
+ null,null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+
+ int limit = CambriaConstants.kNoLimit;
+ if (req.getParameter("limit") != null) {
+ limit = Integer.parseInt(req.getParameter("limit"));
+ }
+
+ int timeoutMs= CambriaConstants.kNoTimeout;
+ String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"timeout");
+ if(strtimeoutMS!=null)timeoutMs=Integer.parseInt(strtimeoutMS);
+ //int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout", CambriaConstants.kNoTimeout);
+ if (req.getParameter("timeout") != null) {
+ timeoutMs = Integer.parseInt(req.getParameter("timeout"));
+ }
+
+ // By default no filter is applied if filter is not passed as a
+ // parameter in the request URI
+ String topicFilter = CambriaConstants.kNoFilter;
+ if (null != req.getParameter("filter")) {
+ topicFilter = req.getParameter("filter");
+ }
+ // pretty to print the messaages in new line
+ String prettyval="0";
+ String strPretty=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"pretty");
+ if (null!=strPretty)prettyval=strPretty;
+
+ String metaval="0";
+ String strmeta=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"meta");
+ if (null!=strmeta)metaval=strmeta;
+
+ final boolean pretty = rrConvertor
+ .convertToBooleanBroad(prettyval);
+ // withMeta to print offset along with message
+ final boolean withMeta = rrConvertor
+ .convertToBooleanBroad(metaval);
+
+
+ /*final boolean pretty = rrConvertor
+ .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("pretty", "0"));
+ // withMeta to print offset along with message
+ final boolean withMeta = rrConvertor
+ .convertToBooleanBroad(ctx.getConfigReader().getSettings().getString("meta", "0"));
+*/
+ final LogWrap logger = new LogWrap ( topic, consumerGroup, clientId);
+ logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter);
+
+ // is this user allowed to read this topic?
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
+ final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+
+ if (metatopic == null) {
+ // no such topic.
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
+ errorMessages.getTopicNotExist()+"-[" + topic + "]",null,Utils.getFormattedDate(new Date()),topic,null,null,
+ clientId,ctx.getRequest().getRemoteHost());
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
+ if (null==metricTopicname)
+ metricTopicname="msgrtr.apinode.metrics.dmaap";
+
+ if(null==ctx.getRequest().getHeader("Authorization")&& !topic.equalsIgnoreCase(metricTopicname))
+ {
+ if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))){
+ // check permissions
+ metatopic.checkUserRead(user);
+ }
+ }
+ // if headers are not provided then user will be null
+ if(user == null && null!=ctx.getRequest().getHeader("Authorization"))
+ {
+ // the topic name will be sent by the client
+// String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"sub";
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+ String permission = aaf.aafPermissionString(topic, "sub");
+ if(!aaf.aafAuthentication(ctx.getRequest(), permission))
+ {
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,null,null,
+ clientId,ctx.getRequest().getRemoteHost());
+ LOG.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+
+ }
+ isAAFTopic = true;
+ }
+ Consumer c = null;
+ try {
+ final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
+
+ final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
+ rl.onCall(topic, consumerGroup, clientId);
+
+ c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs);
+
+ /* final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c,
+ ctx.getConfigReader().getSettings()).timeout(timeoutMs).limit(limit).filter(topicFilter)
+ .pretty(pretty).withMeta(withMeta)
+ // .atOffset(topicOffset)
+ .build();*/
+ final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs).limit(limit).filter(topicFilter)
+ .pretty(pretty).withMeta(withMeta).build();
+ coes.setDmaapContext(ctx);
+ coes.setTopic(metatopic);
+ if( isTransEnabled() || isAAFTopic ){
+ coes.setTransEnabled(true);
+ }else{
+ coes.setTransEnabled(false);
+ }
+ coes.setTopicStyle(isAAFTopic);
+
+ DMaaPResponseBuilder.setNoCacheHeadings(ctx);
+
+ DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
+
+ // No IOException thrown during respondOkWithStream, so commit the
+ // new offsets to all the brokers
+ c.commitOffsets();
+ final int sent = coes.getSentCount();
+
+ metricsSet.consumeTick(sent);
+ rl.onSend(topic, consumerGroup, clientId, sent);
+
+ final long elapsedMs = System.currentTimeMillis() - startTime;
+ logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset());
+
+ } catch (UnavailableException excp) {
+ logger.warn(excp.getMessage(), excp);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ errorMessages.getServerUnav()+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
+ clientId,ctx.getRequest().getRemoteHost());
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ } catch (CambriaApiException excp) {
+ logger.warn(excp.getMessage(), excp);
+ throw excp;
+ } catch (Exception excp) {
+ logger.warn("Couldn't respond to client, closing cambria consumer", excp);
+ ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ "Couldn't respond to client, closing cambria consumer"+ excp.getMessage(),null,Utils.getFormattedDate(new Date()),topic,null,null,
+ clientId,ctx.getRequest().getRemoteHost());
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ } finally {
+ // If no cache, close the consumer now that we're done with it.
+ boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
+ String strkSetting_EnableCache=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,ConsumerFactory.kSetting_EnableCache);
+ if(null!=strkSetting_EnableCache) kSetting_EnableCache=Boolean.parseBoolean(strkSetting_EnableCache);
+ //if (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache, ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
+ if (!kSetting_EnableCache && (c != null)) {
+ c.close();
+
+ }
+ }
+ }
+
+ /**
+ * @throws missingReqdSetting
+ *
+ */
+ @Override
+ public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
+ final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
+ CambriaApiException, IOException, missingReqdSetting,DMaaPAccessDeniedException {
+
+ // is this user allowed to write to this topic?
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
+ final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+ boolean isAAFTopic=false;
+
+ // was this host blacklisted?
+ final String remoteAddr = Utils.getRemoteAddress(ctx);
+
+ if ( ctx.getConfigReader().getfIpBlackList().contains ( remoteAddr ) )
+ {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), "Source address [" + remoteAddr +
+ "] is blacklisted. Please contact the cluster management team."
+ ,null,Utils.getFormattedDate(new Date()),topic,
+ Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
+ null,null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+ String topicNameStd = null;
+
+ // topicNameStd= ctx.getConfigReader().getSettings().getString("enforced.topic.name.AAF");
+ topicNameStd= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
+ String metricTopicname= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"metrics.send.cambria.topic");
+ if (null==metricTopicname)
+ metricTopicname="msgrtr.apinode.metrics.dmaap";
+ boolean topicNameEnforced=false;
+ if (null != topicNameStd && topic.startsWith(topicNameStd) )
+ {
+ topicNameEnforced = true;
+ }
+
+ //Here check if the user has rights to publish on the topic
+ //( This will be called when no auth is added or when UEB API Key Authentication is used)
+ //checkUserWrite(user) method will throw an error when there is no Auth header added or when the
+ //user has no publish rights
+
+ if(null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner())) && null==ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname))
+ {
+ metatopic.checkUserWrite(user);
+ }
+
+
+
+ // if headers are not provided then user will be null
+ if(topicNameEnforced || (user == null && null!=ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)))
+ {
+ // the topic name will be sent by the client
+ // String permission = "com.att.dmaap.mr.topic"+"|"+topic+"|"+"pub";
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+ String permission = aaf.aafPermissionString(topic, "pub");
+ if(!aaf.aafAuthentication(ctx.getRequest(), permission))
+ {
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getNotPermitted1()+" publish "+errorMessages.getNotPermitted2()+topic,null,Utils.getFormattedDate(new Date()),topic,
+ Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
+ null,null);
+ LOG.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+ }
+ isAAFTopic=true;
+ }
+
+ final HttpServletRequest req = ctx.getRequest();
+
+ // check for chunked input
+ boolean chunked = false;
+ if (null != req.getHeader(TRANSFER_ENCODING)) {
+ chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
+ }
+ // get the media type, or set it to a generic value if it wasn't
+ // provided
+ String mediaType = req.getContentType();
+ if (mediaType == null || mediaType.length() == 0) {
+ mediaType = MimeTypes.kAppGenericBinary;
+ }
+
+ if (mediaType.contains("charset=UTF-8")) {
+ mediaType = mediaType.replace("; charset=UTF-8", "").trim();
+ }
+
+ String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
+ boolean istransidreqd=false;
+ if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")){
+ istransidreqd = true;
+ }
+
+ if (isAAFTopic || istransidreqd ) {
+ pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
+ }
+ else
+ {
+ pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
+ }
+
+
+ }
+
+ /**
+ *
+ * @param ctx
+ * @param topic
+ * @param msg
+ * @param defaultPartition
+ * @param chunked
+ * @param mediaType
+ * @throws ConfigDbException
+ * @throws AccessDeniedException
+ * @throws TopicExistsException
+ * @throws CambriaApiException
+ * @throws IOException
+ */
+ private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition,
+ boolean chunked, String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
+ CambriaApiException, IOException {
+ final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
+
+ // setup the event set
+ final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
+
+ // start processing, building a batch to push to the backend
+ final long startMs = System.currentTimeMillis();
+ long count = 0;
+
+ long maxEventBatch=1024 * 16;
+ String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
+ if(null!=batchlen)maxEventBatch=Long.parseLong(batchlen);
+
+ // long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
+ final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
+ final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
+
+ try {
+ // for each message...
+ Publisher.message m = null;
+ while ((m = events.next()) != null) {
+ // add the message to the batch
+ batch.add(m);
+ final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
+ m.getMessage());
+ kms.add(data);
+ // check if the batch is full
+ final int sizeNow = batch.size();
+ if (sizeNow > maxEventBatch) {
+ ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
+ kms.clear();
+ batch.clear();
+ metricsSet.publishTick(sizeNow);
+ count += sizeNow;
+ }
+ }
+
+ // send the pending batch
+ final int sizeNow = batch.size();
+ if (sizeNow > 0) {
+ ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
+ kms.clear();
+ batch.clear();
+ metricsSet.publishTick(sizeNow);
+ count += sizeNow;
+ }
+
+ final long endMs = System.currentTimeMillis();
+ final long totalMs = endMs - startMs;
+
+ LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
+
+ // build a responseP
+ final JSONObject response = new JSONObject();
+ response.put("count", count);
+ response.put("serverTimeMs", totalMs);
+ DMaaPResponseBuilder.respondOk(ctx, response);
+
+ } catch (Exception excp) {
+ int status = HttpStatus.SC_NOT_FOUND;
+ String errorMsg=null;
+ if(excp instanceof CambriaApiException) {
+ status = ((CambriaApiException) excp).getStatus();
+ JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
+ JSONObject errObject = new JSONObject(jsonTokener);
+ errorMsg = (String) errObject.get("message");
+
+ }
+ ErrorResponse errRes = new ErrorResponse(status,
+ DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
+ errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
+ null,ctx.getRequest().getRemoteHost(),
+ null,null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+
+ }
+ }
+
+ /**
+ *
+ * @param ctx
+ * @param inputStream
+ * @param topic
+ * @param partitionKey
+ * @param requestTime
+ * @param chunked
+ * @param mediaType
+ * @throws ConfigDbException
+ * @throws AccessDeniedException
+ * @throws TopicExistsException
+ * @throws IOException
+ * @throws CambriaApiException
+ */
+ private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
+ final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
+ throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
+ CambriaApiException {
+
+ final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
+
+ // setup the event set
+ final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
+
+ // start processing, building a batch to push to the backend
+ final long startMs = System.currentTimeMillis();
+ long count = 0;
+ long maxEventBatch = 1024 * 16;
+ String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,BATCH_LENGTH);
+ if(null!=evenlen)maxEventBatch=Long.parseLong(evenlen);
+ //final long maxEventBatch = ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
+ final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
+ final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
+
+ Publisher.message m = null;
+ int messageSequence = 1;
+ Long batchId = 1L;
+ final boolean transactionEnabled = true;
+ int publishBatchCount=0;
+ SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
+
+ //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));
+ try {
+ // for each message...
+ batchId=DMaaPContext.getBatchID();
+
+ String responseTransactionId = null;
+
+ while ((m = events.next()) != null) {
+
+ //LOG.warn("Batch Start Id: " + Utils.getFromattedBatchSequenceId(batchId));
+
+
+ addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
+ transactionEnabled);
+ messageSequence++;
+
+ // add the message to the batch
+ batch.add(m);
+
+ responseTransactionId = m.getLogDetails().getTransactionId();
+
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("message", m.getMessage());
+ jsonObject.put("transactionId", responseTransactionId);
+ final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
+ jsonObject.toString());
+ kms.add(data);
+
+ // check if the batch is full
+ final int sizeNow = batch.size();
+ if (sizeNow >= maxEventBatch) {
+ String startTime = sdf.format(new Date());
+ LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");
+ try {
+ ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
+ //transactionLogs(batch);
+ for (message msg : batch) {
+ LogDetails logDetails = msg.getLogDetails();
+ LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
+ }
+ } catch (Exception excp) {
+
+ int status = HttpStatus.SC_NOT_FOUND;
+ String errorMsg=null;
+ if(excp instanceof CambriaApiException) {
+ status = ((CambriaApiException) excp).getStatus();
+ JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
+ JSONObject errObject = new JSONObject(jsonTokener);
+ errorMsg = (String) errObject.get("message");
+ }
+ ErrorResponse errRes = new ErrorResponse(status,
+ DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
+ "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+ "."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,
+ null,Utils.getFormattedDate(new Date()),topic,
+ Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
+ null,null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ kms.clear();
+ batch.clear();
+ metricsSet.publishTick(sizeNow);
+ publishBatchCount=sizeNow;
+ count += sizeNow;
+ //batchId++;
+ String endTime = sdf.format(new Date());
+ LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
+ + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
+ batchId=DMaaPContext.getBatchID();
+ }
+ }
+
+ // send the pending batch
+ final int sizeNow = batch.size();
+ if (sizeNow > 0) {
+ String startTime = sdf.format(new Date());
+ LOG.info("Batch Start Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch Start Id=" + batchId+"]");
+ try {
+ ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
+ //transactionLogs(batch);
+ for (message msg : batch) {
+ LogDetails logDetails = msg.getLogDetails();
+ LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
+ }
+ } catch (Exception excp) {
+ int status = HttpStatus.SC_NOT_FOUND;
+ String errorMsg=null;
+ if(excp instanceof CambriaApiException) {
+ status = ((CambriaApiException) excp).getStatus();
+ JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
+ JSONObject errObject = new JSONObject(jsonTokener);
+ errorMsg = (String) errObject.get("message");
+ }
+
+ ErrorResponse errRes = new ErrorResponse(status,
+ DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
+ "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+ errorMessages.getPublishMsgCount()+count+"."+errorMsg,
+ null,Utils.getFormattedDate(new Date()),topic,
+ Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
+ null,null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ kms.clear();
+ metricsSet.publishTick(sizeNow);
+ count += sizeNow;
+ //batchId++;
+ String endTime = sdf.format(new Date());
+ publishBatchCount=sizeNow;
+ LOG.info("Batch End Details:[serverIp="+ctx.getRequest().getLocalAddr()+",Batch End Id=" + batchId
+ + ",Batch Total=" + publishBatchCount+",Batch Start Time="+startTime+",Batch End Time="+endTime+"]");
+ }
+
+ final long endMs = System.currentTimeMillis();
+ final long totalMs = endMs - startMs;
+
+ LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
+
+ if (null != responseTransactionId) {
+ ctx.getResponse().setHeader("transactionId", Utils.getResponseTransactionId(responseTransactionId));
+ }
+
+ // build a response
+ final JSONObject response = new JSONObject();
+ response.put("count", count);
+ response.put("serverTimeMs", totalMs);
+ DMaaPResponseBuilder.respondOk(ctx, response);
+
+ } catch (Exception excp) {
+ int status = HttpStatus.SC_NOT_FOUND;
+ String errorMsg=null;
+ if(excp instanceof CambriaApiException) {
+ status = ((CambriaApiException) excp).getStatus();
+ JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
+ JSONObject errObject = new JSONObject(jsonTokener);
+ errorMsg = (String) errObject.get("message");
+ }
+
+ ErrorResponse errRes = new ErrorResponse(
+ status,
+ DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
+ "Transaction-"+errorMessages.getPublishMsgError()+":"+topic+"."+errorMessages.getPublishMsgCount()+count+"."+errorMsg,null,Utils.getFormattedDate(new Date()),topic,
+ Utils.getUserApiKey(ctx.getRequest()),ctx.getRequest().getRemoteHost(),
+ null,null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+
+ /**
+ *
+ * @param msg
+ * @param topic
+ * @param request
+ * @param messageCreationTime
+ * @param messageSequence
+ * @param batchId
+ * @param transactionEnabled
+ */
+ private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
+ final String messageCreationTime, final int messageSequence, final Long batchId,
+ final boolean transactionEnabled) {
+ LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
+ transactionEnabled);
+ logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
+ msg.setTransactionEnabled(transactionEnabled);
+ msg.setLogDetails(logDetails);
+ }
+
+
+
+ /**
+ *
+ * @author author
+ *
+ */
+ private static class LogWrap {
+ private final String fId;
+
+ /**
+ * constructor initialization
+ *
+ * @param topic
+ * @param cgroup
+ * @param cid
+ */
+ public LogWrap(String topic, String cgroup, String cid) {
+ fId = "[" + topic + "/" + cgroup + "/" + cid + "] ";
+ }
+
+ /**
+ *
+ * @param msg
+ */
+ public void info(String msg) {
+ LOG.info(fId + msg);
+ }
+
+ /**
+ *
+ * @param msg
+ * @param t
+ */
+ public void warn(String msg, Exception t) {
+ LOG.warn(fId + msg, t);
+ }
+
+ }
+
+ private boolean isTransEnabled() {
+ String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"transidUEBtopicreqd");
+ boolean istransidreqd=false;
+ if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) ){
+ istransidreqd = true;
+ }
+
+ return istransidreqd;
+
+ }
+
+ private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
+ final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
+ LogDetails logDetails = new LogDetails();
+ logDetails.setTopicId(topicName);
+ logDetails.setMessageTimestamp(messageTimestamp);
+ logDetails.setPublisherId(Utils.getUserApiKey(request));
+ logDetails.setPublisherIp(request.getRemoteHost());
+ logDetails.setMessageBatchId(batchId);
+ logDetails.setMessageSequence(String.valueOf(messageSequence));
+ logDetails.setTransactionEnabled(transactionEnabled);
+ logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
+ logDetails.setServerIp(request.getLocalAddr());
+ return logDetails;
+ }
+
+ /*public String getMetricsTopic() {
+ return metricsTopic;
+ }
+
+ public void setMetricsTopic(String metricsTopic) {
+ this.metricsTopic = metricsTopic;
+ }*/
+
+} \ No newline at end of file
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/MMServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/MMServiceImpl.java
new file mode 100644
index 0000000..4f6a9a1
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/MMServiceImpl.java
@@ -0,0 +1,605 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedList;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Context;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.backends.Consumer;
+import com.att.nsa.cambria.backends.ConsumerFactory;
+import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
+import com.att.nsa.cambria.backends.Publisher.message;
+import com.att.nsa.cambria.backends.MetricsSet;
+import com.att.nsa.cambria.backends.Publisher;
+
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.beans.LogDetails;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
+import com.att.nsa.cambria.exception.DMaaPErrorMessages;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.cambria.metabroker.Topic;
+import com.att.nsa.cambria.resources.CambriaEventSet;
+import com.att.nsa.cambria.resources.CambriaOutboundEventStream;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
+import com.att.nsa.cambria.service.MMService;
+import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.cambria.utils.Utils;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.service.standards.MimeTypes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.util.rrConvertor;
+
+import kafka.producer.KeyedMessage;
+
+@Service
+public class MMServiceImpl implements MMService {
+ private static final String BATCH_LENGTH = "event.batch.length";
+ private static final String TRANSFER_ENCODING = "Transfer-Encoding";
+ //private static final Logger LOG = Logger.getLogger(MMServiceImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MMServiceImpl.class);
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+ @Autowired
+ @Qualifier("configurationReader")
+ private ConfigurationReader configReader;
+
+ // HttpServletRequest object
+ @Context
+ private HttpServletRequest request;
+
+ // HttpServletResponse object
+ @Context
+ private HttpServletResponse response;
+
+ @Override
+ public void addWhiteList() {
+
+ }
+
+ @Override
+ public void removeWhiteList() {
+
+ }
+
+ @Override
+ public void listWhiteList() {
+
+ }
+
+ @Override
+ public String subscribe(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
+ throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
+ CambriaApiException, IOException {
+
+ // final long startTime = System.currentTimeMillis();
+ final HttpServletRequest req = ctx.getRequest();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ // was this host blacklisted?
+ final String remoteAddr = Utils.getRemoteAddress(ctx);
+
+ if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
+ null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
+ ctx.getRequest().getRemoteHost(), null, null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+ int limit = CambriaConstants.kNoLimit;
+
+ if (req.getParameter("limit") != null) {
+ limit = Integer.parseInt(req.getParameter("limit"));
+ }
+ limit = 1;
+ // int timeoutMs = 60000;
+ int timeoutMs = CambriaConstants.kNoTimeout;
+ String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
+ if (strtimeoutMS != null)
+ timeoutMs = Integer.parseInt(strtimeoutMS);
+ // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
+ // CambriaConstants.kNoTimeout);
+ if (req.getParameter("timeout") != null) {
+ timeoutMs = Integer.parseInt(req.getParameter("timeout"));
+ }
+
+ // By default no filter is applied if filter is not passed as a
+ // parameter in the request URI
+ String topicFilter = CambriaConstants.kNoFilter;
+ if (null != req.getParameter("filter")) {
+ topicFilter = req.getParameter("filter");
+ }
+ // pretty to print the messaages in new line
+ String prettyval = "0";
+ String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
+ if (null != strPretty)
+ prettyval = strPretty;
+
+ String metaval = "0";
+ String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
+ if (null != strmeta)
+ metaval = strmeta;
+
+ final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
+ // withMeta to print offset along with message
+ final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
+
+ // is this user allowed to read this topic?
+ //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
+ final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+
+ if (metatopic == null) {
+ // no such topic.
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+ DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
+ errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
+ topic, null, null, clientId, ctx.getRequest().getRemoteHost());
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ //String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "metrics.send.cambria.topic");
+ /*
+ * if (null==metricTopicname)
+ * metricTopicname="msgrtr.apinode.metrics.dmaap"; //else if(user!=null)
+ * if(null==ctx.getRequest().getHeader("Authorization")&&
+ * !topic.equalsIgnoreCase(metricTopicname)) { if (null !=
+ * metatopic.getOwner() && !("".equals(metatopic.getOwner()))){ // check
+ * permissions metatopic.checkUserRead(user); } }
+ */
+
+ Consumer c = null;
+ try {
+ final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
+
+ c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs);
+
+ final CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
+ .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
+ coes.setDmaapContext(ctx);
+ coes.setTopic(metatopic);
+
+ DMaaPResponseBuilder.setNoCacheHeadings(ctx);
+
+ try {
+ coes.write(baos);
+ } catch (Exception ex) {
+
+ }
+
+ c.commitOffsets();
+ final int sent = coes.getSentCount();
+
+ metricsSet.consumeTick(sent);
+
+ } catch (UnavailableException excp) {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+ null, null, clientId, ctx.getRequest().getRemoteHost());
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ } catch (CambriaApiException excp) {
+
+ throw excp;
+ } catch (Exception excp) {
+
+ ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+ DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+ "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
+ Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ } finally {
+
+ boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
+ String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ ConsumerFactory.kSetting_EnableCache);
+ if (null != strkSetting_EnableCache)
+ kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
+
+ if (!kSetting_EnableCache && (c != null)) {
+ c.close();
+
+ }
+ }
+ return baos.toString();
+ }
+
+ @Override
+ public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
+ final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
+ CambriaApiException, IOException, missingReqdSetting {
+
+ //final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
+ //final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+
+ final String remoteAddr = Utils.getRemoteAddress(ctx);
+
+ if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
+ null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
+ ctx.getRequest().getRemoteHost(), null, null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+
+ String topicNameStd = null;
+
+ topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
+ "enforced.topic.name.AAF");
+ String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
+ "metrics.send.cambria.topic");
+ if (null == metricTopicname)
+ metricTopicname = "msgrtr.apinode.metrics.dmaap";
+ boolean topicNameEnforced = false;
+ if (null != topicNameStd && topic.startsWith(topicNameStd)) {
+ topicNameEnforced = true;
+ }
+
+ final HttpServletRequest req = ctx.getRequest();
+
+ boolean chunked = false;
+ if (null != req.getHeader(TRANSFER_ENCODING)) {
+ chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
+ }
+
+ String mediaType = req.getContentType();
+ if (mediaType == null || mediaType.length() == 0) {
+ mediaType = MimeTypes.kAppGenericBinary;
+ }
+
+ if (mediaType.contains("charset=UTF-8")) {
+ mediaType = mediaType.replace("; charset=UTF-8", "").trim();
+ }
+
+ if (!topic.equalsIgnoreCase(metricTopicname)) {
+ pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
+ } else {
+ pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
+ }
+ }
+
+ private static void addTransactionDetailsToMessage(message msg, final String topic, HttpServletRequest request,
+ final String messageCreationTime, final int messageSequence, final Long batchId,
+ final boolean transactionEnabled) {
+ LogDetails logDetails = generateLogDetails(topic, request, messageCreationTime, messageSequence, batchId,
+ transactionEnabled);
+ logDetails.setMessageLengthInBytes(Utils.messageLengthInBytes(msg.getMessage()));
+ msg.setTransactionEnabled(transactionEnabled);
+ msg.setLogDetails(logDetails);
+ }
+
+ private static LogDetails generateLogDetails(final String topicName, HttpServletRequest request,
+ final String messageTimestamp, int messageSequence, Long batchId, final boolean transactionEnabled) {
+ LogDetails logDetails = new LogDetails();
+ logDetails.setTopicId(topicName);
+ logDetails.setMessageTimestamp(messageTimestamp);
+ logDetails.setPublisherId(Utils.getUserApiKey(request));
+ logDetails.setPublisherIp(request.getRemoteHost());
+ logDetails.setMessageBatchId(batchId);
+ logDetails.setMessageSequence(String.valueOf(messageSequence));
+ logDetails.setTransactionEnabled(transactionEnabled);
+ logDetails.setTransactionIdTs(Utils.getFormattedDate(new Date()));
+ logDetails.setServerIp(request.getLocalAddr());
+ return logDetails;
+ }
+
+ private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
+ String mediaType) throws ConfigDbException, AccessDeniedException, TopicExistsException,
+ CambriaApiException, IOException {
+ final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
+
+ // setup the event set
+ final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
+
+ // start processing, building a batch to push to the backend
+ final long startMs = System.currentTimeMillis();
+ long count = 0;
+
+ long maxEventBatch = 1024 * 16;
+ String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
+ if (null != batchlen)
+ maxEventBatch = Long.parseLong(batchlen);
+
+ // long maxEventBatch =
+ // ctx.getConfigReader().getSettings().getLong(BATCH_LENGTH, 1024 * 16);
+ final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
+ final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
+
+ try {
+ // for each message...
+ Publisher.message m = null;
+ while ((m = events.next()) != null) {
+ // add the message to the batch
+ batch.add(m);
+ final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
+ m.getMessage());
+ kms.add(data);
+ // check if the batch is full
+ final int sizeNow = batch.size();
+ if (sizeNow > maxEventBatch) {
+ ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
+ kms.clear();
+ batch.clear();
+ metricsSet.publishTick(sizeNow);
+ count += sizeNow;
+ }
+ }
+
+ // send the pending batch
+ final int sizeNow = batch.size();
+ if (sizeNow > 0) {
+ ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
+ kms.clear();
+ batch.clear();
+ metricsSet.publishTick(sizeNow);
+ count += sizeNow;
+ }
+
+ final long endMs = System.currentTimeMillis();
+ final long totalMs = endMs - startMs;
+
+ LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
+
+ // build a responseP
+ final JSONObject response = new JSONObject();
+ response.put("count", count);
+ response.put("serverTimeMs", totalMs);
+ // DMaaPResponseBuilder.respondOk(ctx, response);
+
+ } catch (Exception excp) {
+
+ int status = HttpStatus.SC_NOT_FOUND;
+ String errorMsg = null;
+ if (excp instanceof CambriaApiException) {
+ status = ((CambriaApiException) excp).getStatus();
+ JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
+ JSONObject errObject = new JSONObject(jsonTokener);
+ errorMsg = (String) errObject.get("message");
+
+ }
+ ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
+ errorMessages.getPublishMsgError() + ":" + topic + "." + errorMessages.getPublishMsgCount() + count
+ + "." + errorMsg,
+ null, Utils.getFormattedDate(new Date()), topic, null, ctx.getRequest().getRemoteHost(), null,
+ null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
+ final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
+ throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException,
+ CambriaApiException {
+
+ final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
+
+ // setup the event set
+ final CambriaEventSet events = new CambriaEventSet(mediaType, inputStream, chunked, partitionKey);
+
+ // start processing, building a batch to push to the backend
+ final long startMs = System.currentTimeMillis();
+ long count = 0;
+ long maxEventBatch = 1024 * 16;
+ String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
+ if (null != evenlen)
+ maxEventBatch = Long.parseLong(evenlen);
+
+ final LinkedList<Publisher.message> batch = new LinkedList<Publisher.message>();
+ final ArrayList<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
+
+ Publisher.message m = null;
+ int messageSequence = 1;
+ Long batchId = 1L;
+ final boolean transactionEnabled = true;
+ int publishBatchCount = 0;
+ SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SS");
+
+ // LOG.warn("Batch Start Id: " +
+ // Utils.getFromattedBatchSequenceId(batchId));
+ try {
+ // for each message...
+ batchId = DMaaPContext.getBatchID();
+
+ String responseTransactionId = null;
+
+ while ((m = events.next()) != null) {
+
+ // LOG.warn("Batch Start Id: " +
+ // Utils.getFromattedBatchSequenceId(batchId));
+
+ addTransactionDetailsToMessage(m, topic, ctx.getRequest(), requestTime, messageSequence, batchId,
+ transactionEnabled);
+ messageSequence++;
+
+ // add the message to the batch
+ batch.add(m);
+
+ responseTransactionId = m.getLogDetails().getTransactionId();
+
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("message", m.getMessage());
+ jsonObject.put("transactionId", responseTransactionId);
+ final KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, m.getKey(),
+ jsonObject.toString());
+ kms.add(data);
+
+ // check if the batch is full
+ final int sizeNow = batch.size();
+ if (sizeNow >= maxEventBatch) {
+ String startTime = sdf.format(new Date());
+ LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
+ + batchId + "]");
+ try {
+ ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
+ // transactionLogs(batch);
+ for (message msg : batch) {
+ LogDetails logDetails = msg.getLogDetails();
+ LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
+ }
+ } catch (Exception excp) {
+
+ int status = HttpStatus.SC_NOT_FOUND;
+ String errorMsg = null;
+ if (excp instanceof CambriaApiException) {
+ status = ((CambriaApiException) excp).getStatus();
+ JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
+ JSONObject errObject = new JSONObject(jsonTokener);
+ errorMsg = (String) errObject.get("message");
+ }
+ ErrorResponse errRes = new ErrorResponse(status,
+ DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
+ "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
+ null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
+ ctx.getRequest().getRemoteHost(), null, null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ kms.clear();
+ batch.clear();
+ metricsSet.publishTick(sizeNow);
+ publishBatchCount = sizeNow;
+ count += sizeNow;
+ // batchId++;
+ String endTime = sdf.format(new Date());
+ LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id="
+ + batchId + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime
+ + ",Batch End Time=" + endTime + "]");
+ batchId = DMaaPContext.getBatchID();
+ }
+ }
+
+ // send the pending batch
+ final int sizeNow = batch.size();
+ if (sizeNow > 0) {
+ String startTime = sdf.format(new Date());
+ LOG.info("Batch Start Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch Start Id="
+ + batchId + "]");
+ try {
+ ctx.getConfigReader().getfPublisher().sendBatchMessage(topic, kms);
+ // transactionLogs(batch);
+ for (message msg : batch) {
+ LogDetails logDetails = msg.getLogDetails();
+ LOG.info("Publisher Log Details : " + logDetails.getPublisherLogDetails());
+ }
+ } catch (Exception excp) {
+ int status = HttpStatus.SC_NOT_FOUND;
+ String errorMsg = null;
+ if (excp instanceof CambriaApiException) {
+ status = ((CambriaApiException) excp).getStatus();
+ JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
+ JSONObject errObject = new JSONObject(jsonTokener);
+ errorMsg = (String) errObject.get("message");
+ }
+
+ ErrorResponse errRes = new ErrorResponse(status,
+ DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
+ "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
+ null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
+ ctx.getRequest().getRemoteHost(), null, null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ kms.clear();
+ metricsSet.publishTick(sizeNow);
+ count += sizeNow;
+ // batchId++;
+ String endTime = sdf.format(new Date());
+ publishBatchCount = sizeNow;
+ LOG.info("Batch End Details:[serverIp=" + ctx.getRequest().getLocalAddr() + ",Batch End Id=" + batchId
+ + ",Batch Total=" + publishBatchCount + ",Batch Start Time=" + startTime + ",Batch End Time="
+ + endTime + "]");
+ }
+
+ final long endMs = System.currentTimeMillis();
+ final long totalMs = endMs - startMs;
+
+ LOG.info("Published " + count + " msgs in " + totalMs + "ms for topic " + topic);
+
+ // build a response
+ final JSONObject response = new JSONObject();
+ response.put("count", count);
+ response.put("serverTimeMs", totalMs);
+
+ } catch (Exception excp) {
+ int status = HttpStatus.SC_NOT_FOUND;
+ String errorMsg = null;
+ if (excp instanceof CambriaApiException) {
+ status = ((CambriaApiException) excp).getStatus();
+ JSONTokener jsonTokener = new JSONTokener(((CambriaApiException) excp).getBody());
+ JSONObject errObject = new JSONObject(jsonTokener);
+ errorMsg = (String) errObject.get("message");
+ }
+
+ ErrorResponse errRes = new ErrorResponse(status, DMaaPResponseCode.PARTIAL_PUBLISH_MSGS.getResponseCode(),
+ "Transaction-" + errorMessages.getPublishMsgError() + ":" + topic + "."
+ + errorMessages.getPublishMsgCount() + count + "." + errorMsg,
+ null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
+ ctx.getRequest().getRemoteHost(), null, null);
+ LOG.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }
+ }
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/MetricsServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/MetricsServiceImpl.java
new file mode 100644
index 0000000..1a1baf5
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/MetricsServiceImpl.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service.impl;
+
+import java.io.IOException;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.json.JSONObject;
+import org.springframework.stereotype.Component;
+
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.backends.MetricsSet;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.service.MetricsService;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.metrics.CdmMeasuredItem;
+
+/**
+ *
+ *
+ * This will provide all the generated metrics details also it can provide the
+ * get metrics details
+ *
+ *
+ * @author author
+ *
+ *
+ */
+@Component
+public class MetricsServiceImpl implements MetricsService {
+
+ //private static final Logger LOG = Logger.getLogger(MetricsService.class.toString());
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(MetricsService.class);
+ /**
+ *
+ *
+ * @param ctx
+ * @throws IOException
+ *
+ *
+ * get Metric details
+ *
+ */
+ @Override
+
+ public void get(DMaaPContext ctx) throws IOException {
+ LOG.info("Inside : MetricsServiceImpl : get()");
+ final MetricsSet metrics = ctx.getConfigReader().getfMetrics();
+ DMaaPResponseBuilder.setNoCacheHeadings(ctx);
+ final JSONObject result = metrics.toJson();
+ DMaaPResponseBuilder.respondOk(ctx, result);
+ LOG.info("============ Metrics generated : " + result.toString() + "=================");
+
+ }
+
+
+ @Override
+ /**
+ *
+ * get Metric by name
+ *
+ *
+ * @param ctx
+ * @param name
+ * @throws IOException
+ * @throws CambriaApiException
+ *
+ *
+ */
+ public void getMetricByName(DMaaPContext ctx, String name) throws IOException, CambriaApiException {
+ LOG.info("Inside : MetricsServiceImpl : getMetricByName()");
+ final MetricsSet metrics = ctx.getConfigReader().getfMetrics();
+
+ final CdmMeasuredItem item = metrics.getItem(name);
+ /**
+ * check if item is null
+ */
+ if (item == null) {
+ throw new CambriaApiException(404, "No metric named [" + name + "].");
+ }
+
+ final JSONObject entry = new JSONObject();
+ entry.put("summary", item.summarize());
+ entry.put("raw", item.getRawValueString());
+
+ DMaaPResponseBuilder.setNoCacheHeadings(ctx);
+
+ final JSONObject result = new JSONObject();
+ result.put(name, entry);
+
+ DMaaPResponseBuilder.respondOk(ctx, result);
+ LOG.info("============ Metrics generated : " + entry.toString() + "=================");
+ }
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/TopicServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/TopicServiceImpl.java
new file mode 100644
index 0000000..658523d
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/TopicServiceImpl.java
@@ -0,0 +1,649 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+/**
+ *
+ */
+package com.att.nsa.cambria.service.impl;
+
+import java.io.IOException;
+
+import org.apache.http.HttpStatus;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.nsa.cambria.CambriaApiException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
+import com.att.nsa.cambria.beans.TopicBean;
+import com.att.nsa.cambria.constants.CambriaConstants;
+import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
+import com.att.nsa.cambria.exception.DMaaPErrorMessages;
+import com.att.nsa.cambria.exception.DMaaPResponseCode;
+import com.att.nsa.cambria.exception.ErrorResponse;
+import com.att.nsa.cambria.metabroker.Broker;
+import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
+import com.att.nsa.cambria.metabroker.Topic;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
+import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
+import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
+import com.att.nsa.cambria.service.TopicService;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.NsaAcl;
+import com.att.nsa.security.NsaApiKey;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+
+/**
+ * @author author
+ *
+ */
+@Service
+public class TopicServiceImpl implements TopicService {
+
+ //private static final Logger LOGGER = Logger.getLogger(TopicServiceImpl.class);
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
+ @Autowired
+ private DMaaPErrorMessages errorMessages;
+
+ //@Value("${msgRtr.topicfactory.aaf}")
+ //private String mrFactory;
+
+
+ /**
+ * @param dmaapContext
+ * @throws JSONException
+ * @throws ConfigDbException
+ * @throws IOException
+ *
+ */
+ @Override
+ public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
+
+ LOGGER.info("Fetching list of all the topics.");
+ JSONObject json = new JSONObject();
+
+ JSONArray topicsList = new JSONArray();
+
+ for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
+ topicsList.put(topic.getName());
+ }
+
+ json.put("topics", topicsList);
+
+ LOGGER.info("Returning list of all the topics.");
+ DMaaPResponseBuilder.respondOk(dmaapContext, json);
+
+ }
+
+ /**
+ * @param dmaapContext
+ * @throws JSONException
+ * @throws ConfigDbException
+ * @throws IOException
+ *
+ */
+ public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
+
+ LOGGER.info("Fetching list of all the topics.");
+ JSONObject json = new JSONObject();
+
+ JSONArray topicsList = new JSONArray();
+
+ for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
+ JSONObject obj = new JSONObject();
+ obj.put("topicName", topic.getName());
+ //obj.put("description", topic.getDescription());
+ obj.put("owner", topic.getOwner());
+ obj.put("txenabled", topic.isTransactionEnabled());
+ topicsList.put(obj);
+ }
+
+ json.put("topics", topicsList);
+
+ LOGGER.info("Returning list of all the topics.");
+ DMaaPResponseBuilder.respondOk(dmaapContext, json);
+
+ }
+
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ */
+ @Override
+ public void getTopic(DMaaPContext dmaapContext, String topicName)
+ throws ConfigDbException, IOException, TopicExistsException {
+
+ LOGGER.info("Fetching details of topic " + topicName);
+ Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
+
+ if (null == t) {
+ LOGGER.error("Topic [" + topicName + "] does not exist.");
+ throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
+ }
+
+ JSONObject o = new JSONObject();
+ o.put ( "name", t.getName () );
+ o.put ( "description", t.getDescription () );
+
+ if (null!=t.getOwners ())
+ o.put ( "owner", t.getOwners ().iterator ().next () );
+ if(null!=t.getReaderAcl ())
+ o.put ( "readerAcl", aclToJson ( t.getReaderAcl () ) );
+ if(null!=t.getWriterAcl ())
+ o.put ( "writerAcl", aclToJson ( t.getWriterAcl () ) );
+
+ LOGGER.info("Returning details of topic " + topicName);
+ DMaaPResponseBuilder.respondOk(dmaapContext, o);
+
+ }
+
+
+ /**
+ * @param dmaapContext
+ * @param topicBean
+ * @throws CambriaApiException
+ * @throws AccessDeniedException
+ * @throws IOException
+ * @throws TopicExistsException
+ * @throws JSONException
+ *
+ *
+ *
+ */
+ @Override
+ public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
+ throws CambriaApiException, DMaaPAccessDeniedException,IOException, TopicExistsException {
+
+ LOGGER.info("Creating topic " + topicBean.getTopicName());
+
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+ String key = null;
+ String appName=dmaapContext.getRequest().getHeader("AppName");
+ String enfTopicName= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
+
+ if(user != null)
+ {
+ key = user.getKey();
+
+ if( enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >=0 ) {
+
+ LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+
+ }
+ }
+
+ //else if (user==null && (null==dmaapContext.getRequest().getHeader("Authorization") && null == dmaapContext.getRequest().getHeader("cookie")) ) {
+ else if (user == null && null==dmaapContext.getRequest().getHeader("Authorization") &&
+ (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
+ LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+ }
+
+ if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") ||
+ null != dmaapContext.getRequest().getHeader("cookie"))) {
+ //if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || null != dmaapContext.getRequest().getHeader("cookie"))) {
+ // ACL authentication is not provided so we will use the aaf authentication
+ LOGGER.info("Authorization the topic");
+
+ String permission = "";
+ String nameSpace="";
+ if(topicBean.getTopicName().indexOf(".")>1)
+ nameSpace = topicBean.getTopicName().substring(0,topicBean.getTopicName().lastIndexOf("."));
+
+ String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
+
+ //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
+
+ permission = mrFactoryVal+nameSpace+"|create";
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+
+ if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
+ {
+
+ LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+
+ }else{
+ // if user is null and aaf authentication is ok then key should be ""
+ //key = "";
+ /**
+ * Added as part of AAF user it should return username
+ */
+
+ key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
+ LOGGER.info("key ==================== "+key);
+
+ }
+ }
+
+ try {
+ final String topicName = topicBean.getTopicName();
+ final String desc = topicBean.getTopicDescription();
+
+ final int partitions = topicBean.getPartitionCount();
+
+ final int replicas = topicBean.getReplicationCount();
+ boolean transactionEnabled = topicBean.isTransactionEnabled();
+
+
+ final Broker metabroker = getMetaBroker(dmaapContext);
+ final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas,
+ transactionEnabled);
+
+ LOGGER.info("Topic created successfully. Sending response");
+ DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
+ } catch (JSONException excp) {
+
+ LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
+ DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
+ errorMessages.getIncorrectJson());
+ LOGGER.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+
+ }
+ }
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ * @throws CambriaApiException
+ * @throws AccessDeniedException
+ */
+ @Override
+ public void deleteTopic(DMaaPContext dmaapContext, String topicName)
+ throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
+
+ LOGGER.info("Deleting topic " + topicName);
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+
+ if (user == null && null!=dmaapContext.getRequest().getHeader("Authorization")) {
+ LOGGER.info("Authenticating the user, as ACL authentication is not provided");
+// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+ String permission = "";
+ String nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
+ String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
+// String tokens[] = topicName.split(".mr.topic.");
+ permission = mrFactoryVal+nameSpace+"|destroy";
+ DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+ if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
+ {
+ LOGGER.error("Failed to delete topi"+topicName+". Authentication failed.");
+ ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+ DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+ errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" delete "+errorMessages.getNotPermitted2());
+ LOGGER.info(errRes.toString());
+ throw new DMaaPAccessDeniedException(errRes);
+ }
+
+
+ }
+
+ final Broker metabroker = getMetaBroker(dmaapContext);
+ final Topic topic = metabroker.getTopic(topicName);
+
+ if (topic == null) {
+ LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
+ throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
+ }
+
+ metabroker.deleteTopic(topicName);
+
+ LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
+ DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
+
+ }
+
+ /**
+ *
+ * @param dmaapContext
+ * @return
+ */
+ private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
+ return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
+ }
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ *
+ */
+ @Override
+ public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
+ throws ConfigDbException, IOException, TopicExistsException {
+ LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
+ Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
+
+ if (topic == null) {
+ LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
+ throw new TopicExistsException(
+ "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
+ }
+
+
+
+ final NsaAcl acl = topic.getWriterAcl();
+
+ LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
+ DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
+
+ }
+
+ /**
+ *
+ * @param acl
+ * @return
+ */
+ private static JSONObject aclToJson(NsaAcl acl) {
+ final JSONObject o = new JSONObject();
+ if (acl == null) {
+ o.put("enabled", false);
+ o.put("users", new JSONArray());
+ } else {
+ o.put("enabled", acl.isActive());
+
+ final JSONArray a = new JSONArray();
+ for (String user : acl.getUsers()) {
+ a.put(user);
+ }
+ o.put("users", a);
+ }
+ return o;
+ }
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ */
+ @Override
+ public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
+ throws IOException, ConfigDbException, TopicExistsException {
+ LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
+ Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
+
+ if (topic == null) {
+ LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
+ throw new TopicExistsException(
+ "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
+ }
+
+ final NsaAcl acl = topic.getReaderAcl();
+
+ LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
+ DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
+
+ }
+
+ /**
+ *
+ * @param t
+ * @return
+ */
+ private static JSONObject topicToJson(Topic t) {
+ final JSONObject o = new JSONObject();
+
+ o.put("name", t.getName());
+ o.put("description", t.getDescription());
+ o.put("owner", t.getOwner());
+ o.put("readerAcl", aclToJson(t.getReaderAcl()));
+ o.put("writerAcl", aclToJson(t.getWriterAcl()));
+
+ return o;
+ }
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ * @param producerId
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ * @throws AccessDeniedException
+ * @throws
+ *
+ */
+ @Override
+ public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
+ throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException {
+
+ LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+
+// if (user == null) {
+//
+// LOGGER.info("Authenticating the user, as ACL authentication is not provided");
+//// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+//
+// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+// String permission = aaf.aafPermissionString(topicName, "manage");
+// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
+// {
+// LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic " + topicName
+// + ". Authentication failed.");
+// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+// errorMessages.getNotPermitted1()+" <Grant publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
+// LOGGER.info(errRes);
+// throw new DMaaPAccessDeniedException(errRes);
+// }
+// }
+
+ Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
+
+ if (null == topic) {
+ LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
+ + "] does not exist.");
+ throw new TopicExistsException("Failed to permit write access to producer [" + producerId
+ + "] for topic. Topic [" + topicName + "] does not exist.");
+ }
+
+ topic.permitWritesFromUser(producerId, user);
+
+ LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
+ + "]. Sending response.");
+ DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
+
+ }
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ * @param producerId
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ * @throws AccessDeniedException
+ * @throws DMaaPAccessDeniedException
+ *
+ */
+ @Override
+ public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
+ throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
+
+ LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+// if (user == null) {
+//
+//// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+// String permission = aaf.aafPermissionString(topicName, "manage");
+// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
+// {
+// LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic " + topicName
+// + ". Authentication failed.");
+// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+// errorMessages.getNotPermitted1()+" <Revoke publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
+// LOGGER.info(errRes);
+// throw new DMaaPAccessDeniedException(errRes);
+//
+// }
+// }
+
+ Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
+
+ if (null == topic) {
+ LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
+ + "] does not exist.");
+ throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
+ + "] for topic. Topic [" + topicName + "] does not exist.");
+ }
+
+ topic.denyWritesFromUser(producerId, user);
+
+ LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
+ + "]. Sending response.");
+ DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
+
+ }
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ * @param consumerId
+ * @throws DMaaPAccessDeniedException
+ */
+ @Override
+ public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
+ throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
+
+ LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+// if (user == null) {
+//
+//// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+// String permission = aaf.aafPermissionString(topicName, "manage");
+// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
+// {
+// LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic " + topicName
+// + ". Authentication failed.");
+// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+// errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
+// LOGGER.info(errRes);
+// throw new DMaaPAccessDeniedException(errRes);
+// }
+// }
+
+ Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
+
+ if (null == topic) {
+ LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
+ + "] does not exist.");
+ throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
+ + "] for topic. Topic [" + topicName + "] does not exist.");
+ }
+
+ topic.permitReadsByUser(consumerId, user);
+
+ LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
+ + "]. Sending response.");
+ DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
+ "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
+ }
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ * @param consumerId
+ * @throws DMaaPAccessDeniedException
+ */
+ @Override
+ public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
+ throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
+
+ LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
+ final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+// if (user == null) {
+//// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+// DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+// String permission = aaf.aafPermissionString(topicName, "manage");
+// if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
+// {
+// LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic " + topicName
+// + ". Authentication failed.");
+// ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+// DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+// errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
+// LOGGER.info(errRes);
+// throw new DMaaPAccessDeniedException(errRes);
+// }
+//
+//
+// }
+
+ Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
+
+ if (null == topic) {
+ LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
+ + "] does not exist.");
+ throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
+ + "] for topic. Topic [" + topicName + "] does not exist.");
+ }
+
+ topic.denyReadsByUser(consumerId, user);
+
+ LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
+ + "]. Sending response.");
+ DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
+ "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
+
+ }
+
+
+
+
+
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/TransactionServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/TransactionServiceImpl.java
new file mode 100644
index 0000000..9da2852
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/TransactionServiceImpl.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service.impl;
+
+import java.io.IOException;
+
+import org.springframework.stereotype.Service;
+
+import com.att.aft.dme2.internal.jettison.json.JSONException;
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.service.TransactionService;
+import com.att.nsa.cambria.transaction.TransactionObj;
+import com.att.nsa.configs.ConfigDbException;
+
+/**
+ * Once the transaction rest gateway will be using that time it will provide all
+ * the transaction details like fetching all the transactional objects or get
+ * any particular transaction object details
+ *
+ * @author author
+ *
+ */
+@Service
+public class TransactionServiceImpl implements TransactionService {
+
+ @Override
+ public void checkTransaction(TransactionObj trnObj) {
+ /* Need to implement the method */
+ }
+
+ @Override
+ public void getAllTransactionObjs(DMaaPContext dmaapContext)
+ throws ConfigDbException, IOException {
+
+ /*
+ * ConfigurationReader configReader = dmaapContext.getConfigReader();
+ *
+ * LOG.info("configReader : "+configReader.toString());
+ *
+ * final JSONObject result = new JSONObject (); final JSONArray
+ * transactionIds = new JSONArray (); result.put ( "transactionIds",
+ * transactionIds );
+ *
+ * DMaaPTransactionObjDB<DMaaPTransactionObj> transDb =
+ * configReader.getfTranDb();
+ *
+ * for (String transactionId : transDb.loadAllTransactionObjs()) {
+ * transactionIds.put (transactionId); } LOG.info(
+ * "========== TransactionServiceImpl: getAllTransactionObjs: Transaction objects are : "
+ * + transactionIds.toString()+"===========");
+ * DMaaPResponseBuilder.respondOk(dmaapContext, result);
+ */
+ }
+
+ @Override
+ public void getTransactionObj(DMaaPContext dmaapContext,
+ String transactionId) throws ConfigDbException, JSONException,
+ IOException {
+
+ /*
+ * if (null != transactionId) {
+ *
+ * ConfigurationReader configReader = dmaapContext.getConfigReader();
+ *
+ * DMaaPTransactionObj trnObj;
+ *
+ * trnObj = configReader.getfTranDb().loadTransactionObj(transactionId);
+ *
+ *
+ * if (null != trnObj) { trnObj.serialize(); JSONObject result =
+ * trnObj.asJsonObject(); DMaaPResponseBuilder.respondOk(dmaapContext,
+ * result);
+ * LOG.info("========== TransactionServiceImpl: getTransactionObj : "+
+ * result.toString()+"==========="); return; }
+ *
+ * } LOG.info(
+ * "========== TransactionServiceImpl: getTransactionObj: Error : Transaction object does not exist. "
+ * +"===========");
+ */
+ }
+}
diff --git a/src/main/java/com/att/nsa/cambria/service/impl/UIServiceImpl.java b/src/main/java/com/att/nsa/cambria/service/impl/UIServiceImpl.java
new file mode 100644
index 0000000..0fbf657
--- /dev/null
+++ b/src/main/java/com/att/nsa/cambria/service/impl/UIServiceImpl.java
@@ -0,0 +1,206 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package com.att.nsa.cambria.service.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.springframework.stereotype.Service;
+
+import com.att.nsa.cambria.beans.DMaaPContext;
+import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
+import com.att.nsa.cambria.metabroker.Topic;
+import com.att.nsa.cambria.service.UIService;
+import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.db.NsaApiDb;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+
+import kafka.common.TopicExistsException;
+
+/**
+ * @author author
+ *
+ */
+@Service
+public class UIServiceImpl implements UIService {
+
+ //private static final Logger LOGGER = Logger.getLogger(UIServiceImpl.class);
+ private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(UIServiceImpl.class);
+ /**
+ * Returning template of hello page
+ * @param dmaapContext
+ * @throws IOException
+ */
+ @Override
+ public void hello(DMaaPContext dmaapContext) throws IOException {
+ LOGGER.info("Returning template of hello page.");
+ DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "templates/hello.html");
+ }
+
+ /**
+ * Fetching list of all api keys and returning in a templated form for display.
+ * @param dmaapContext
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ @Override
+ public void getApiKeysTable(DMaaPContext dmaapContext) throws ConfigDbException, IOException {
+ // TODO - We need to work on the templates and how data will be set in
+ // the template
+ LOGGER.info("Fetching list of all api keys and returning in a templated form for display.");
+ Map<String, NsaSimpleApiKey> keyMap = getApiKeyDb(dmaapContext).loadAllKeyRecords();
+
+ LinkedList<JSONObject> keyList = new LinkedList<JSONObject>();
+
+ JSONObject jsonList = new JSONObject();
+
+ for (Entry<String, NsaSimpleApiKey> e : keyMap.entrySet()) {
+ final NsaSimpleApiKey key = e.getValue();
+ final JSONObject jsonObject = new JSONObject();
+ jsonObject.put("key", key.getKey());
+ jsonObject.put("email", key.getContactEmail());
+ jsonObject.put("description", key.getDescription());
+ keyList.add(jsonObject);
+ }
+
+ jsonList.put("apiKeys", keyList);
+
+ LOGGER.info("Returning list of all the api keys in JSON format for the template.");
+ // "templates/apiKeyList.html"
+ DMaaPResponseBuilder.respondOk(dmaapContext, jsonList);
+
+ }
+
+ /**
+ * @param dmaapContext
+ * @param apiKey
+ * @throws Exception
+ */
+ @Override
+ public void getApiKey(DMaaPContext dmaapContext, String apiKey) throws Exception {
+ // TODO - We need to work on the templates and how data will be set in
+ // the template
+ LOGGER.info("Fetching detials of apikey: " + apiKey);
+ final NsaSimpleApiKey key = getApiKeyDb(dmaapContext).loadApiKey(apiKey);
+
+ if (null != key) {
+ LOGGER.info("Details of apikey [" + apiKey + "] found. Returning response");
+ DMaaPResponseBuilder.respondOk(dmaapContext, key.asJsonObject());
+ } else {
+ LOGGER.info("Details of apikey [" + apiKey + "] not found. Returning response");
+ throw new Exception("Key [" + apiKey + "] not found.");
+ }
+
+ }
+
+ /**
+ * Fetching list of all the topics
+ * @param dmaapContext
+ * @throws ConfigDbException
+ * @throws IOException
+ */
+ @Override
+ public void getTopicsTable(DMaaPContext dmaapContext) throws ConfigDbException, IOException {
+ // TODO - We need to work on the templates and how data will be set in
+ // the template
+ LOGGER.info("Fetching list of all the topics and returning in a templated form for display");
+ List<Topic> topicsList = getMetaBroker(dmaapContext).getAllTopics();
+
+ JSONObject jsonObject = new JSONObject();
+
+ JSONArray topicsArray = new JSONArray();
+
+ List<Topic> topicList = getMetaBroker(dmaapContext).getAllTopics();
+
+ for (Topic topic : topicList) {
+ JSONObject obj = new JSONObject();
+ obj.put("topicName", topic.getName());
+ obj.put("description", topic.getDescription());
+ obj.put("owner", topic.getOwner());
+ topicsArray.put(obj);
+ }
+
+ jsonObject.put("topics", topicsList);
+
+ LOGGER.info("Returning the list of topics in templated format for display.");
+ DMaaPResponseBuilder.respondOk(dmaapContext, jsonObject);
+
+ }
+
+ /**
+ * @param dmaapContext
+ * @param topicName
+ * @throws ConfigDbException
+ * @throws IOException
+ * @throws TopicExistsException
+ */
+ @Override
+ public void getTopic(DMaaPContext dmaapContext, String topicName)
+ throws ConfigDbException, IOException, TopicExistsException {
+ // TODO - We need to work on the templates and how data will be set in
+ // the template
+ LOGGER.info("Fetching detials of apikey: " + topicName);
+ Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
+
+ if (null == topic) {
+ LOGGER.error("Topic [" + topicName + "] does not exist.");
+ throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
+ }
+
+ JSONObject json = new JSONObject();
+ json.put("topicName", topic.getName());
+ json.put("description", topic.getDescription());
+ json.put("owner", topic.getOwner());
+
+ LOGGER.info("Returning details of topic [" + topicName + "]. Sending response.");
+ DMaaPResponseBuilder.respondOk(dmaapContext, json);
+
+ }
+
+ /**
+ *
+ * @param dmaapContext
+ * @return
+ */
+ private NsaApiDb<NsaSimpleApiKey> getApiKeyDb(DMaaPContext dmaapContext) {
+ return dmaapContext.getConfigReader().getfApiKeyDb();
+
+ }
+
+ /**
+ *
+ * @param dmaapContext
+ * @return
+ */
+ private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
+ return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
+ }
+
+}