From 525c7d8dbef106245c4fce29a5b6b31b7160b21c Mon Sep 17 00:00:00 2001 From: Shwetank Dave Date: Fri, 24 Aug 2018 13:24:49 -0400 Subject: Updated log messages to be more intuitive Added support for kafka Updated logging.clj file to not reject messages which contain an empty substring. Updating Dockerfile to use the specified GLIBC_VERSION Adding spotify docker plugin Moving the Dockerfile to directory root. Issue-ID: AAI-1542 Change-Id: I4c41486cb6c7698a2c2736d574721f6a5237563e Signed-off-by: Shwetank Dave --- Dockerfile | 45 ++++++++++++++++ dev/dev.clj | 17 +++++- devops/chameleon/Dockerfile | 45 ---------------- devops/chameleon/build-chameleon | 0 pom.xml | 39 ++++++++++++-- project.clj | 15 +++++- resources/chameleon_logback.xml | 10 ++-- resources/log/ChameleonMsgs.properties | 10 ++-- src/chameleon/aai_processor.clj | 2 +- src/chameleon/config.clj | 4 +- src/chameleon/event.clj | 36 ++++++++++++- src/chameleon/handler.clj | 14 ----- src/chameleon/kafka.clj | 97 ++++++++++++++++++++++++++++++++++ src/chameleon/logging.clj | 5 +- src/chameleon/route.clj | 5 +- src/chameleon/specs.clj | 2 +- 16 files changed, 261 insertions(+), 85 deletions(-) create mode 100644 Dockerfile delete mode 100644 devops/chameleon/Dockerfile mode change 100644 => 100755 devops/chameleon/build-chameleon create mode 100644 src/chameleon/kafka.clj diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..caf0bf5 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,45 @@ +FROM alpine:3.6 + +# Java Version +ENV JAVA_VERSION_MAJOR 8 +ENV JAVA_VERSION_MINOR 131 +ENV JAVA_VERSION_BUILD 11 +ENV JAVA_PACKAGE jre +ENV GLIBC_VERSION 2.28-r0 + +ENV JAVA_8_BASE_URL http://download.oracle.com/otn-pub/java/jdk/${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-b${JAVA_VERSION_BUILD}/d54c1d3a095b4ff2b6607d096fa80163/${JAVA_PACKAGE}-${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR} + +RUN apk update +RUN apk add curl + +# Install glibc (required for java) +RUN apk --no-cache add ca-certificates wget && \ + wget -q -O /etc/apk/keys/sgerrand.rsa.pub https://alpine-pkgs.sgerrand.com/sgerrand.rsa.pub && \ + wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk && \ + apk add glibc-${GLIBC_VERSION}.apk + +# Install Java +RUN mkdir /opt && \ + curl -jsSL -H "Cookie: oraclelicense=accept-securebackup-cookie" ${JAVA_8_BASE_URL}-linux-x64.tar.gz | tar -xzf - -C /opt && \ + ln -s /opt/${JAVA_PACKAGE}1.${JAVA_VERSION_MAJOR}.0_${JAVA_VERSION_MINOR} /opt/${JAVA_PACKAGE} && \ + cd /opt/${JAVA_PACKAGE}/ && rm -rf *src.zip lib/missioncontrol lib/visualvm lib/plugin.jar plugin lib/*javafx* lib/*jfx* lib/ext/jfxrt.jar bin/javaws lib/javaws.jar lib/desktop lib/deploy* lib/amd64/libdecora_sse.so lib/amd64/libprism_*.so lib/amd64/libfxplugins.so lib/amd64/libglass.so lib/amd64/libgstreamer-lite.so lib/amd64/libjavafx*.so lib/amd64/libjfx*. + +# Install Java Cryptography Extension (JCE) Unlimited Strength +RUN curl -jksSLH "Cookie: oraclelicense=accept-securebackup-cookie" -o /tmp/jce_policy.zip \ + http://download.oracle.com/otn-pub/java/jce/${JAVA_VERSION_MAJOR}/jce_policy-${JAVA_VERSION_MAJOR}.zip && \ + unzip -o -d /opt/${JAVA_PACKAGE}/lib/security /tmp/jce_policy.zip && rm -f /tmp/jce_policy.zip + +RUN apk del curl + +ENV PATH /opt/jre/bin:${PATH} + +RUN apk add zip openssh-keygen openssh-client + +EXPOSE 80 + +COPY ./devops/chameleon/docker-entrypoint.sh / +RUN chmod 700 /docker-entrypoint.sh +ENTRYPOINT ["/docker-entrypoint.sh"] + +RUN mkdir -p /opt/chameleon +COPY target/chameleon-0.1.0-jar-with-dependencies.jar /opt/chameleon diff --git a/dev/dev.clj b/dev/dev.clj index 51d0fdf..48b8f2e 100644 --- a/dev/dev.clj +++ b/dev/dev.clj @@ -11,10 +11,21 @@ [clojure.tools.namespace.repl :refer [refresh refresh-all disable-reload!]] [clojure.repl :refer [apropos dir doc find-doc pst source]] [clojure.test :refer [run-tests run-all-tests]] - [clojure.pprint :refer [pprint]])) + [clojure.pprint :refer [pprint]] + [chameleon.kafka :refer [deser]])) (disable-reload! (find-ns 'integrant.core)) + + + +(def kafka-config {"client.id" "chameleon.developer" + "bootstrap.servers" "host:port" + "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" + "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" + "enable.auto.commit" "false" + "auto.offset.reset" "earliest"}) + (integrant.repl/set-prep! (constantly (config {:event-config {:aai {:host "localhost:3904" :topic "events" :motsid "" @@ -23,6 +34,8 @@ :consumer-id"chameleon1" :timeout 15000 :batch-size 8 - :type "HTTPAUTH"}} + :type "HTTPAUTH" + :kafka-config kafka-config} + :source :kafka} :gallifrey-host "localhost:443" :http-port 3449}))) diff --git a/devops/chameleon/Dockerfile b/devops/chameleon/Dockerfile deleted file mode 100644 index 97e9c3b..0000000 --- a/devops/chameleon/Dockerfile +++ /dev/null @@ -1,45 +0,0 @@ -FROM alpine:3.6 - -# Java Version -ENV JAVA_VERSION_MAJOR 8 -ENV JAVA_VERSION_MINOR 131 -ENV JAVA_VERSION_BUILD 11 -ENV JAVA_PACKAGE jre -ENV GLIBC_VERSION 2.25-r0 - -ENV JAVA_8_BASE_URL http://download.oracle.com/otn-pub/java/jdk/${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-b${JAVA_VERSION_BUILD}/d54c1d3a095b4ff2b6607d096fa80163/${JAVA_PACKAGE}-${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR} - -RUN apk update -RUN apk add curl - -# Install glibc (required for java) -RUN curl -jsSL -o /etc/apk/keys/sgerrand.rsa.pub https://raw.githubusercontent.com/sgerrand/alpine-pkg-glibc/master/sgerrand.rsa.pub && \ - curl -jsSL -O https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk && \ - apk add glibc-${GLIBC_VERSION}.apk && \ - rm -f glibc-${GLIBC_VERSION}.apk - -# Install Java -RUN mkdir /opt && \ - curl -jsSL -H "Cookie: oraclelicense=accept-securebackup-cookie" ${JAVA_8_BASE_URL}-linux-x64.tar.gz | tar -xzf - -C /opt && \ - ln -s /opt/${JAVA_PACKAGE}1.${JAVA_VERSION_MAJOR}.0_${JAVA_VERSION_MINOR} /opt/${JAVA_PACKAGE} && \ - cd /opt/${JAVA_PACKAGE}/ && rm -rf *src.zip lib/missioncontrol lib/visualvm lib/plugin.jar plugin lib/*javafx* lib/*jfx* lib/ext/jfxrt.jar bin/javaws lib/javaws.jar lib/desktop lib/deploy* lib/amd64/libdecora_sse.so lib/amd64/libprism_*.so lib/amd64/libfxplugins.so lib/amd64/libglass.so lib/amd64/libgstreamer-lite.so lib/amd64/libjavafx*.so lib/amd64/libjfx*. - -# Install Java Cryptography Extension (JCE) Unlimited Strength -RUN curl -jksSLH "Cookie: oraclelicense=accept-securebackup-cookie" -o /tmp/jce_policy.zip \ - http://download.oracle.com/otn-pub/java/jce/${JAVA_VERSION_MAJOR}/jce_policy-${JAVA_VERSION_MAJOR}.zip && \ - unzip -o -d /opt/${JAVA_PACKAGE}/lib/security /tmp/jce_policy.zip && rm -f /tmp/jce_policy.zip - -RUN apk del curl - -ENV PATH /opt/jre/bin:${PATH} - -RUN apk add zip openssh-keygen openssh-client - -EXPOSE 80 - -RUN mkdir -p /opt/chameleon -COPY chameleon.jar /opt/chameleon - -COPY ./docker-entrypoint.sh / -RUN chmod 700 /docker-entrypoint.sh -ENTRYPOINT ["/docker-entrypoint.sh"] diff --git a/devops/chameleon/build-chameleon b/devops/chameleon/build-chameleon old mode 100644 new mode 100755 diff --git a/pom.xml b/pom.xml index 1d9ab52..07fdebf 100644 --- a/pom.xml +++ b/pom.xml @@ -7,9 +7,7 @@ chameleon - 36b5671af2c3eec5ca81663382c4ca2898f79e55 - - + 33595bcc69c84291645bd60cc15bddea1255e298 src @@ -101,6 +99,27 @@ + + com.spotify + dockerfile-maven-plugin + 1.4.4 + + latest + ${docker.push.registry}/onap/chameleon + true + docker-hub + + + + default + + build + push + tag + + + + org.codehaus.mojo build-helper-maven-plugin @@ -136,7 +155,7 @@ clojars - https://clojars.org/repo/ + https://repo.clojars.org/ true @@ -194,6 +213,11 @@ clojure 1.9.0 + + org.clojure + core.async + 0.4.474 + com.7theta utilis @@ -257,7 +281,12 @@ org.onap.aai.event-client event-client-dmaap - 1.2.1 + 1.3.0 + + + org.onap.aai.event-client + event-client-kafka + 1.3.0 org.onap.aai.logging-service diff --git a/project.clj b/project.clj index 430173c..c59de69 100644 --- a/project.clj +++ b/project.clj @@ -1,5 +1,6 @@ (defproject chameleon "0.1.0" :dependencies [[org.clojure/clojure "1.9.0"] + [org.clojure/core.async "0.4.474"] [com.7theta/utilis "1.0.4"] [http-kit "2.2.0"] [ring/ring-core "1.6.3"] @@ -12,7 +13,8 @@ [clj-time "0.14.2"] [integrant "0.6.2"] [yogthos/config "0.9"] - [org.onap.aai.event-client/event-client-dmaap "1.2.1"] + [org.onap.aai.event-client/event-client-dmaap "1.3.0"] + [org.onap.aai.event-client/event-client-kafka "1.3.0"] [org.onap.aai.logging-service/common-logging "1.2.2"] [camel-snake-kebab "0.4.0"] [metosin/ring-http-response "0.9.0"] @@ -58,4 +60,13 @@ [:mainClass "chameleon.server"]]]) :executions ([:execution [:id "assemble"] [:phase "package"] - [:goals ([:goal "single"])]])}]]) + [:goals ([:goal "single"])]])}] + [com.spotify/dockerfile-maven-plugin "1.4.4" + {:configuration ([:tag "latest"] + [:repository "${docker.push.registry}/onap/chameleon"] + [:verbose true] + [:serverId "docker-hub"]) + :executions ([:execution [:id "default"] + [:goals ([:goal "build"] + [:goal "push"] + [:goal "tag"])]])}]]) diff --git a/resources/chameleon_logback.xml b/resources/chameleon_logback.xml index 5b723d7..b413012 100644 --- a/resources/chameleon_logback.xml +++ b/resources/chameleon_logback.xml @@ -1,5 +1,4 @@ - @@ -13,7 +12,7 @@ - + @@ -23,8 +22,7 @@ - + @@ -55,7 +53,7 @@ related logging events. The audit logger and appender are specializations of the EELF application root logger and appender. This can be used to segregate Policy engine events from other components, or it can be eliminated to record - these events as part of the application root log. --> + these events as part of the application root log. --> @@ -80,7 +78,7 @@ - + diff --git a/resources/log/ChameleonMsgs.properties b/resources/log/ChameleonMsgs.properties index 4e61e2a..91995ea 100644 --- a/resources/log/ChameleonMsgs.properties +++ b/resources/log/ChameleonMsgs.properties @@ -8,12 +8,16 @@ Attempt Gallifrey Assertion of operation {0} with type {1} and key {2} \ GALLIFREY_ASSERTED=\ CHM0002I|\ -Asserted type {0} and key {1} in Gallifrey\ +Gallifrey Assertion "{0}" for type "{1}" and key "{2}"\ RESPONSE=\ CHM0003I|\ -Response for operation "{0}" for endpoint "{1}" resulted in status "{2}" with body "{3}"\ +Response for operation "{0}" for endpoint "{1}" resulted in status {2} ; {3}\ CHAMELEON_REQUEST=\ CHM0004I|\ -Incoming Request of type "{0}" for endpoint "{1}" from address "{2}"\ \ No newline at end of file +Incoming Request of type {0} for endpoint {1} from address {2}\ + +ERROR=\ +CHM0001E|\ +ERROR ENCOUNTERED Caused By: "{0}" with message: {1}\ \ No newline at end of file diff --git a/src/chameleon/aai_processor.clj b/src/chameleon/aai_processor.clj index 4243b39..74c6da9 100644 --- a/src/chameleon/aai_processor.clj +++ b/src/chameleon/aai_processor.clj @@ -55,7 +55,7 @@ (defn from-spike "Transforms Spike-based event payloads to a format accepted by Gallifrey for vertices and relationships" [gallifrey-host payload & [error-logger audit-logger]] - (let [txpayload (map-keywords (parse-string payload)) + (let [txpayload (:body (parse-string payload true)) operation (:operation txpayload) parse-type (if (contains? txpayload :vertex) :vertex diff --git a/src/chameleon/config.clj b/src/chameleon/config.clj index 10324e2..d5a8406 100644 --- a/src/chameleon/config.clj +++ b/src/chameleon/config.clj @@ -15,8 +15,8 @@ :gallifrey-transformer from-gallifrey :loggers (ig/ref :chameleon/loggers)} :chameleon/aai-processor - {:provenance-attr "last-mod-source-of-truth" - :truth-attr "truth-time"} + {:provenance-attr :last-mod-source-of-truth + :truth-attr :truth-time} :chameleon/http-server {:port (:http-port app-config) :handler (ig/ref :chameleon/handler)}}] diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj index 92f4211..2eddf85 100644 --- a/src/chameleon/event.clj +++ b/src/chameleon/event.clj @@ -1,11 +1,22 @@ (ns chameleon.event (:require [integrant.core :as ig] [clojure.string :refer [starts-with?]] - [chameleon.logging :as log]) - (:import [org.onap.aai.event.client DMaaPEventConsumer])) + [chameleon.logging :as log] + [clojure.core.async :as ca] + [chameleon.kafka :as ck]) + (:import [org.onap.aai.event.client DMaaPEventConsumer KafkaEventConsumer])) + +(declare from-dmaap from-kafka) (defmethod ig/init-key :chameleon/event [_ {:keys [event-config gallifrey-host loggers]}] + (case (:source event-config) + :dmaap (from-dmaap event-config gallifrey-host loggers) + :kafka (from-kafka event-config gallifrey-host loggers) + :error)) + +(defn- from-dmaap + [event-config gallifrey-host loggers] (let [{:keys [host topic motsid pass consumer-group consumer-id timeout batch-size type processor]} (:aai event-config) [error-logger audit-logger] loggers event-processor (DMaaPEventConsumer. host topic motsid pass consumer-group consumer-id timeout batch-size type)] @@ -23,3 +34,24 @@ (log/mdc-clear!))) (catch Exception e (println (str "Unexpected exception during processing: " (.getMessage e))))))))))))) + +(defn- from-kafka + [event-config gallifrey-host loggers] + (let [{:keys [topic consumer-group processor kafka-config]} (:aai event-config) + [error-logger audit-logger] loggers + kfc (ck/clj-kafka-consumer kafka-config consumer-group topic) + chan (ca/chan 5) + error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread")] + (log/info error-logger "EVENT_PROCESSOR" + [(format "AAI created. Starting polling a KAFKA Topic '%s' on '%s'" topic (kafka-config "bootstrap.servers"))]) + (ca/go-loop [] + (let [recs (ca/ app-routes (wrap-defaults api-defaults) (log-reqs loggers))) - - -;;; Implementation - -(defn- serialize - [e] - (compact - (update e :_meta #(map-vals - (fn [m] - (map-vals str m)) %)))) - -(defn- de-serialize - [e] - e) diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj new file mode 100644 index 0000000..ca730a1 --- /dev/null +++ b/src/chameleon/kafka.clj @@ -0,0 +1,97 @@ +(ns chameleon.kafka + (:refer-clojure :exclude [send]) + (:require [clojure.core.async :as ca]) + (:import [java.util Properties] + [org.apache.kafka.clients.consumer KafkaConsumer] + [org.apache.kafka.clients.consumer ConsumerRecord] + [org.apache.kafka.common.serialization StringDeserializer StringSerializer Serdes])) + +(declare consume run-away) + +(defrecord CljKafkaConsumer [config topic]) + + +;; Public +(def deser StringDeserializer) +(def ser StringSerializer) + +(defmacro run-away + [name & body] + `(try (.start (Thread. (fn [] ~@body) ~name)) + (catch OutOfMemoryError e# + (throw (ex-info {:error "Could not allocate resources for asynchronous execution"}))))) + +(defn clj-kafka-consumer + "Given a config, group-id, and a topic, return a CljKafkaConsumer" + [config group-id topic] + (CljKafkaConsumer. (assoc config "group.id" group-id) topic)) + +(defn subscribe + "Given a CljKafkaConsumer, a channel, and a session timeout (in + ms), return a channel. The input channel is where the messages will + be published." + ([{:keys [config topic] :as conf} channel session-timeout] + (subscribe conf channel session-timeout "Kafka-Subscriber-Thread")) + ([{:keys [config topic]} channel session-timeout thread-name] + (let [control-channel (ca/chan (ca/dropping-buffer 1))] + (run-away thread-name (consume config topic channel session-timeout control-channel)) + control-channel))) + +;; Private + +(defn- consumer-record->clojure + [consumer-record] + {:topic (.topic ^ConsumerRecord consumer-record) + :partition (.partition ^ConsumerRecord consumer-record) + :offset (.offset ^ConsumerRecord consumer-record) + :key (.key ^ConsumerRecord consumer-record) + :value (.value ^ConsumerRecord consumer-record)}) + +(defn- send-with-sla-timeout + "Consumer Records will always be a sequence. Try to put a message on + the channel before the sla-timeout. Return true if a message was put + on the channel or when there were no messages to put on the + channel. Return false if there is a timeout." + [chan sla-timeout messages] + (if (seq messages) + (let [sent? (ca/alt!! [[chan messages]] :sent + (ca/timeout sla-timeout) :timeout)] + (if (= sent? :timeout) + false + true)) + true)) + +(defn- as-properties + [m] + (reduce (fn [props [k v]] + (.setProperty ^Properties props k v) + props) + (Properties.) m)) + +(defn- consume + "Consume elements from a topic and put it on the given + channel. Block until a given timeout to put messages on the channel + or put sla-exception on the control channel (channel returned by + this function). Once the message is put on the channel, it is + considered safe to update the offset of the kafka consumer. The + `sla-timeout` is an agreement such that the data has to be taken off + the channel within the given timeout. If failed to take the data, + kaka consumer will be disconnected and an exception will be put on + the control channel." + [config topic chan sla-timeout control-channel] + (let [consumer (KafkaConsumer. ^Properties (as-properties config))] + (.subscribe ^KafkaConsumer consumer (list topic)) + (loop [] + (let [consumer-records (.poll consumer 0)] + (when (and (->> consumer-records + (map consumer-record->clojure) + (send-with-sla-timeout chan sla-timeout)) + (try (.commitSync consumer) + true + (catch Exception e + (ca/>!! control-channel (.getMessage e)) + false))) + (recur)))) + (ca/>!! control-channel :sla-timeout) + (.unsubscribe consumer) + (.close consumer))) diff --git a/src/chameleon/logging.clj b/src/chameleon/logging.clj index 15e1e7b..aa43974 100644 --- a/src/chameleon/logging.clj +++ b/src/chameleon/logging.clj @@ -76,7 +76,10 @@ :chameleon.specs/logger logger)] (if (empty? confirmed-specs) (.info logger (string->enum enum) (logfields fields) (into-array java.lang.String msgs)) - confirmed-specs))) + (.info logger (string->enum "ERROR") (logfields fields) (->> confirmed-specs + (into ["SPEC ERROR"]) + (mapv str) + (into-array java.lang.String)))))) (defn debug [^AaiLoggerAdapter logger ^String enum msgs] diff --git a/src/chameleon/route.clj b/src/chameleon/route.clj index 0918da4..707c608 100644 --- a/src/chameleon/route.clj +++ b/src/chameleon/route.clj @@ -59,6 +59,9 @@ "UPDATE" (assert-update! host actor type key body time) "DELETE" (assert-delete! host actor type key time)) {:keys [status body]} @g-assert] - (log/info e-logger "GALLIFREY_ASSERTED" [(str type) (str key)]) + (log/info e-logger "RESPONSE" (mapv str [operation key status body])) + (if (and (>= status 200) (<= status 299)) + (log/info e-logger "GALLIFREY_ASSERTED" ["SUCCEEDED" (str type) (str key)]) + (log/info e-logger "GALLIFREY_ASSERTED" ["FAILED" (str type) (str key)])) (log/info a-logger "RESPONSE" (mapv str [operation key status body]) :fields {:response-code status :response-description (hs/get-name status)}))) diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj index 40d5768..630a4a8 100644 --- a/src/chameleon/specs.clj +++ b/src/chameleon/specs.clj @@ -67,5 +67,5 @@ ;; Logger specs (s/def ::logger (s/spec log/logger? :gen #(gen/return (log/error-logger "chameleon.specs")))) (s/def ::loggers (s/cat :e :chameleon.specs/logger :a :chameleon.specs/logger)) -(s/def :logging/msgs (s/* ::string)) +(s/def :logging/msgs (s/* string?)) (s/def :logging/valid-fields log/valid-logfields?) -- cgit 1.2.3-korg