diff options
author | su622b <su622b@att.com> | 2020-03-09 07:14:03 -0400 |
---|---|---|
committer | su622b <su622b@att.com> | 2020-03-09 07:16:43 -0400 |
commit | 96b4eae15f6e43154d47b1fc2442f1345f405c0b (patch) | |
tree | fce85c2804f007a5afbb2ed6c4b1dff734617a13 /src/main | |
parent | d1118202d8eb04babd7fa1151a5d01d85c6e93dd (diff) |
confluent based image
Issue-ID: DMAAP-1337
Change-Id: I242f13be3b36f0bc7fdbb5fcdb3f6ba7a92647cb
Signed-off-by: su622b <su622b@att.com>
Diffstat (limited to 'src/main')
7 files changed, 230 insertions, 191 deletions
diff --git a/src/main/docker/Dockerfile b/src/main/docker/Dockerfile index ee39c76..1896aab 100644 --- a/src/main/docker/Dockerfile +++ b/src/main/docker/Dockerfile @@ -1,57 +1,40 @@ -FROM openjdk:8-jre-alpine3.9 +FROM confluentinc/cp-base:5.3.1 + +# allow arg override of required env params +ARG KAFKA_ZOOKEEPER_CONNECT +ENV KAFKA_ZOOKEEPER_CONNECT=${KAFKA_ZOOKEEPER_CONNECT} +ARG KAFKA_ADVERTISED_LISTENERS +ENV KAFKA_ADVERTISED_LISTENERS=${KAFKA_ADVERTISED_LISTENERS} + +ENV COMPONENT=kafka \ + KAFKA_USER=mrkafka + +RUN echo "===> installing ${COMPONENT}..." \ + && apt-get update && apt-get install -y confluent-kafka-${SCALA_VERSION}=${CONFLUENT_VERSION}${CONFLUENT_PLATFORM_LABEL}-${CONFLUENT_DEB_VERSION} \ + \ + && echo "===> clean up ..." \ + && apt-get clean && rm -rf /tmp/* /var/lib/apt/lists/* \ + \ + && echo "===> Setting up ${COMPONENT} dirs..." \ + && mkdir -p /var/lib/${COMPONENT}/data /etc/${COMPONENT}/secrets/cert /etc/${COMPONENT}/secrets/jaas /etc/${COMPONENT}/data /var/log/kafka /var/log/confluent \ + && chmod -R ag+w /etc/${COMPONENT} /var/lib/${COMPONENT}/data /etc/${COMPONENT}/secrets /etc/${COMPONENT}/data /var/log/kafka /var/log/confluent \ + && chown -R root:root /var/log/kafka /var/log/confluent /var/lib/kafka /var/lib/zookeeper + +COPY include/etc/confluent/docker /etc/confluent/docker +RUN chmod -R +x /etc/confluent/docker + +COPY org.onap.dmaap.mr.trust.jks \ + org.onap.dmaap.mr.p12 \ + org.onap.dmaap.mr.keyfile \ + /etc/${COMPONENT}/secrets/cert/ -ARG kafka_version=1.1.1 -ARG scala_version=2.12 +COPY kafka11aaf-jar-with-dependencies.jar /usr/share/java/${COMPONENT}/ -RUN apk add --update unzip wget curl jq coreutils bash +RUN useradd -u 1000 -g 0 $KAFKA_USER -ENV KAFKA_VERSION=$kafka_version SCALA_VERSION=$scala_version +USER $KAFKA_USER -COPY download-kafka.sh \ - kafka_server_jaas.conf \ - org.onap.dmaap.mr.trust.jks \ - org.onap.dmaap.mr.p12 \ - org.onap.dmaap.mr.keyfile \ - cadi.properties \ - kafka11aaf-jar-with-dependencies.jar \ - dmaapMMAgent.jar \ - kafka-run-class.sh \ - /tmp/ - -COPY mmagent.config \ - consumer.properties \ - producer.properties \ - /opt/etc/ - -RUN chmod a+x /tmp/download-kafka.sh && sync && /tmp/download-kafka.sh && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka - -VOLUME ["/kafka"] - -ENV KAFKA_HOME /opt/kafka -ENV PATH ${PATH}:${KAFKA_HOME}/bin - -COPY start-kafka.sh \ - broker-list.sh \ - create-topics.sh \ - start-kafkaOrMirrorMaker.sh \ - start-mirrormaker.sh \ - /usr/bin/ - -RUN mkdir /opt/logs && \ - touch /opt/logs/mmagent.log - -# The scripts need to have executable permission -RUN chmod a+x /usr/bin/start-kafka.sh && \ - chmod a+x /usr/bin/broker-list.sh && \ - chmod a+x /usr/bin/start-kafkaOrMirrorMaker.sh && \ - chmod a+x /usr/bin/start-mirrormaker.sh && \ - chmod a+x /usr/bin/create-topics.sh -# Use "exec" form so that it runs as PID 1 (useful for graceful shutdown) -CMD ["start-kafkaOrMirrorMaker.sh"] - -RUN addgroup -S -g 1000 mrkafka \ - && adduser -S -u 1000 mrkafka mrkafka \ - && chown -R mrkafka:mrkafka /opt/kafka/ /opt/logs/ /opt/etc/ /kafka/ /usr/bin/ /tmp/ - -USER mrkafka +EXPOSE 9092 9093 + +CMD ["/etc/confluent/docker/run"]
\ No newline at end of file diff --git a/src/main/docker/cadi.properties b/src/main/docker/cadi.properties index a63a1bf..55d11c4 100644 --- a/src/main/docker/cadi.properties +++ b/src/main/docker/cadi.properties @@ -3,13 +3,13 @@ aaf_url=https://AAF_LOCATE_URL/onap.org.osaaf.aaf.service:2.1 aaf_env=DEV aaf_lur=org.onap.aaf.cadi.aaf.v2_0.AAFLurPerm -cadi_truststore=/opt/kafka/config/org.onap.dmaap.mr.trust.jks +cadi_truststore=/etc/kafka/secrets/cert/org.onap.dmaap.mr.trust.jks cadi_truststore_password=enc:gvXm0E9p-_SRNw5_feOUE7wqXBxgxV3S_bdAyB08Sq9F35cCUZHWgQyKIDtTAbEw -cadi_keyfile=/opt/kafka/config/org.onap.dmaap.mr.keyfile +cadi_keyfile=/etc/kafka/secrets/cert/org.onap.dmaap.mr.keyfile cadi_alias=dmaapmr@mr.dmaap.onap.org -cadi_keystore=/opt/kafka/config/org.onap.dmaap.mr.p12 +cadi_keystore=/etc/kafka/secrets/cert/org.onap.dmaap.mr.p12 cadi_keystore_password=enc:pLMCzQzk-OP7IpYNi0TPtQSkNcraFAdarZG8HbdOKq4BycW6g_7mfhphLhOZo6ht cadi_x509_issuers=CN=intermediateCA_1, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_7, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_9, OU=OSAAF, O=ONAP, C=US @@ -17,4 +17,4 @@ cadi_x509_issuers=CN=intermediateCA_1, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_ cadi_loglevel=INFO cadi_protocols=TLSv1.1,TLSv1.2 cadi_latitude=37.78187 -cadi_longitude=-122.26147
\ No newline at end of file +cadi_longitude=-122.26147 diff --git a/src/main/docker/kafka_server_jaas.conf b/src/main/docker/kafka_server_jaas.conf index 163041b..3e69fc6 100644 --- a/src/main/docker/kafka_server_jaas.conf +++ b/src/main/docker/kafka_server_jaas.conf @@ -1,5 +1,12 @@ KafkaServer { org.onap.dmaap.kafkaAuthorize.PlainLoginModule1 required username="admin" - password="admin_secret"; + password="admin_secret" + user_admin="admin_secret"; }; +Client { + org.apache.zookeeper.server.auth.DigestLoginModule required + username="kafka" + password="kafka_secret"; + }; + diff --git a/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProvider.java b/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProvider.java index da01829..56fd1bb 100644 --- a/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProvider.java +++ b/src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProvider.java @@ -44,17 +44,20 @@ public class Cadi3AAFProvider implements AuthorizationProvider { private static PropAccess access; private static AAFCon<?> aafcon; - private static final String CADI_PROPERTIES = "/opt/kafka/config/cadi.properties"; + private static final String CADI_PROPERTIES = "/etc/kafka/data/cadi.properties"; private static final String AAF_LOCATOR_ENV = "aaf_locate_url"; private static String apiKey = null; private static String kafkaUsername = null; private static AAFAuthn<?> aafAuthn; private static AbsAAFLur<AAFPermission> aafLur; - + private static boolean enableCadi = false; private static final Logger logger = LoggerFactory.getLogger(Cadi3AAFProvider.class); static { + if (System.getenv("enableCadi") != null && System.getenv("enableCadi").equals("true")) { + enableCadi = true; + } Configuration config = Configuration.getConfiguration(); try { if (config == null) { @@ -86,6 +89,11 @@ public class Cadi3AAFProvider implements AuthorizationProvider { return kafkaUsername; } + public static boolean isCadiEnabled() { + + return enableCadi; + } + public static AAFAuthn<?> getAafAuthn() throws CadiException { if (aafAuthn == null) { throw new CadiException("Cadi is uninitialized in Cadi3AAFProvider.getAafAuthn()"); @@ -172,25 +180,32 @@ public class Cadi3AAFProvider implements AuthorizationProvider { public String authenticate(String userId, String password) throws Exception { logger.info("^Event received with username " + userId); - if (userId.equals(kafkaUsername)) { - if (password.equals(apiKey)) { - logger.info("by passes the authentication for the admin " + kafkaUsername); - return null; - } else { - String errorMessage = "Authentication failed for user " + kafkaUsername; - logger.error(errorMessage); - return errorMessage; - } - } + boolean enableCadi = System.getenv("enableCadi") == null ? true : false; + if (!enableCadi) { + return null; + } else { + if (userId.equals(kafkaUsername)) { + if (password.equals(apiKey)) { + logger.info("by passes the authentication for the admin " + kafkaUsername); + return null; + } else { + String errorMessage = "Authentication failed for user " + kafkaUsername; + logger.error(errorMessage); + return errorMessage; + } - String aafResponse = aafAuthn.validate(userId, password); - logger.info("aafResponse=" + aafResponse + " for " + userId); + } - if (aafResponse != null) { - logger.error("Authentication failed for user ." + userId); + String aafResponse = aafAuthn.validate(userId, password); + logger.info("aafResponse=" + aafResponse + " for " + userId); + + if (aafResponse != null) { + logger.error("Authentication failed for user ." + userId); + } + return aafResponse; } - return aafResponse; + } } diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java index 7d38cd2..950cd9f 100644 --- a/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java +++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java @@ -161,7 +161,7 @@ public class KafkaCustomAuthorizer implements Authorizer { return true; } - if (null != topicName && !topicName.startsWith("org.onap")) { + if ((!Cadi3AAFProvider.isCadiEnabled())||(null != topicName && !topicName.startsWith("org.onap"))) { return true; } diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1.java index 508d583..6213b9b 100644 --- a/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1.java +++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1.java @@ -1,4 +1,4 @@ -/******************************************************************************* +/****************************************************************************** * ============LICENSE_START======================================================= * org.onap.dmaap * ================================================================================ @@ -21,18 +21,25 @@ package org.onap.dmaap.kafkaAuthorize; import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Map; +import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; import javax.security.sasl.Sasl; import javax.security.sasl.SaslException; import javax.security.sasl.SaslServer; import javax.security.sasl.SaslServerFactory; +import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.security.JaasContext; import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler; - +import org.apache.kafka.common.security.plain.PlainAuthenticateCallback; +import org.apache.kafka.common.security.plain.internals.PlainSaslServer; import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory; /** @@ -51,126 +58,152 @@ import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderF */ public class PlainSaslServer1 implements SaslServer { - public static final String PLAIN_MECHANISM = "PLAIN"; - - private boolean complete; - private String authorizationID; - - - @Override - public byte[] evaluateResponse(byte[] response) throws SaslException { - /* - * Message format (from https://tools.ietf.org/html/rfc4616): - * - * message = [authzid] UTF8NUL authcid UTF8NUL passwd authcid = 1*SAFE ; - * MUST accept up to 255 octets authzid = 1*SAFE ; MUST accept up to 255 - * octets passwd = 1*SAFE ; MUST accept up to 255 octets UTF8NUL = %x00 - * ; UTF-8 encoded NUL character - * - * SAFE = UTF1 / UTF2 / UTF3 / UTF4 ;; any UTF-8 encoded Unicode - * character except NUL - */ - - String[] tokens; - try { - tokens = new String(response, "UTF-8").split("\u0000"); - } catch (UnsupportedEncodingException e) { - throw new SaslException("UTF-8 encoding not supported", e); - } - if (tokens.length != 3) - throw new SaslException("Invalid SASL/PLAIN response: expected 3 tokens, got " + tokens.length); - authorizationID = tokens[0]; - String username = tokens[1]; - String password = tokens[2]; - - if (username.isEmpty()) { - throw new SaslException("Authentication failed: username not specified"); - } - if (password.isEmpty()) { - throw new SaslException("Authentication failed: password not specified"); - } - if (authorizationID.isEmpty()) - authorizationID = username; - - String aafResponse = "Not Verified"; + public static final String PLAIN_MECHANISM = "PLAIN"; + + private boolean complete; + private String authorizationId; + + + /** + * @throws SaslAuthenticationException if username/password combination is invalid or if the requested + * authorization id is not the same as username. + * <p> + * <b>Note:</b> This method may throw {@link SaslAuthenticationException} to provide custom error messages + * to clients. But care should be taken to avoid including any information in the exception message that + * should not be leaked to unauthenticated clients. It may be safer to throw {@link SaslException} in + * some cases so that a standard error message is returned to clients. + * </p> + */ + @Override + public byte[] evaluateResponse(byte[] responseBytes) throws SaslAuthenticationException { + /* + * Message format (from https://tools.ietf.org/html/rfc4616): + * + * message = [authzid] UTF8NUL authcid UTF8NUL passwd + * authcid = 1*SAFE ; MUST accept up to 255 octets + * authzid = 1*SAFE ; MUST accept up to 255 octets + * passwd = 1*SAFE ; MUST accept up to 255 octets + * UTF8NUL = %x00 ; UTF-8 encoded NUL character + * + * SAFE = UTF1 / UTF2 / UTF3 / UTF4 + * ;; any UTF-8 encoded Unicode character except NUL + */ + String response = new String(responseBytes, StandardCharsets.UTF_8); + List<String> tokens = extractTokens(response); + String authorizationIdFromClient = tokens.get(0); + String username = tokens.get(1); + String password = tokens.get(2); + + if (username.isEmpty()) { + throw new SaslAuthenticationException("Authentication failed: username not specified"); + } + if (password.isEmpty()) { + throw new SaslAuthenticationException("Authentication failed: password not specified"); + } + + String aafResponse = "Not Verified"; try { aafResponse = AuthorizationProviderFactory.getProviderFactory().getProvider().authenticate(username, password); } catch (Exception e) { } - if (null != aafResponse) { - throw new SaslException("Authentication failed: " + aafResponse + " User " + username); + throw new SaslAuthenticationException("Authentication failed: " + aafResponse + " User " + username); } - complete = true; - return new byte[0]; - } - - @Override - public String getAuthorizationID() { - if (!complete) - throw new IllegalStateException("Authentication exchange has not completed"); - return authorizationID; - } - - @Override - public String getMechanismName() { - return PLAIN_MECHANISM; - } - - @Override - public Object getNegotiatedProperty(String propName) { - if (!complete) - throw new IllegalStateException("Authentication exchange has not completed"); - return null; - } - - @Override - public boolean isComplete() { - return complete; - } - - @Override - public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException { - if (!complete) - throw new IllegalStateException("Authentication exchange has not completed"); - return Arrays.copyOfRange(incoming, offset, offset + len); - } - - @Override - public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException { - if (!complete) - throw new IllegalStateException("Authentication exchange has not completed"); - return Arrays.copyOfRange(outgoing, offset, offset + len); - } - - @Override - public void dispose() throws SaslException { - } - - public static class PlainSaslServerFactory1 implements SaslServerFactory { - - @Override - public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, - CallbackHandler cbh) throws SaslException { - - if (!PLAIN_MECHANISM.equals(mechanism)) - throw new SaslException( - String.format("Mechanism \'%s\' is not supported. Only PLAIN is supported.", mechanism)); - - return new PlainSaslServer1(); - } - @Override - public String[] getMechanismNames(Map<String, ?> props) { - if (props == null) - return new String[] { PLAIN_MECHANISM }; - String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT); - if ("true".equals(noPlainText)) - return new String[] {}; - else - return new String[] { PLAIN_MECHANISM }; - } - } + if (!authorizationIdFromClient.isEmpty() && !authorizationIdFromClient.equals(username)) + throw new SaslAuthenticationException("Authentication failed: Client requested an authorization id that is different from username"); + + this.authorizationId = username; + + complete = true; + return new byte[0]; + } + + private List<String> extractTokens(String string) { + List<String> tokens = new ArrayList<>(); + int startIndex = 0; + for (int i = 0; i < 4; ++i) { + int endIndex = string.indexOf("\u0000", startIndex); + if (endIndex == -1) { + tokens.add(string.substring(startIndex)); + break; + } + tokens.add(string.substring(startIndex, endIndex)); + startIndex = endIndex + 1; + } + + if (tokens.size() != 3) + throw new SaslAuthenticationException("Invalid SASL/PLAIN response: expected 3 tokens, got " + + tokens.size()); + + return tokens; + } + + @Override + public String getAuthorizationID() { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return authorizationId; + } + + @Override + public String getMechanismName() { + return PLAIN_MECHANISM; + } + + @Override + public Object getNegotiatedProperty(String propName) { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return null; + } + + @Override + public boolean isComplete() { + return complete; + } + + @Override + public byte[] unwrap(byte[] incoming, int offset, int len) { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return Arrays.copyOfRange(incoming, offset, offset + len); + } + + @Override + public byte[] wrap(byte[] outgoing, int offset, int len) { + if (!complete) + throw new IllegalStateException("Authentication exchange has not completed"); + return Arrays.copyOfRange(outgoing, offset, offset + len); + } + + @Override + public void dispose() { + } + + public static class PlainSaslServerFactory1 implements SaslServerFactory { + + @Override + public SaslServer createSaslServer(String mechanism, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) + throws SaslException { + + if (!PLAIN_MECHANISM.equals(mechanism)) + throw new SaslException(String.format("Mechanism \'%s\' is not supported. Only PLAIN is supported.", mechanism)); + + return new PlainSaslServer1(); + } + + @Override + public String[] getMechanismNames(Map<String, ?> props) { + if (props == null) return new String[]{PLAIN_MECHANISM}; + String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT); + if ("true".equals(noPlainText)) + return new String[]{}; + else + return new String[]{PLAIN_MECHANISM}; + } + } } + diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServerProvider1.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServerProvider1.java index 16a11f4..441a023 100644 --- a/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServerProvider1.java +++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServerProvider1.java @@ -35,6 +35,7 @@ public class PlainSaslServerProvider1 extends Provider { } public static void initialize() { - Security.addProvider(new PlainSaslServerProvider1()); + Security.insertProviderAt(new PlainSaslServerProvider1(),1); } } + |