diff options
Diffstat (limited to 'src/main/java')
7 files changed, 687 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/AuthorizationProvider.java b/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/AuthorizationProvider.java new file mode 100644 index 0000000..da96929 --- /dev/null +++ b/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/AuthorizationProvider.java @@ -0,0 +1,30 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * + *******************************************************************************/ +package org.onap.dmaap.commonauth.kafka.base.authorization; + +public interface AuthorizationProvider { + + public boolean hasPermission(String userId, String permission, String instance, String action); + + public String getId(); + + public String authenticate(String userId, String password) throws Exception; +} diff --git a/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/AuthorizationProviderFactory.java b/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/AuthorizationProviderFactory.java new file mode 100644 index 0000000..6b872af --- /dev/null +++ b/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/AuthorizationProviderFactory.java @@ -0,0 +1,51 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * + *******************************************************************************/ +package org.onap.dmaap.commonauth.kafka.base.authorization; + +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + +public class AuthorizationProviderFactory<K, V> { + private static final Map<String, AuthorizationProvider> AUTHORIZATION_PROVIDER_MAP = new HashMap<String, AuthorizationProvider>(); + private static final AuthorizationProviderFactory AUTHORIZATION_PROVIDER_FACTORY = new AuthorizationProviderFactory(); + + private AuthorizationProviderFactory() { + try { + ServiceLoader<AuthorizationProvider> serviceLoader = ServiceLoader.load(AuthorizationProvider.class); + for (AuthorizationProvider authzProvider : serviceLoader) { + AUTHORIZATION_PROVIDER_MAP.put(authzProvider.getId(), authzProvider); + + } + } catch (Exception ee) { + System.out.println(ee); + System.exit(0); + } + } + + public static AuthorizationProviderFactory getProviderFactory() { + return AUTHORIZATION_PROVIDER_FACTORY; + } + + public AuthorizationProvider getProvider() { + return AUTHORIZATION_PROVIDER_MAP.get(System.getProperty("kafka.authorization.provider", "CADI_AAF_PROVIDER")); + } +} diff --git a/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProvider.java b/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProvider.java new file mode 100644 index 0000000..60c1868 --- /dev/null +++ b/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProvider.java @@ -0,0 +1,158 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * + *******************************************************************************/ +package org.onap.dmaap.commonauth.kafka.base.authorization; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.onap.aaf.cadi.CadiException; +import org.onap.aaf.cadi.PropAccess; +import org.onap.aaf.cadi.aaf.AAFPermission; +import org.onap.aaf.cadi.aaf.v2_0.AAFAuthn; +import org.onap.aaf.cadi.aaf.v2_0.AAFCon; +import org.onap.aaf.cadi.aaf.v2_0.AAFConHttp; +import org.onap.aaf.cadi.aaf.v2_0.AbsAAFLur; +import org.onap.aaf.cadi.principal.UnAuthPrincipal; + +public class Cadi3AAFProvider implements AuthorizationProvider { + + private static PropAccess access; + private static AAFCon<?> aafcon; + private static final String CADI_PROPERTIES = "/opt/kafka/config/cadi.properties"; + private static final String AAF_LOCATOR_ENV = "aaf_locate_url"; + private static final String MR_NAMESPACE = " org.onap.dmaap.mr"; + + public static AAFAuthn<?> getAafAuthn() throws CadiException { + if (aafAuthn == null) { + throw new CadiException("Cadi is uninitialized in Cadi3AAFProvider.getAafAuthn()"); + } + return aafAuthn; + } + + private static AAFAuthn<?> aafAuthn; + private static AbsAAFLur<AAFPermission> aafLur; + + private static boolean props_ok = false; + + private static final Logger logger = LoggerFactory.getLogger(Cadi3AAFProvider.class); + + public Cadi3AAFProvider() { + setup(); + } + + private synchronized void setup() { + if (access == null) { + + Properties props = new Properties(); + FileInputStream fis = null; + try { + if (System.getProperty("CADI_PROPERTIES") != null) { + fis = new FileInputStream(System.getProperty("CADI_PROPERTIES")); + } else { + fis = new FileInputStream(CADI_PROPERTIES); + } + try { + props.load(fis); + if (System.getenv(AAF_LOCATOR_ENV) != null) + props.setProperty(AAF_LOCATOR_ENV, System.getenv(AAF_LOCATOR_ENV)); + access = new PropAccess(props); + } finally { + fis.close(); + } + } catch (IOException e) { + logger.error("Unable to load " + CADI_PROPERTIES); + logger.error("Error", e); + } + + props_ok = true; + if (props_ok == false) { + return; + } + } + + if (aafAuthn == null) { + try { + aafcon = new AAFConHttp(access); + aafAuthn = aafcon.newAuthn(); + aafLur = aafcon.newLur(aafAuthn); + } catch (final Exception e) { + aafAuthn = null; + if (access != null) + access.log(e, "Failed to initialize AAF"); + props_ok = false; + } + } + + } + + /** + * Checks if a user has a particular permission + * <p/> + * Returns true if the permission in found + */ + public boolean hasPermission(String userId, String permission, String instance, String action) { + boolean hasPermission = false; + try { + logger.info("^ Event at hasPermission to validate userid " + userId + " with " + permission + " " + instance + + " " + action); + // AAF Style permissions are in the form + // Resource Name, Resource Type, Action + if (userId.equals("admin")) { + hasPermission = true; + return hasPermission; + } + AAFPermission perm = new AAFPermission(MR_NAMESPACE, permission, instance, action); + if (aafLur != null) { + hasPermission = aafLur.fish(new UnAuthPrincipal(userId), perm); + logger.trace("Permission: " + perm.getKey() + " for user :" + userId + " found: " + hasPermission); + } else { + logger.error("AAF client not initialized. Not able to find permissions."); + } + } catch (Exception e) { + logger.error("AAF client not initialized", e); + } + return hasPermission; + } + + public String getId() { + return "CADI_AAF_PROVIDER"; + } + + public String authenticate(String userId, String password) throws Exception { + logger.info("^Event received with username " + userId); + if (userId.equals("admin")) { + logger.info("User Admin by passess AAF call ...."); + return null; + } + String aafResponse = aafAuthn.validate(userId, password); + logger.info("aafResponse=" + aafResponse + " for " + userId); + + if (aafResponse != null) { + logger.error("Authentication failed for user ." + userId); + } + return aafResponse; + } + +} diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java new file mode 100644 index 0000000..cb33e29 --- /dev/null +++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java @@ -0,0 +1,153 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * + *******************************************************************************/ +package org.onap.dmaap.kafkaAuthorize; + +import java.util.Map; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.onap.aaf.cadi.PropAccess; +import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory; +import kafka.network.RequestChannel.Session; +import kafka.security.auth.Acl; +import kafka.security.auth.Authorizer; +import kafka.security.auth.Operation; +import kafka.security.auth.Resource; +import scala.collection.immutable.Set; + +/** + * A trivial Kafka Authorizer for use with SSL and AAF + * Authentication/Authorization. + * + */ +public class KafkaCustomAuthorizer implements Authorizer { + private PropAccess access; + private static final Logger logger = LoggerFactory.getLogger(KafkaCustomAuthorizer.class); + + // I'm assuming this is called BEFORE any usage... + @Override + public void configure(final Map<String, ?> arg0) { + // TODO Auto-generate method stub + } + + @Override + public void addAcls(final Set<Acl> arg0, final Resource arg1) { + // TODO Auto-generated method stub + + } + + @Override + public boolean authorize(final Session arg0, final Operation arg1, final Resource arg2) { + if (arg0.principal() == null) { + return false; + } + + String fullName = arg0.principal().getName(); + fullName = fullName != null ? fullName.trim() : fullName; + String topicName = null; + String namspace = null; + String ins = null; + String type = null; + String action = null; + + String kafkaactivity = arg1.name(); + + if (kafkaactivity.equals("Read")) { + action = "sub"; + } else if (kafkaactivity.equals("Write")) { + action = "pub"; + } else if (kafkaactivity.equals("Describe")) { + return true; + } + if (arg2.resourceType().name().equals("Topic")) { + topicName = arg2.name(); + } else { + return true; + } + + try { + + if (null != topicName && topicName.indexOf(".") > 0) { + namspace = topicName.substring(0, topicName.lastIndexOf(".")); + ins = namspace + ".topic"; + type = ":topic." + topicName; + logger.info("^Event Received for topic " + topicName + " , User " + fullName + " , action = " + action); + } + + if (fullName.equals("admin")) { + return true; + } + + if (null != topicName) { + boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider() + .hasPermission(fullName, ins, type, action); + if (hasResp) { + logger.info("Successful Authorization for " + fullName + " on " + topicName + " for " + ins + "|" + + type + "|" + action); + } + if (!hasResp) { + logger.info(fullName + " is not allowed in " + ins + "|" + type + "|" + action); + throw new Exception(fullName + " is not allowed in " + ins + "|" + type + "|" + action); + } + } + } catch (final Exception e) { + return false; + } + return true; + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() { + // TODO Auto-generated method stub + return null; + } + + @Override + public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(final KafkaPrincipal arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean removeAcls(final Resource arg0) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean removeAcls(final Set<Acl> arg0, final Resource arg1) { + // TODO Auto-generated method stub + return false; + } + + public Set<Acl> getAcls(Resource arg0) { + // TODO Auto-generated method stub + return null; + } +} diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainLoginModule1.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainLoginModule1.java new file mode 100644 index 0000000..dd21682 --- /dev/null +++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainLoginModule1.java @@ -0,0 +1,70 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * + *******************************************************************************/ +package org.onap.dmaap.kafkaAuthorize; + +import java.util.Map; + +import javax.security.auth.Subject; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.login.LoginException; +import javax.security.auth.spi.LoginModule; + +public class PlainLoginModule1 implements LoginModule { + + private static final String USERNAME_CONFIG = "username"; + private static final String PASSWORD_CONFIG = "password"; + + static { + PlainSaslServerProvider1.initialize(); + } + + @Override + public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, + Map<String, ?> options) { + String username = (String) options.get(USERNAME_CONFIG); + if (username != null) + subject.getPublicCredentials().add(username); + String password = (String) options.get(PASSWORD_CONFIG); + if (password != null) + subject.getPrivateCredentials().add(password); + + } + + @Override + public boolean login() throws LoginException { + return true; + } + + @Override + public boolean logout() throws LoginException { + return true; + } + + @Override + public boolean commit() throws LoginException { + return true; + } + + @Override + public boolean abort() throws LoginException { + return false; + } +} diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1.java new file mode 100644 index 0000000..f28671b --- /dev/null +++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1.java @@ -0,0 +1,185 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * + *******************************************************************************/ +package org.onap.dmaap.kafkaAuthorize; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.Map; + +import javax.security.auth.callback.CallbackHandler; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslException; +import javax.security.sasl.SaslServer; +import javax.security.sasl.SaslServerFactory; + +import org.apache.kafka.common.security.JaasContext; +import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler; + +import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory; + +/** + * Simple SaslServer implementation for SASL/PLAIN. In order to make this + * implementation fully pluggable, authentication of username/password is fully + * contained within the server implementation. + * <p> + * Valid users with passwords are specified in the Jaas configuration file. Each + * user is specified with user_<username> as key and <password> as value. This + * is consistent with Zookeeper Digest-MD5 implementation. + * <p> + * To avoid storing clear passwords on disk or to integrate with external + * authentication servers in production systems, this module can be replaced + * with a different implementation. + * + */ +public class PlainSaslServer1 implements SaslServer { + + public static final String PLAIN_MECHANISM = "PLAIN"; + + private final JaasContext jaasContext; + + private boolean complete; + private String authorizationID; + + public PlainSaslServer1(JaasContext jaasContext) { + this.jaasContext = jaasContext; + } + + @Override + public byte[] evaluateResponse(byte[] response) throws SaslException { + /* + * Message format (from https://tools.ietf.org/html/rfc4616): + * + * message = [authzid] UTF8NUL authcid UTF8NUL passwd authcid = 1*SAFE ; + * MUST accept up to 255 octets authzid = 1*SAFE ; MUST accept up to 255 + * octets passwd = 1*SAFE ; MUST accept up to 255 octets UTF8NUL = %x00 + * ; UTF-8 encoded NUL character + * + * SAFE = UTF1 / UTF2 / UTF3 / UTF4 ;; any UTF-8 encoded Unicode + * character except NUL + */ + + String[] tokens; + try { + tokens = new String(response, "UTF-8").split("\u0000"); + } catch (UnsupportedEncodingException e) { + throw new SaslException("UTF-8 encoding not supported", e); + } + if (tokens.length != 3) + throw new SaslException("Invalid SASL/PLAIN response: expected 3 tokens, got " + tokens.length); + authorizationID = tokens[0]; + String username = tokens[1]; + String password = tokens[2]; + + if (username.isEmpty()) { + throw new SaslException("Authentication failed: username not specified"); + } + if (password.isEmpty()) { + throw new SaslException("Authentication failed: password not specified"); + } + if (authorizationID.isEmpty()) + authorizationID = username; + + String aafResponse = "Not Verified"; + try { + aafResponse = AuthorizationProviderFactory.getProviderFactory().getProvider().authenticate(username, + password); + } catch (Exception e) { + } + + if (null != aafResponse) { + throw new SaslException("Authentication failed: " + aafResponse + " User " + username); + } + + complete = true; + return new byte[0]; + } + + @Override + public String getAuthorizationID() { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return authorizationID; + } + + @Override + public String getMechanismName() { + return PLAIN_MECHANISM; + } + + @Override + public Object getNegotiatedProperty(String propName) { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return null; + } + + @Override + public boolean isComplete() { + return complete; + } + + @Override + public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return Arrays.copyOfRange(incoming, offset, offset + len); + } + + @Override + public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return Arrays.copyOfRange(outgoing, offset, offset + len); + } + + @Override + public void dispose() throws SaslException { + } + + public static class PlainSaslServerFactory1 implements SaslServerFactory { + + @Override + public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, + CallbackHandler cbh) throws SaslException { + + if (!PLAIN_MECHANISM.equals(mechanism)) + throw new SaslException( + String.format("Mechanism \'%s\' is not supported. Only PLAIN is supported.", mechanism)); + + if (!(cbh instanceof SaslServerCallbackHandler)) + throw new SaslException( + "CallbackHandler must be of type SaslServerCallbackHandler, but it is: " + cbh.getClass()); + + return new PlainSaslServer1(((SaslServerCallbackHandler) cbh).jaasContext()); + } + + @Override + public String[] getMechanismNames(Map<String, ?> props) { + if (props == null) + return new String[] { PLAIN_MECHANISM }; + String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT); + if ("true".equals(noPlainText)) + return new String[] {}; + else + return new String[] { PLAIN_MECHANISM }; + } + } +} diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServerProvider1.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServerProvider1.java new file mode 100644 index 0000000..16a11f4 --- /dev/null +++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServerProvider1.java @@ -0,0 +1,40 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * + *******************************************************************************/ +package org.onap.dmaap.kafkaAuthorize; + +import java.security.Provider; +import java.security.Security; + +import org.onap.dmaap.kafkaAuthorize.PlainSaslServer1.PlainSaslServerFactory1; + +public class PlainSaslServerProvider1 extends Provider { + + private static final long serialVersionUID = 1L; + + protected PlainSaslServerProvider1() { + super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka"); + super.put("SaslServerFactory." + PlainSaslServer1.PLAIN_MECHANISM, PlainSaslServerFactory1.class.getName()); + } + + public static void initialize() { + Security.addProvider(new PlainSaslServerProvider1()); + } +} |