aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Dockerfile (renamed from devops/chameleon/Dockerfile)18
-rw-r--r--dev/dev.clj17
-rwxr-xr-x[-rw-r--r--]devops/chameleon/build-chameleon0
-rw-r--r--pom.xml39
-rw-r--r--project.clj15
-rw-r--r--resources/chameleon_logback.xml10
-rw-r--r--resources/log/ChameleonMsgs.properties10
-rw-r--r--src/chameleon/aai_processor.clj2
-rw-r--r--src/chameleon/config.clj4
-rw-r--r--src/chameleon/event.clj36
-rw-r--r--src/chameleon/handler.clj14
-rw-r--r--src/chameleon/kafka.clj97
-rw-r--r--src/chameleon/logging.clj5
-rw-r--r--src/chameleon/route.clj5
-rw-r--r--src/chameleon/specs.clj2
15 files changed, 225 insertions, 49 deletions
diff --git a/devops/chameleon/Dockerfile b/Dockerfile
index 97e9c3b..caf0bf5 100644
--- a/devops/chameleon/Dockerfile
+++ b/Dockerfile
@@ -5,7 +5,7 @@ ENV JAVA_VERSION_MAJOR 8
ENV JAVA_VERSION_MINOR 131
ENV JAVA_VERSION_BUILD 11
ENV JAVA_PACKAGE jre
-ENV GLIBC_VERSION 2.25-r0
+ENV GLIBC_VERSION 2.28-r0
ENV JAVA_8_BASE_URL http://download.oracle.com/otn-pub/java/jdk/${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-b${JAVA_VERSION_BUILD}/d54c1d3a095b4ff2b6607d096fa80163/${JAVA_PACKAGE}-${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}
@@ -13,10 +13,10 @@ RUN apk update
RUN apk add curl
# Install glibc (required for java)
-RUN curl -jsSL -o /etc/apk/keys/sgerrand.rsa.pub https://raw.githubusercontent.com/sgerrand/alpine-pkg-glibc/master/sgerrand.rsa.pub && \
- curl -jsSL -O https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk && \
- apk add glibc-${GLIBC_VERSION}.apk && \
- rm -f glibc-${GLIBC_VERSION}.apk
+RUN apk --no-cache add ca-certificates wget && \
+ wget -q -O /etc/apk/keys/sgerrand.rsa.pub https://alpine-pkgs.sgerrand.com/sgerrand.rsa.pub && \
+ wget https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk && \
+ apk add glibc-${GLIBC_VERSION}.apk
# Install Java
RUN mkdir /opt && \
@@ -37,9 +37,9 @@ RUN apk add zip openssh-keygen openssh-client
EXPOSE 80
-RUN mkdir -p /opt/chameleon
-COPY chameleon.jar /opt/chameleon
-
-COPY ./docker-entrypoint.sh /
+COPY ./devops/chameleon/docker-entrypoint.sh /
RUN chmod 700 /docker-entrypoint.sh
ENTRYPOINT ["/docker-entrypoint.sh"]
+
+RUN mkdir -p /opt/chameleon
+COPY target/chameleon-0.1.0-jar-with-dependencies.jar /opt/chameleon
diff --git a/dev/dev.clj b/dev/dev.clj
index 51d0fdf..48b8f2e 100644
--- a/dev/dev.clj
+++ b/dev/dev.clj
@@ -11,10 +11,21 @@
[clojure.tools.namespace.repl :refer [refresh refresh-all disable-reload!]]
[clojure.repl :refer [apropos dir doc find-doc pst source]]
[clojure.test :refer [run-tests run-all-tests]]
- [clojure.pprint :refer [pprint]]))
+ [clojure.pprint :refer [pprint]]
+ [chameleon.kafka :refer [deser]]))
(disable-reload! (find-ns 'integrant.core))
+
+
+
+(def kafka-config {"client.id" "chameleon.developer"
+ "bootstrap.servers" "host:port"
+ "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
+ "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
+ "enable.auto.commit" "false"
+ "auto.offset.reset" "earliest"})
+
(integrant.repl/set-prep! (constantly (config {:event-config {:aai {:host "localhost:3904"
:topic "events"
:motsid ""
@@ -23,6 +34,8 @@
:consumer-id"chameleon1"
:timeout 15000
:batch-size 8
- :type "HTTPAUTH"}}
+ :type "HTTPAUTH"
+ :kafka-config kafka-config}
+ :source :kafka}
:gallifrey-host "localhost:443"
:http-port 3449})))
diff --git a/devops/chameleon/build-chameleon b/devops/chameleon/build-chameleon
index c588e95..c588e95 100644..100755
--- a/devops/chameleon/build-chameleon
+++ b/devops/chameleon/build-chameleon
diff --git a/pom.xml b/pom.xml
index 1d9ab52..07fdebf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,9 +7,7 @@
<name>chameleon</name>
<description/>
<scm>
- <tag>36b5671af2c3eec5ca81663382c4ca2898f79e55
-</tag>
- <url/>
+ <tag>33595bcc69c84291645bd60cc15bddea1255e298</tag>
</scm>
<build>
<sourceDirectory>src</sourceDirectory>
@@ -102,6 +100,27 @@
</executions>
</plugin>
<plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>dockerfile-maven-plugin</artifactId>
+ <version>1.4.4</version>
+ <configuration>
+ <tag>latest</tag>
+ <repository>${docker.push.registry}/onap/chameleon</repository>
+ <verbose>true</verbose>
+ <serverId>docker-hub</serverId>
+ </configuration>
+ <executions>
+ <execution>
+ <id>default</id>
+ <goals>
+ <goal>build</goal>
+ <goal>push</goal>
+ <goal>tag</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
@@ -136,7 +155,7 @@
</repository>
<repository>
<id>clojars</id>
- <url>https://clojars.org/repo/</url>
+ <url>https://repo.clojars.org/</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
@@ -195,6 +214,11 @@
<version>1.9.0</version>
</dependency>
<dependency>
+ <groupId>org.clojure</groupId>
+ <artifactId>core.async</artifactId>
+ <version>0.4.474</version>
+ </dependency>
+ <dependency>
<groupId>com.7theta</groupId>
<artifactId>utilis</artifactId>
<version>1.0.4</version>
@@ -257,7 +281,12 @@
<dependency>
<groupId>org.onap.aai.event-client</groupId>
<artifactId>event-client-dmaap</artifactId>
- <version>1.2.1</version>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.aai.event-client</groupId>
+ <artifactId>event-client-kafka</artifactId>
+ <version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.onap.aai.logging-service</groupId>
diff --git a/project.clj b/project.clj
index 430173c..c59de69 100644
--- a/project.clj
+++ b/project.clj
@@ -1,5 +1,6 @@
(defproject chameleon "0.1.0"
:dependencies [[org.clojure/clojure "1.9.0"]
+ [org.clojure/core.async "0.4.474"]
[com.7theta/utilis "1.0.4"]
[http-kit "2.2.0"]
[ring/ring-core "1.6.3"]
@@ -12,7 +13,8 @@
[clj-time "0.14.2"]
[integrant "0.6.2"]
[yogthos/config "0.9"]
- [org.onap.aai.event-client/event-client-dmaap "1.2.1"]
+ [org.onap.aai.event-client/event-client-dmaap "1.3.0"]
+ [org.onap.aai.event-client/event-client-kafka "1.3.0"]
[org.onap.aai.logging-service/common-logging "1.2.2"]
[camel-snake-kebab "0.4.0"]
[metosin/ring-http-response "0.9.0"]
@@ -58,4 +60,13 @@
[:mainClass "chameleon.server"]]])
:executions ([:execution [:id "assemble"]
[:phase "package"]
- [:goals ([:goal "single"])]])}]])
+ [:goals ([:goal "single"])]])}]
+ [com.spotify/dockerfile-maven-plugin "1.4.4"
+ {:configuration ([:tag "latest"]
+ [:repository "${docker.push.registry}/onap/chameleon"]
+ [:verbose true]
+ [:serverId "docker-hub"])
+ :executions ([:execution [:id "default"]
+ [:goals ([:goal "build"]
+ [:goal "push"]
+ [:goal "tag"])]])}]])
diff --git a/resources/chameleon_logback.xml b/resources/chameleon_logback.xml
index 5b723d7..b413012 100644
--- a/resources/chameleon_logback.xml
+++ b/resources/chameleon_logback.xml
@@ -1,5 +1,4 @@
<configuration scan="true" scanPeriod="3 seconds" debug="false">
- <!--<jmxConfigurator /> -->
<!-- directory path for all other type logs -->
<property name="logDir" value="/opt/chameleon/logs" />
@@ -13,7 +12,7 @@
<property name="auditLogName" value="audit" />
<property name="debugLogName" value="debug" />
- <property name="errorLogPattern" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX}|%mdc{RequestId}|%thread|DataRouter|%mdc{PartnerName}|%logger||%.-5level|%msg%n" />
+ <property name="errorLogPattern" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX}|%mdc{RequestId}|%thread|CHAMELEON|%mdc{PartnerName}|%logger||%.-5level|%msg%n" />
<property name="auditMetricPattern" value="%msg%n" />
@@ -23,8 +22,7 @@
<!-- EELF Appenders -->
<!-- ============================================================================ -->
- <!-- The EELFAppender is used to record events to the general application
- log -->
+ <!-- The EELFAppender is used to record events to the general application log -->
<appender name="EELF"
class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -55,7 +53,7 @@
related logging events. The audit logger and appender are specializations
of the EELF application root logger and appender. This can be used to segregate
Policy engine events from other components, or it can be eliminated to record
- these events as part of the application root log. -->
+ these events as part of the application root log. -->
<appender name="EELFAudit"
class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -80,7 +78,7 @@
<!-- EELF loggers -->
<!-- ============================================================================ -->
- <!-- CRUD Service loggers -->
+ <!-- CHAMELEON Service loggers -->
<logger name="chameleon" level="INFO">
<appender-ref ref="asyncEELF" />
</logger>
diff --git a/resources/log/ChameleonMsgs.properties b/resources/log/ChameleonMsgs.properties
index 4e61e2a..91995ea 100644
--- a/resources/log/ChameleonMsgs.properties
+++ b/resources/log/ChameleonMsgs.properties
@@ -8,12 +8,16 @@ Attempt Gallifrey Assertion of operation {0} with type {1} and key {2} \
GALLIFREY_ASSERTED=\
CHM0002I|\
-Asserted type {0} and key {1} in Gallifrey\
+Gallifrey Assertion "{0}" for type "{1}" and key "{2}"\
RESPONSE=\
CHM0003I|\
-Response for operation "{0}" for endpoint "{1}" resulted in status "{2}" with body "{3}"\
+Response for operation "{0}" for endpoint "{1}" resulted in status {2} ; {3}\
CHAMELEON_REQUEST=\
CHM0004I|\
-Incoming Request of type "{0}" for endpoint "{1}" from address "{2}"\ \ No newline at end of file
+Incoming Request of type {0} for endpoint {1} from address {2}\
+
+ERROR=\
+CHM0001E|\
+ERROR ENCOUNTERED Caused By: "{0}" with message: {1}\ \ No newline at end of file
diff --git a/src/chameleon/aai_processor.clj b/src/chameleon/aai_processor.clj
index 4243b39..74c6da9 100644
--- a/src/chameleon/aai_processor.clj
+++ b/src/chameleon/aai_processor.clj
@@ -55,7 +55,7 @@
(defn from-spike
"Transforms Spike-based event payloads to a format accepted by Gallifrey for vertices and relationships"
[gallifrey-host payload & [error-logger audit-logger]]
- (let [txpayload (map-keywords (parse-string payload))
+ (let [txpayload (:body (parse-string payload true))
operation (:operation txpayload)
parse-type (if (contains? txpayload :vertex)
:vertex
diff --git a/src/chameleon/config.clj b/src/chameleon/config.clj
index 10324e2..d5a8406 100644
--- a/src/chameleon/config.clj
+++ b/src/chameleon/config.clj
@@ -15,8 +15,8 @@
:gallifrey-transformer from-gallifrey
:loggers (ig/ref :chameleon/loggers)}
:chameleon/aai-processor
- {:provenance-attr "last-mod-source-of-truth"
- :truth-attr "truth-time"}
+ {:provenance-attr :last-mod-source-of-truth
+ :truth-attr :truth-time}
:chameleon/http-server
{:port (:http-port app-config)
:handler (ig/ref :chameleon/handler)}}]
diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj
index 92f4211..2eddf85 100644
--- a/src/chameleon/event.clj
+++ b/src/chameleon/event.clj
@@ -1,11 +1,22 @@
(ns chameleon.event
(:require [integrant.core :as ig]
[clojure.string :refer [starts-with?]]
- [chameleon.logging :as log])
- (:import [org.onap.aai.event.client DMaaPEventConsumer]))
+ [chameleon.logging :as log]
+ [clojure.core.async :as ca]
+ [chameleon.kafka :as ck])
+ (:import [org.onap.aai.event.client DMaaPEventConsumer KafkaEventConsumer]))
+
+(declare from-dmaap from-kafka)
(defmethod ig/init-key :chameleon/event
[_ {:keys [event-config gallifrey-host loggers]}]
+ (case (:source event-config)
+ :dmaap (from-dmaap event-config gallifrey-host loggers)
+ :kafka (from-kafka event-config gallifrey-host loggers)
+ :error))
+
+(defn- from-dmaap
+ [event-config gallifrey-host loggers]
(let [{:keys [host topic motsid pass consumer-group consumer-id timeout batch-size type processor]} (:aai event-config)
[error-logger audit-logger] loggers
event-processor (DMaaPEventConsumer. host topic motsid pass consumer-group consumer-id timeout batch-size type)]
@@ -23,3 +34,24 @@
(log/mdc-clear!)))
(catch Exception e
(println (str "Unexpected exception during processing: " (.getMessage e)))))))))))))
+
+(defn- from-kafka
+ [event-config gallifrey-host loggers]
+ (let [{:keys [topic consumer-group processor kafka-config]} (:aai event-config)
+ [error-logger audit-logger] loggers
+ kfc (ck/clj-kafka-consumer kafka-config consumer-group topic)
+ chan (ca/chan 5)
+ error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread")]
+ (log/info error-logger "EVENT_PROCESSOR"
+ [(format "AAI created. Starting polling a KAFKA Topic '%s' on '%s'" topic (kafka-config "bootstrap.servers"))])
+ (ca/go-loop []
+ (let [recs (ca/<! chan)]
+ (log/mdc-init! "SPIKE-EVENT" "CHAMELEON" "" "" gallifrey-host)
+ (if recs
+ (do (doseq [r recs]
+ (log/info error-logger "EVENT_PROCESSOR" [(str "Processing " (:value r))])
+ (processor gallifrey-host (:value r) error-logger audit-logger)
+ (log/info error-logger "EVENT_PROCESSOR" [(str "Processed Message " (:value r))])
+ (log/mdc-clear!))
+ (recur))
+ (ca/<! error-chan))))))
diff --git a/src/chameleon/handler.clj b/src/chameleon/handler.clj
index 675a34b..453aba6 100644
--- a/src/chameleon/handler.clj
+++ b/src/chameleon/handler.clj
@@ -78,17 +78,3 @@
(-> app-routes
(wrap-defaults api-defaults)
(log-reqs loggers)))
-
-
-;;; Implementation
-
-(defn- serialize
- [e]
- (compact
- (update e :_meta #(map-vals
- (fn [m]
- (map-vals str m)) %))))
-
-(defn- de-serialize
- [e]
- e)
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
new file mode 100644
index 0000000..ca730a1
--- /dev/null
+++ b/src/chameleon/kafka.clj
@@ -0,0 +1,97 @@
+(ns chameleon.kafka
+ (:refer-clojure :exclude [send])
+ (:require [clojure.core.async :as ca])
+ (:import [java.util Properties]
+ [org.apache.kafka.clients.consumer KafkaConsumer]
+ [org.apache.kafka.clients.consumer ConsumerRecord]
+ [org.apache.kafka.common.serialization StringDeserializer StringSerializer Serdes]))
+
+(declare consume run-away)
+
+(defrecord CljKafkaConsumer [config topic])
+
+
+;; Public
+(def deser StringDeserializer)
+(def ser StringSerializer)
+
+(defmacro run-away
+ [name & body]
+ `(try (.start (Thread. (fn [] ~@body) ~name))
+ (catch OutOfMemoryError e#
+ (throw (ex-info {:error "Could not allocate resources for asynchronous execution"})))))
+
+(defn clj-kafka-consumer
+ "Given a config, group-id, and a topic, return a CljKafkaConsumer"
+ [config group-id topic]
+ (CljKafkaConsumer. (assoc config "group.id" group-id) topic))
+
+(defn subscribe
+ "Given a CljKafkaConsumer, a channel, and a session timeout (in
+ ms), return a channel. The input channel is where the messages will
+ be published."
+ ([{:keys [config topic] :as conf} channel session-timeout]
+ (subscribe conf channel session-timeout "Kafka-Subscriber-Thread"))
+ ([{:keys [config topic]} channel session-timeout thread-name]
+ (let [control-channel (ca/chan (ca/dropping-buffer 1))]
+ (run-away thread-name (consume config topic channel session-timeout control-channel))
+ control-channel)))
+
+;; Private
+
+(defn- consumer-record->clojure
+ [consumer-record]
+ {:topic (.topic ^ConsumerRecord consumer-record)
+ :partition (.partition ^ConsumerRecord consumer-record)
+ :offset (.offset ^ConsumerRecord consumer-record)
+ :key (.key ^ConsumerRecord consumer-record)
+ :value (.value ^ConsumerRecord consumer-record)})
+
+(defn- send-with-sla-timeout
+ "Consumer Records will always be a sequence. Try to put a message on
+ the channel before the sla-timeout. Return true if a message was put
+ on the channel or when there were no messages to put on the
+ channel. Return false if there is a timeout."
+ [chan sla-timeout messages]
+ (if (seq messages)
+ (let [sent? (ca/alt!! [[chan messages]] :sent
+ (ca/timeout sla-timeout) :timeout)]
+ (if (= sent? :timeout)
+ false
+ true))
+ true))
+
+(defn- as-properties
+ [m]
+ (reduce (fn [props [k v]]
+ (.setProperty ^Properties props k v)
+ props)
+ (Properties.) m))
+
+(defn- consume
+ "Consume elements from a topic and put it on the given
+ channel. Block until a given timeout to put messages on the channel
+ or put sla-exception on the control channel (channel returned by
+ this function). Once the message is put on the channel, it is
+ considered safe to update the offset of the kafka consumer. The
+ `sla-timeout` is an agreement such that the data has to be taken off
+ the channel within the given timeout. If failed to take the data,
+ kaka consumer will be disconnected and an exception will be put on
+ the control channel."
+ [config topic chan sla-timeout control-channel]
+ (let [consumer (KafkaConsumer. ^Properties (as-properties config))]
+ (.subscribe ^KafkaConsumer consumer (list topic))
+ (loop []
+ (let [consumer-records (.poll consumer 0)]
+ (when (and (->> consumer-records
+ (map consumer-record->clojure)
+ (send-with-sla-timeout chan sla-timeout))
+ (try (.commitSync consumer)
+ true
+ (catch Exception e
+ (ca/>!! control-channel (.getMessage e))
+ false)))
+ (recur))))
+ (ca/>!! control-channel :sla-timeout)
+ (.unsubscribe consumer)
+ (.close consumer)))
diff --git a/src/chameleon/logging.clj b/src/chameleon/logging.clj
index 15e1e7b..aa43974 100644
--- a/src/chameleon/logging.clj
+++ b/src/chameleon/logging.clj
@@ -76,7 +76,10 @@
:chameleon.specs/logger logger)]
(if (empty? confirmed-specs)
(.info logger (string->enum enum) (logfields fields) (into-array java.lang.String msgs))
- confirmed-specs)))
+ (.info logger (string->enum "ERROR") (logfields fields) (->> confirmed-specs
+ (into ["SPEC ERROR"])
+ (mapv str)
+ (into-array java.lang.String))))))
(defn debug
[^AaiLoggerAdapter logger ^String enum msgs]
diff --git a/src/chameleon/route.clj b/src/chameleon/route.clj
index 0918da4..707c608 100644
--- a/src/chameleon/route.clj
+++ b/src/chameleon/route.clj
@@ -59,6 +59,9 @@
"UPDATE" (assert-update! host actor type key body time)
"DELETE" (assert-delete! host actor type key time))
{:keys [status body]} @g-assert]
- (log/info e-logger "GALLIFREY_ASSERTED" [(str type) (str key)])
+ (log/info e-logger "RESPONSE" (mapv str [operation key status body]))
+ (if (and (>= status 200) (<= status 299))
+ (log/info e-logger "GALLIFREY_ASSERTED" ["SUCCEEDED" (str type) (str key)])
+ (log/info e-logger "GALLIFREY_ASSERTED" ["FAILED" (str type) (str key)]))
(log/info a-logger "RESPONSE" (mapv str [operation key status body])
:fields {:response-code status :response-description (hs/get-name status)})))
diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj
index 40d5768..630a4a8 100644
--- a/src/chameleon/specs.clj
+++ b/src/chameleon/specs.clj
@@ -67,5 +67,5 @@
;; Logger specs
(s/def ::logger (s/spec log/logger? :gen #(gen/return (log/error-logger "chameleon.specs"))))
(s/def ::loggers (s/cat :e :chameleon.specs/logger :a :chameleon.specs/logger))
-(s/def :logging/msgs (s/* ::string))
+(s/def :logging/msgs (s/* string?))
(s/def :logging/valid-fields log/valid-logfields?)