-rw-r--r--  .gitreview                                                                     4
-rw-r--r--  docs/conf.py                                                                  94
-rw-r--r--  docs/requirements-docs.txt                                                    18
-rw-r--r--  docs/tox.ini                                                                  18
-rw-r--r--  pom.xml                                                                        2
-rw-r--r--  releases/1.4.0-container.yaml                                                  8
-rw-r--r--  src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy                        22
-rw-r--r--  src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml                        109
-rw-r--r--  src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java          176
-rw-r--r--  src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java   15
-rw-r--r--  version.properties                                                             4
11 files changed, 204 insertions(+), 266 deletions(-)
diff --git a/.gitreview b/.gitreview
new file mode 100644
index 0000000..c1492a9
--- /dev/null
+++ b/.gitreview
@@ -0,0 +1,4 @@
+[gerrit]
+host=gerrit.onap.org
+port=29418
+project=dmaap/messagerouter/messageservice
diff --git a/docs/conf.py b/docs/conf.py
index 8f01b44..bd96005 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -1,103 +1,9 @@
from docs_conf.conf import *
-branch = 'latest'
-doc_url = 'https://docs.onap.org/projects'
master_doc = 'index'
intersphinx_mapping = {}
-# Latest (change to branch)
-intersphinx_mapping['onap-aai-aai-common'] = ('{}/onap-aai-aai-common/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-aai-sparky-be'] = ('{}/onap-aai-sparky-be/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-ccsdk-cds'] = ('{}/onap-ccsdk-cds/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-ccsdk-features'] = ('{}/onap-ccsdk-features/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-ccsdk-distribution'] = ('{}/onap-ccsdk-distribution/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-ccsdk-oran'] = ('{}/onap-ccsdk-oran/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-cli'] = ('{}/onap-cli/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-cps'] = ('{}/onap-cps/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-dcaegen2'] = ('{}/onap-dcaegen2/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-dmaap-messagerouter-messageservice'] = (
- '{}/onap-dmaap-messagerouter-messageservice/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-dmaap-buscontroller'] = ('{}/onap-dmaap-buscontroller/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-dmaap-datarouter'] = ('{}/onap-dmaap-datarouter/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-dmaap-dbcapi'] = ('{}/onap-dmaap-dbcapi/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-externalapi-nbi'] = ('{}/onap-externalapi-nbi/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-holmes-engine-management'] = (
- '{}/onap-holmes-engine-management/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-holmes-rule-management'] = (
- '{}/onap-holmes-rule-management/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-integration'] = ('{}/onap-integration/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-modeling-etsicatalog'] = ('{}/onap-modeling-etsicatalog/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-modeling-modelspec'] = ('{}/onap-modeling-modelspec/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-multicloud-framework'] = ('{}/onap-multicloud-framework/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-msb-apigateway'] = ('{}/onap-msb-apigateway/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-oom'] = ('{}/onap-oom/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-oom-offline-installer'] = ('{}/onap-oom-offline-installer/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-oom-platform-cert-service'] = (
- '{}/onap-oom-platform-cert-service/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-optf-cmso'] = ('{}/onap-optf-cmso/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-optf-osdf'] = ('{}/onap-optf-osdf/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-optf-has'] = ('{}/onap-optf-has/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-policy-clamp'] = ('{}/onap-policy-clamp/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-policy-parent'] = ('{}/onap-policy-parent/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-sdc'] = ('{}/onap-sdc/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-sdnc-oam'] = ('{}/onap-sdnc-oam/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-so'] = ('{}/onap-so/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-usecase-ui'] = ('{}/onap-usecase-ui/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vfc-nfvo-lcm'] = ('{}/onap-vfc-nfvo-lcm/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vid'] = ('{}/onap-vid/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vnfrqts-guidelines'] = ('{}/onap-vnfrqts-guidelines/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vnfrqts-requirements'] = ('{}/onap-vnfrqts-requirements/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vnfrqts-testcases'] = ('{}/onap-vnfrqts-testcases/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vnfrqts-usecases'] = ('{}/onap-vnfrqts-usecases/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vnfsdk-model'] = ('{}/onap-vnfsdk-model/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vvp-documentation'] = ('{}/onap-vvp-documentation/en/%s'.format(doc_url) % branch, None)
-
-# Guilin
-branch = 'guilin'
-intersphinx_mapping['onap-portal'] = ('{}/onap-portal/en/%s'.format(doc_url) % branch, None)
-
-# Frankfurt
-branch = 'frankfurt'
-intersphinx_mapping['onap-appc'] = ('{}/onap-appc/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-appc-deployment'] = ('{}/onap-appc-deployment/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-music'] = ('{}/onap-music/en/%s'.format(doc_url) % branch, None)
-
-# Latest
-branch = 'latest'
-intersphinx_mapping['onap-aaf-authz'] = ('{}/onap-aaf-authz/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-aaf-sms'] = ('{}/onap-aaf-sms/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-aai-event-client'] = ('{}/onap-aai-event-client/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-aai-esr-gui'] = ('{}/onap-aai-esr-gui/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-aai-esr-server'] = ('{}/onap-aai-esr-server/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-ccsdk-apps'] = ('{}/onap-ccsdk-apps/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-ccsdk-dashboard'] = ('{}/onap-ccsdk-dashboard/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-ccsdk-platform-plugins'] = (
- '{}/onap-ccsdk-platform-plugins/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-logging-analytics'] = ('{}/onap-logging-analytics/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-logging-analytics-pomba-pomba-audit-common'] = (
- '{}/onap-logging-analytics-pomba-pomba-audit-common/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-modeling-toscaparsers'] = (
- '{}/onap-modeling-toscaparsers/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-msb-discovery'] = ('{}/onap-msb-discovery/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-msb-java-sdk'] = ('{}/onap-msb-java-sdk/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-msb-swagger-sdk'] = ('{}/onap-msb-swagger-sdk/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-multicloud-azure'] = ('{}/onap-multicloud-azure/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-multicloud-k8s'] = ('{}/onap-multicloud-k8s/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-music-distributed-kv-store'] = (
- '{}/onap-music-distributed-kv-store/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-oparent-cia'] = ('{}/onap-oparent-cia/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-osa'] = ('{}/onap-osa/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-sdc-sdc-distribution-client'] = (
- '{}/onap-sdc-sdc-distribution-client/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-sdc-sdc-workflow-designer'] = (
- '{}/onap-sdc-sdc-workflow-designer/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-sdc-sdc-tosca'] = ('{}/onap-sdc-sdc-tosca/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-sdc-sdc-docker-base'] = ('{}/onap-sdc-sdc-docker-base/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-so-libs'] = ('{}/onap-so-libs/en/%s'.format(doc_url) % branch, None)
-intersphinx_mapping['onap-vfc-nfvo-driver-vnfm-svnfm'] = (
- '{}/onap-vfc-nfvo-driver-vnfm-svnfm/en/%s'.format(doc_url) % branch, None)
-
linkcheck_ignore = [
'http://localhost',
'https://example.com',
diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt
index 59387aa..3b3441a 100644
--- a/docs/requirements-docs.txt
+++ b/docs/requirements-docs.txt
@@ -1,15 +1,3 @@
-tox
-Sphinx==4.2.0
-doc8
-docutils
-setuptools
-six
-lfdocs-conf~=0.7.5
-funcparserlib~=1.0.0a0
-sphinxcontrib.blockdiag~=2.0.0
-sphinxcontrib-needs<0.6.0
-sphinxcontrib.plantuml~=0.21
-sphinxcontrib.nwdiag~=2.0.0
-sphinxcontrib-seqdiag~=2.0.0
-sphinxcontrib-swaggerdoc~=0.1.7
-sphinx-rtd-theme~=1.0.0
\ No newline at end of file
+lfdocs-conf
+sphinx>=4.2.0 # BSD
+sphinx-rtd-theme>=1.0.0 # MIT
diff --git a/docs/tox.ini b/docs/tox.ini
index edac8c3..7c0973c 100644
--- a/docs/tox.ini
+++ b/docs/tox.ini
@@ -5,18 +5,22 @@ skipsdist = true
[testenv:docs]
basepython = python3
-deps = -r{toxinidir}/requirements-docs.txt
+deps =
+ -r{toxinidir}/requirements-docs.txt
+ -chttps://git.onap.org/doc/plain/etc/upper-constraints.os.txt
+ -chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt
+whitelist_externals =
+ echo
commands =
sphinx-build -b html -n -d {envtmpdir}/doctrees ./ {toxinidir}/_build/html
echo "Generated docs available in {toxinidir}/_build/html"
-whitelist_externals =
- echo
- git
- sh
[testenv:docs-linkcheck]
basepython = python3
-#deps = -r{toxinidir}/requirements-docs.txt
+# deps =
+# -r{toxinidir}/requirements-docs.txt
+# -chttps://git.onap.org/doc/plain/etc/upper-constraints.os.txt
+# -chttps://git.onap.org/doc/plain/etc/upper-constraints.onap.txt
commands = echo "Link Checking not enforced"
-#commands = sphinx-build -b linkcheck -d {envtmpdir}/doctrees ./ {toxinidir}/_build/linkcheck
+# commands = sphinx-build -b linkcheck -d {envtmpdir}/doctrees ./ {toxinidir}/_build/linkcheck
whitelist_externals = echo
diff --git a/pom.xml b/pom.xml
index e66feda..9a3f661 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
<groupId>org.onap.dmaap.messagerouter.messageservice</groupId>
<artifactId>dmaapMR1</artifactId>
- <version>1.3.2-SNAPSHOT</version>
+ <version>1.4.0-SNAPSHOT</version>
<name>dmaap-messagerouter-messageservice</name>
<description>Message Router - Restful interface built for kafka</description>
<licenses>
diff --git a/releases/1.4.0-container.yaml b/releases/1.4.0-container.yaml
new file mode 100644
index 0000000..f38799b
--- /dev/null
+++ b/releases/1.4.0-container.yaml
@@ -0,0 +1,8 @@
+distribution_type: 'container'
+container_release_tag: '1.4.0'
+project: 'dmaap-messagerouter-docker'
+log-dir: 'dmaap-messagerouter-messageservice-java-maven-docker-stage-master/2'
+ref: e6afd43dbc9d4f5979bbc9dc1309d826aa8cf58d
+containers:
+ - name: dmaap/dmaap-mr
+ version: 1.4.0-STAGING-20220504T020911Z
\ No newline at end of file
diff --git a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy
index 0841a3a..e09c538 100644
--- a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy
+++ b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/jaxrsBeans.groovy
@@ -3,20 +3,20 @@
xmlns jaxrs: "http://cxf.apache.org/jaxrs"
xmlns util: "http://www.springframework.org/schema/util"
- echoService(org.onap.dmaap.JaxrsEchoService)
- userService(org.onap.dmaap.JaxrsUserService)
+// echoService(org.onap.dmaap.JaxrsEchoService)
+// userService(org.onap.dmaap.JaxrsUserService)
topicService(org.onap.dmaap.service.TopicRestService)
eventService(org.onap.dmaap.service.EventsRestService)
- adminService(org.onap.dmaap.service.AdminRestService)
+// adminService(org.onap.dmaap.service.AdminRestService)
apiKeyService(org.onap.dmaap.service.ApiKeysRestService)
metricsService(org.onap.dmaap.service.MetricsRestService)
- transactionService(org.onap.dmaap.service.TransactionRestService)
- UIService(org.onap.dmaap.service.UIRestServices)
- mirrorService(org.onap.dmaap.service.MMRestService)
+// transactionService(org.onap.dmaap.service.TransactionRestService)
+// UIService(org.onap.dmaap.service.UIRestServices)
+// mirrorService(org.onap.dmaap.service.MMRestService)
- util.list(id: 'jaxrsServices') {
- ref(bean:'echoService')
- ref(bean:'userService')
-
- }
+// util.list(id: 'jaxrsServices') {
+// ref(bean:'echoService')
+// ref(bean:'userService')
+//
+// }
}
\ No newline at end of file
diff --git a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml
index 090a76b..20e4ceb 100644
--- a/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml
+++ b/src/main/ajsc/dmaap_v1/dmaap/v1/conf/serviceBeans.xml
@@ -5,45 +5,68 @@
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
-
+
<!-- Dependency Injection with annotations -->
<!-- <context:property-placeholder
location="file:/C:/Users/su622b/Desktop/testonap.properties"/> -->
- <!-- <context:property-placeholder
- location="classpath:msgRtrApi.properties,classpath:DMaaPErrorMesaages.properties" /> -->
-
- <context:component-scan
- base-package="org.onap.dmaap,org.onap.dmaap.filemonitor,org.onap.dmaap.mmagent,org.onap.dmaap.service,org.onap.dmaap.tools,org.onap.dmaap.util,org.onap.dmaap.filter,org.onap.dmaap.apiServer.metrics.cambria,
- org.onap.dmaap.dmf.mr,org.onap.dmaap.dmf.mr.backends,org.onap.dmaap.dmf.mr.backends.kafka,org.onap.dmaap.dmf.mr.backends.memory,org.onap.dmaap.dmf.mr.beans,org.onap.dmaap.dmf.mr.constants,org.onap.dmaap.dmf.mr.exception,
- org.onap.dmaap.dmf.mr.listener,org.onap.dmaap.dmf.mr.metabroker,org.onap.dmaap.dmf.mr.metrics.publisher,org.onap.dmaap.dmf.mr.metrics.publisher.impl,org.onap.dmaap.dmf.mr.resources,org.onap.dmaap.dmf.mr.resources.streamReaders,org.onap.dmaap.dmf.mr.security,
- org.onap.dmaap.dmf.mr.security.impl,org.onap.dmaap.dmf.mr.service,org.onap.dmaap.dmf.mr.service.impl,org.onap.dmaap.dmf.mr.transaction,org.onap.dmaap.dmf.mr.transaction.impl,org.onap.dmaap.dmf.mr.utils,
- com.att,com.att.dmf.mr.utils, com.att.dmf.mr, com.att.dmf.mr.rest,com.att.dmf.mr.service,
- com.att.dmf.mr.service.impl,com.att.dmf.mr.beans,com.att.dmf.mr.security,com.att.dmf.mr.exception,com.att.dmf.mr.backends,com.att.dmf.mr.backends.kafka,
- com.att.dmf.mr.transaction,com.att.dmf.mr.exception,com.att.nsa.dmaap,com.att.nsa.dmaap.service,com.att.nsa.dmaap.util,java.lang,java.util,com.att.dmf.mr.exception, com.att.dmf,com.att.nsa.dmaap.mmagent" />
- <context:property-placeholder
- location="file:${AJSC_HOME}/bundleconfig/etc/appprops/MsgRtrApi.properties,file:${AJSC_HOME}/etc/DMaaPErrorMesaages.properties"/>
-
- <bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.json.JSONProvider">
- <property name="dropRootElement" value="true" />
- <property name="supportUnwrapped" value="true" />
- </bean>
-
- <bean id="jacksonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" />
-
- <bean id="cambriaExMapper" class="org.onap.dmaap.DMaaPCambriaExceptionMapper" />
-
- <bean id="webExMapper" class="org.onap.dmaap.DMaaPWebExceptionMapper" />
-
-
+ <!-- <context:property-placeholder
+ location="classpath:msgRtrApi.properties,classpath:DMaaPErrorMesaages.properties" /> -->
+
+ <context:component-scan
+ base-package="
+ org.onap.dmaap,org.onap.dmaap.filemonitor,
+ org.onap.dmaap.mmagent,
+ org.onap.dmaap.service,
+ org.onap.dmaap.tools,
+ org.onap.dmaap.util,
+ org.onap.dmaap.filter,
+ org.onap.dmaap.apiServer.metrics.cambria,
+ org.onap.dmaap.dmf.mr,
+ org.onap.dmaap.dmf.mr.backends,
+ org.onap.dmaap.dmf.mr.backends.kafka,
+ org.onap.dmaap.dmf.mr.backends.memory,
+ org.onap.dmaap.dmf.mr.beans,
+ org.onap.dmaap.dmf.mr.constants,
+ org.onap.dmaap.dmf.mr.exception,
+ org.onap.dmaap.dmf.mr.listener,
+ org.onap.dmaap.dmf.mr.metabroker,
+ org.onap.dmaap.dmf.mr.metrics.publisher,
+ org.onap.dmaap.dmf.mr.metrics.publisher.impl,
+ org.onap.dmaap.dmf.mr.resources,
+ org.onap.dmaap.dmf.mr.resources.streamReaders,
+ org.onap.dmaap.dmf.mr.security,
+ org.onap.dmaap.dmf.mr.security.impl,
+ org.onap.dmaap.dmf.mr.service,
+ org.onap.dmaap.dmf.mr.service.impl,
+ org.onap.dmaap.dmf.mr.transaction,
+ org.onap.dmaap.dmf.mr.transaction.impl,
+ org.onap.dmaap.dmf.mr.utils,
+ java.lang,
+ java.util" />
+ <context:property-placeholder
+ location="file:${AJSC_HOME}/bundleconfig/etc/appprops/MsgRtrApi.properties,file:${AJSC_HOME}/etc/DMaaPErrorMesaages.properties"/>
+
+ <bean id="jsonProvider" class="org.apache.cxf.jaxrs.provider.json.JSONProvider">
+ <property name="dropRootElement" value="true" />
+ <property name="supportUnwrapped" value="true" />
+ </bean>
+
+ <bean id="jacksonProvider" class="com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" />
+
+ <bean id="cambriaExMapper" class="org.onap.dmaap.DMaaPCambriaExceptionMapper" />
+
+ <bean id="webExMapper" class="org.onap.dmaap.DMaaPWebExceptionMapper" />
+
+
<!-- Your bean definitions goes here -->
-<!-- <bean id="performanceLog" name="performanceLog" class="com.att.ajsc.csi.logging.PerformanceTracking" /> -->
-<!-- <bean id="processRestletHeaders" name="processRestletHeaders" class="ajsc.restlet.ProcessRestletHeaders" /> -->
- <bean id="servicePropsBean" name="servicePropsBean"
+ <!-- <bean id="performanceLog" name="performanceLog" class="com.att.ajsc.csi.logging.PerformanceTracking" /> -->
+ <!-- <bean id="processRestletHeaders" name="processRestletHeaders" class="ajsc.restlet.ProcessRestletHeaders" /> -->
+ <bean id="servicePropsBean" name="servicePropsBean"
class="org.onap.dmaap.util.ServicePropertiesMapBean" />
-
- <!-- Msgrtr beans -->
- <bean id="propertyReader" class="org.onap.dmaap.dmf.mr.utils.PropertyReader" />
- <bean
+
+ <!-- Msgrtr beans -->
+ <bean id="propertyReader" class="org.onap.dmaap.dmf.mr.utils.PropertyReader" />
+ <bean
class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<!-- Next value is the full qualified name of the static setter including
method name -->
@@ -60,7 +83,7 @@
class="com.att.nsa.drumlin.service.framework.routing.DrumlinRequestRouter" />
<bean id="dMaaPMetricsSet" class="org.onap.dmaap.dmf.mr.beans.DMaaPMetricsSet">
- <constructor-arg ref="propertyReader" />
+ <constructor-arg ref="propertyReader" />
</bean>
<bean id="dMaaPZkClient" class=" org.onap.dmaap.dmf.mr.beans.DMaaPZkClient">
@@ -71,7 +94,7 @@
<constructor-arg ref="dMaaPZkClient" />
<constructor-arg ref="propertyReader" />
</bean>
-
+
<bean id="kafkaPublisher" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaPublisher">
<constructor-arg ref="propertyReader" />
@@ -82,13 +105,13 @@
<constructor-arg ref="dMaaPMetricsSet" />
<constructor-arg ref="kafkalockavoid" />
</bean> -->
-
- <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory">
+
+ <bean id="dMaaPKafkaConsumerFactory" class="org.onap.dmaap.dmf.mr.beans.DMaaPKafkaConsumerFactory">
<constructor-arg ref="dMaaPMetricsSet" />
<constructor-arg ref="curator" />
<constructor-arg ref="kafkalockavoid" />
</bean>
-
+
<bean id="curator" class="org.onap.dmaap.dmf.mr.utils.DMaaPCuratorFactory"
factory-method="getCurator">
@@ -125,9 +148,9 @@
<bean id="defLength" class="org.onap.dmaap.mr.filter.DefaultLength">
<property name="defaultLength" value="${maxcontentlength}"></property>
</bean>
-
- <bean id="kafkalockavoid" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2" />
-
- <bean class="org.springframework.context.annotation.CommonAnnotationBeanPostProcessor"/>
+ <bean id="kafkalockavoid" class="org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2" />
+
+
+ <bean class="org.springframework.context.annotation.CommonAnnotationBeanPostProcessor"/>
</beans>
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
index 7a08345..ae7414e 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/beans/DMaaPKafkaMetaBroker.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.json.JSONArray;
@@ -44,7 +45,6 @@ import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.metabroker.Broker1;
import org.onap.dmaap.dmf.mr.metabroker.Topic;
-import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
import org.onap.dmaap.dmf.mr.utils.Utils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.StringUtils;
@@ -62,31 +62,35 @@ import java.util.concurrent.ExecutionException;
//@Component
public class DMaaPKafkaMetaBroker implements Broker1 {
+ private static final EELFLogger log = EELFManager.getLogger(DMaaPKafkaMetaBroker.class);
+ private final AdminClient fKafkaAdminClient;
+ private static final boolean GET_TOPICS_FROM_ZK = Boolean.parseBoolean(System.getenv().
+ getOrDefault("useZkTopicStore", "true"));
+ private final ZkClient fZk;
+ private final ConfigDb fCambriaConfig;
+ private final ConfigPath fBaseTopicData;
+ private static final String ZK_TOPICS_ROOT = "/brokers/topics";
+ private static final JSONObject kEmptyAcl = new JSONObject();
+
public DMaaPKafkaMetaBroker() {
fZk = null;
fCambriaConfig = null;
fBaseTopicData = null;
final Properties props = new Properties ();
String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (StringUtils.isEmpty(fkafkaBrokers)) {
+ "kafka.metadata.broker.list");
+ if (StringUtils.isEmpty(fkafkaBrokers)) {
fkafkaBrokers = "localhost:9092";
}
-
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
+
if(Utils.isCadiEnabled()){
props.putAll(Utils.addSaslProps());
}
fKafkaAdminClient=AdminClient.create ( props );
-
}
- private static final EELFLogger log = EELFManager.getInstance().getLogger(ConfigurationReader.class);
- private final AdminClient fKafkaAdminClient;
-
-
-
/**
* DMaaPKafkaMetaBroker constructor initializing
*
@@ -95,50 +99,61 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @param configDb
*/
public DMaaPKafkaMetaBroker(@Qualifier("propertyReader") rrNvReadable settings,
- @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
+ @Qualifier("dMaaPZkClient") ZkClient zk, @Qualifier("dMaaPZkConfigDb") ConfigDb configDb) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
final Properties props = new Properties ();
String fkafkaBrokers = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
- "kafka.metadata.broker.list");
- if (null == fkafkaBrokers) {
+ "kafka.metadata.broker.list");
+ if (null == fkafkaBrokers) {
fkafkaBrokers = "localhost:9092";
}
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
if(Utils.isCadiEnabled()){
props.putAll(Utils.addSaslProps());
}
- props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, fkafkaBrokers );
-
fKafkaAdminClient=AdminClient.create ( props );
-
-
-
}
- public DMaaPKafkaMetaBroker( rrNvReadable settings,
- ZkClient zk, ConfigDb configDb,AdminClient client) {
-
+ public DMaaPKafkaMetaBroker(ZkClient zk, ConfigDb configDb,AdminClient client) {
fZk = zk;
fCambriaConfig = configDb;
fBaseTopicData = configDb.parse("/topics");
fKafkaAdminClient= client;
-
-
-
}
@Override
public List<Topic> getAllTopics() throws ConfigDbException {
log.info("Retrieving list of all the topics.");
- final LinkedList<Topic> result = new LinkedList<>();
+ if (!GET_TOPICS_FROM_ZK) {
+ return getTopicsFromKafka();
+ }
+ return getTopicsFromZookeeper();
+ }
+
+ private LinkedList<Topic> getTopicsFromKafka() throws ConfigDbException {
+ LinkedList<Topic> res = new LinkedList<>();
+ final ListTopicsResult ltr = fKafkaAdminClient.listTopics();
try {
- log.info("Retrieving all topics from root: " + zkTopicsRoot);
- final List<String> topics = fZk.getChildren(zkTopicsRoot);
+ for (String name: ltr.names().get()) {
+ res.add(new KafkaTopic(name, fCambriaConfig, fBaseTopicData));
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("GetAllTopicsFromKafka: Failed to retrieve topic list from kafka.", e);
+ }
+ return res;
+ }
+
+ private LinkedList<Topic> getTopicsFromZookeeper() throws ConfigDbException {
+ final LinkedList<Topic> legacyResult = new LinkedList<>();
+ try {
+ log.info("Retrieving all topics from root: " + ZK_TOPICS_ROOT);
+ final List<String> topics = fZk.getChildren(ZK_TOPICS_ROOT);
for (String topic : topics) {
- result.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
+ legacyResult.add(new KafkaTopic(topic, fCambriaConfig, fBaseTopicData));
}
JSONObject dataObj = new JSONObject();
dataObj.put("topics", new JSONObject());
@@ -148,17 +163,29 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
} catch (ZkNoNodeException excp) {
// very fresh kafka doesn't have any topics or a topics node
- log.error("ZK doesn't have a Kakfa topics node at " + zkTopicsRoot, excp);
+ log.error("ZK doesn't have a Kakfa topics node at " + ZK_TOPICS_ROOT, excp);
}
- return result;
+ return legacyResult;
}
@Override
public Topic getTopic(String topic) throws ConfigDbException {
- if (fZk.exists(zkTopicsRoot + "/" + topic)) {
+ if (!GET_TOPICS_FROM_ZK) {
+ try {
+ for (String name : fKafkaAdminClient.listTopics().names().get()) {
+ if (name.equals(topic)) {
+ log.debug("TOPIC_NAME: {} is equal to : {}", name, topic);
+ return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("GetTopicFromKafka: Failed to retrieve topic list from kafka.", e);
+ return null;
+ }
+ } else if (fZk.exists(ZK_TOPICS_ROOT + "/" + topic)) {
return getKafkaTopicConfig(fCambriaConfig, fBaseTopicData, topic);
}
- // else: no such topic in kafka
+ // else: no such topic
return null;
}
@@ -180,42 +207,38 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
*/
@Override
public Topic createTopic(String topic, String desc, String ownerApiKey, int partitions, int replicas,
- boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
- log.info("Creating topic: " + topic);
+ boolean transactionEnabled) throws TopicExistsException, CambriaApiException,ConfigDbException {
+ log.info("Creating topic: {}", topic);
try {
- log.info("Check if topic [" + topic + "] exist.");
+ log.info("Check if topic [{}] exist.", topic);
// first check for existence "our way"
final Topic t = getTopic(topic);
if (t != null) {
- log.info("Could not create topic [" + topic + "]. Topic Already exists.");
+ log.info("Could not create topic [{}]. Topic Already exists.", topic);
throw new TopicExistsException("Could not create topic [" + topic + "]. Topic Alreay exists.");
}
} catch (ConfigDbException e1) {
log.error("Topic [" + topic + "] could not be created. Couldn't check topic data in config db.", e1);
throw new CambriaApiException(HttpStatusCodes.k503_serviceUnavailable,
- "Couldn't check topic data in config db.");
+ "Couldn't check topic data in config db.");
}
// we only allow 3 replicas. (If we don't test this, we get weird
// results from the cluster,
// so explicit test and fail.)
if (replicas < 1 || replicas > 3) {
- log.info("Topic [" + topic + "] could not be created. The replica count must be between 1 and 3.");
+ log.info("Topic [{}] could not be created. The replica count must be between 1 and 3.", topic);
throw new CambriaApiException(HttpStatusCodes.k400_badRequest,
- "The replica count must be between 1 and 3.");
+ "The replica count must be between 1 and 3.");
}
if (partitions < 1) {
- log.info("Topic [" + topic + "] could not be created. The partition count must be at least 1.");
+ log.info("Topic [{}] could not be created. The partition count must be at least 1.", topic);
throw new CambriaApiException(HttpStatusCodes.k400_badRequest, "The partition count must be at least 1.");
}
-
// create via kafka
-
try {
- final NewTopic topicRequest =
- new NewTopic(topic, partitions, (short)replicas);
- final CreateTopicsResult ctr =
- fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
+ final NewTopic topicRequest = new NewTopic(topic, partitions, (short)replicas);
+ final CreateTopicsResult ctr = fKafkaAdminClient.createTopics(Arrays.asList(topicRequest));
final KafkaFuture<Void> ctrResult = ctr.all();
ctrResult.get();
// underlying Kafka topic created. now setup our API info
@@ -232,16 +255,13 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void deleteTopic(String topic) throws CambriaApiException, TopicExistsException,ConfigDbException {
- log.info("Deleting topic: " + topic);
+ log.info("Deleting topic: {}", topic);
try {
log.info("Loading zookeeper client for topic deletion.");
// topic creation. (Otherwise, the topic is only partially created
// in ZK.)
-
-
fKafkaAdminClient.deleteTopics(Arrays.asList(topic));
log.info("Zookeeper client loaded successfully. Deleting topic.");
-
} catch (Exception e) {
log.error("Failed to delete topic [" + topic + "]. " + e.getMessage(), e);
throw new ConfigDbException(e);
@@ -250,13 +270,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
}
}
- private final ZkClient fZk;
- private final ConfigDb fCambriaConfig;
- private final ConfigPath fBaseTopicData;
-
- private static final String zkTopicsRoot = "/brokers/topics";
- private static final JSONObject kEmptyAcl = new JSONObject();
-
/**
* method Providing KafkaTopic Object associated with owner and
* transactionenabled or not
@@ -269,7 +282,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @throws ConfigDbException
*/
public KafkaTopic createTopicEntry(String name, String desc, String owner, boolean transactionEnabled)
- throws ConfigDbException {
+ throws ConfigDbException {
return createTopicEntry(fCambriaConfig, fBaseTopicData, name, desc, owner, transactionEnabled);
}
@@ -286,18 +299,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @throws ConfigDbException
*/
public static KafkaTopic createTopicEntry(ConfigDb db, ConfigPath basePath, String name, String desc, String owner,
- boolean transactionEnabled) throws ConfigDbException {
+ boolean transactionEnabled) throws ConfigDbException {
final JSONObject o = new JSONObject();
o.put("owner", owner);
o.put("description", desc);
o.put("txenabled", transactionEnabled);
- db.store(basePath.getChild(name), o.toString());
+ if (GET_TOPICS_FROM_ZK) {
+ db.store(basePath.getChild(name), o.toString());
+ }
return new KafkaTopic(name, db, basePath);
}
/**
- * class performing all user opearation like user is eligible to read,
- * write. permitting a user to write and read,
+ * class performing all user operation like user is eligible to read,
+ * write. permitting a user to write and read etc
*
* @author anowarul.islam
*
@@ -311,6 +326,16 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
* @param baseTopic
* @throws ConfigDbException
*/
+
+ private final String fName;
+ private final ConfigDb fConfigDb;
+ private final ConfigPath fBaseTopicData;
+ private final String fOwner;
+ private final String fDesc;
+ private final NsaAcl fReaders;
+ private final NsaAcl fWriters;
+ private final boolean fTransactionEnabled;
+
public KafkaTopic(String name, ConfigDb configdb, ConfigPath baseTopic) throws ConfigDbException {
fName = name;
fConfigDb = configdb;
@@ -396,7 +421,7 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void permitWritesFromUser(String pubId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, false, true, pubId);
}
@@ -407,22 +432,20 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
@Override
public void permitReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, true, true, consumerId);
}
@Override
public void denyReadsByUser(String consumerId, NsaApiKey asUser)
- throws ConfigDbException, AccessDeniedException {
+ throws ConfigDbException, AccessDeniedException {
updateAcl(asUser, true, false, consumerId);
}
private void updateAcl(NsaApiKey asUser, boolean reader, boolean add, String key)
- throws ConfigDbException, AccessDeniedException{
- try
- {
+ throws ConfigDbException, AccessDeniedException{
+ try {
final NsaAcl acl = NsaAclUtils.updateAcl ( this, asUser, key, reader, add );
-
// we have to assume we have current data, or load it again. for the expected use
// case, assuming we can overwrite the data is fine.
final JSONObject o = new JSONObject ();
@@ -432,10 +455,8 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
fConfigDb.store ( fBaseTopicData.getChild ( fName ), o.toString () );
log.info ( "ACL_UPDATE: " + asUser.getKey () + " " + ( add ? "added" : "removed" ) + ( reader?"subscriber":"publisher" ) + " " + key + " on " + fName );
-
- }
- catch ( ConfigDbException | AccessDeniedException x )
- {
+ } catch ( ConfigDbException | AccessDeniedException x ) {
+ log.info("Error when trying to update acl for key {}", key);
throw x;
}
@@ -445,15 +466,6 @@ public class DMaaPKafkaMetaBroker implements Broker1 {
return acl == null ? null : acl.serialize();
}
- private final String fName;
- private final ConfigDb fConfigDb;
- private final ConfigPath fBaseTopicData;
- private final String fOwner;
- private final String fDesc;
- private final NsaAcl fReaders;
- private final NsaAcl fWriters;
- private boolean fTransactionEnabled;
-
public boolean isTransactionEnabled() {
return fTransactionEnabled;
}
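
For context on the DMaaPKafkaMetaBroker change above: the new useZkTopicStore environment toggle (default "true", keeping the legacy ZooKeeper path) routes getAllTopics() and getTopic() through the Kafka AdminClient instead of ZooKeeper. Below is a minimal, self-contained sketch of that AdminClient path; the class name TopicListingSketch and the localhost:9092 broker address are illustrative assumptions, not part of the commit.

    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class TopicListingSketch {
        // Same toggle as the commit: ZooKeeper stays the default topic store.
        private static final boolean GET_TOPICS_FROM_ZK =
                Boolean.parseBoolean(System.getenv().getOrDefault("useZkTopicStore", "true"));

        public static void main(String[] args) throws InterruptedException, ExecutionException {
            final Properties props = new Properties();
            // Assumed broker address; the commit falls back to localhost:9092 too.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                if (!GET_TOPICS_FROM_ZK) {
                    // listTopics().names() resolves to the broker's current topic names,
                    // mirroring the diff's getTopicsFromKafka() helper.
                    final Set<String> names = admin.listTopics().names().get();
                    names.forEach(System.out::println);
                }
            }
        }
    }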
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java
index d59c839..1c717ca 100644
--- a/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java
+++ b/src/test/java/org/onap/dmaap/mr/cambria/beans/DMaaPKafkaMetaBrokerTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
import com.att.nsa.configs.ConfigDb;
import com.att.nsa.configs.ConfigDbException;
import com.att.nsa.configs.ConfigPath;
+import java.util.Properties;
+import org.mockito.ArgumentMatchers;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.junit.Before;
@@ -47,15 +49,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
@PrepareForTest({ AdminClient.class })
public class DMaaPKafkaMetaBrokerTest {
- @InjectMocks
+ @Mock
private DMaaPKafkaMetaBroker dMaaPKafkaMetaBroker;
@Mock
private ZkClient fZk;
@Mock
private AdminClient fKafkaAdminClient;
@Mock
- private AdminClient client;
- @Mock
private ConfigDb configDb;
@Mock
ConfigPath fBaseTopicData;
@@ -68,18 +68,13 @@ public class DMaaPKafkaMetaBrokerTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
PowerMockito.mockStatic(AdminClient.class);
- // PowerMockito.when(AdminClient.create (any(Properties.class)
- // )).thenReturn(fKafkaAdminClient);
-
- // PowerMockito.mockStatic(AdminUtils.class);
+ PowerMockito.when(AdminClient.create(ArgumentMatchers.any(Properties.class))).thenReturn(fKafkaAdminClient);
PowerMockito.when(configDb.parse("/topics")).thenReturn(fBaseTopicData);
-
}
@Test
public void testBrokercreate() {
DMaaPKafkaMetaBroker broker = new DMaaPKafkaMetaBroker();
-
}
@Test
@@ -90,7 +85,6 @@ public class DMaaPKafkaMetaBrokerTest {
} catch (Exception e) {
assertTrue(true);
}
-
}
@Test
@@ -101,7 +95,6 @@ public class DMaaPKafkaMetaBrokerTest {
// TODO Auto-generated catch block
e.printStackTrace();
}
-
}
@Test
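
The test change above relies on PowerMock's static mocking so that AdminClient.create(...) returns a mock instead of contacting a broker. A sketch of that pattern, assuming PowerMock with the Mockito API on JUnit 4; the class and test names here are illustrative only.

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.mockito.ArgumentMatchers;
    import org.powermock.api.mockito.PowerMockito;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    @RunWith(PowerMockRunner.class)
    @PrepareForTest({ AdminClient.class })
    public class AdminClientMockSketch {

        @Test
        public void staticCreateReturnsTheMock() throws Exception {
            final AdminClient mocked = PowerMockito.mock(AdminClient.class);
            PowerMockito.mockStatic(AdminClient.class);
            // Any code path that calls AdminClient.create(props) -- such as the
            // DMaaPKafkaMetaBroker constructors -- now receives the mock, so no
            // live Kafka broker is needed during the test.
            PowerMockito.when(AdminClient.create(ArgumentMatchers.any(Properties.class)))
                    .thenReturn(mocked);
            Assert.assertSame(mocked, AdminClient.create(new Properties()));
        }
    }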
diff --git a/version.properties b/version.properties
index 67a9369..4b00b94 100644
--- a/version.properties
+++ b/version.properties
@@ -26,8 +26,8 @@
# because they are used in Jenkins, whose plug-in doesn't support
major=1
-minor=3
-patch=2
+minor=4
+patch=0
base_version=${major}.${minor}.${patch}