summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java')
-rw-r--r--src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java153
1 files changed, 153 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java
new file mode 100644
index 0000000..cb33e29
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java
@@ -0,0 +1,153 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *
+ *******************************************************************************/
+package org.onap.dmaap.kafkaAuthorize;
+
+import java.util.Map;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onap.aaf.cadi.PropAccess;
+import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory;
+import kafka.network.RequestChannel.Session;
+import kafka.security.auth.Acl;
+import kafka.security.auth.Authorizer;
+import kafka.security.auth.Operation;
+import kafka.security.auth.Resource;
+import scala.collection.immutable.Set;
+
+/**
+ * A trivial Kafka Authorizer for use with SSL and AAF
+ * Authentication/Authorization.
+ *
+ */
+public class KafkaCustomAuthorizer implements Authorizer {
+ private PropAccess access;
+ private static final Logger logger = LoggerFactory.getLogger(KafkaCustomAuthorizer.class);
+
+ // I'm assuming this is called BEFORE any usage...
+ @Override
+ public void configure(final Map<String, ?> arg0) {
+ // TODO Auto-generate method stub
+ }
+
+ @Override
+ public void addAcls(final Set<Acl> arg0, final Resource arg1) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean authorize(final Session arg0, final Operation arg1, final Resource arg2) {
+ if (arg0.principal() == null) {
+ return false;
+ }
+
+ String fullName = arg0.principal().getName();
+ fullName = fullName != null ? fullName.trim() : fullName;
+ String topicName = null;
+ String namspace = null;
+ String ins = null;
+ String type = null;
+ String action = null;
+
+ String kafkaactivity = arg1.name();
+
+ if (kafkaactivity.equals("Read")) {
+ action = "sub";
+ } else if (kafkaactivity.equals("Write")) {
+ action = "pub";
+ } else if (kafkaactivity.equals("Describe")) {
+ return true;
+ }
+ if (arg2.resourceType().name().equals("Topic")) {
+ topicName = arg2.name();
+ } else {
+ return true;
+ }
+
+ try {
+
+ if (null != topicName && topicName.indexOf(".") > 0) {
+ namspace = topicName.substring(0, topicName.lastIndexOf("."));
+ ins = namspace + ".topic";
+ type = ":topic." + topicName;
+ logger.info("^Event Received for topic " + topicName + " , User " + fullName + " , action = " + action);
+ }
+
+ if (fullName.equals("admin")) {
+ return true;
+ }
+
+ if (null != topicName) {
+ boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider()
+ .hasPermission(fullName, ins, type, action);
+ if (hasResp) {
+ logger.info("Successful Authorization for " + fullName + " on " + topicName + " for " + ins + "|"
+ + type + "|" + action);
+ }
+ if (!hasResp) {
+ logger.info(fullName + " is not allowed in " + ins + "|" + type + "|" + action);
+ throw new Exception(fullName + " is not allowed in " + ins + "|" + type + "|" + action);
+ }
+ }
+ } catch (final Exception e) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(final KafkaPrincipal arg0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean removeAcls(final Resource arg0) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean removeAcls(final Set<Acl> arg0, final Resource arg1) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public Set<Acl> getAcls(Resource arg0) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}