summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cds-ui/designer-client/angular.json18
-rw-r--r--cds-ui/designer-client/package-lock.json37
-rw-r--r--cds-ui/designer-client/package.json1
-rw-r--r--cds-ui/designer-client/src/app/app.module.ts3
-rw-r--r--cds-ui/designer-client/src/app/modules/feature-modules/packages/configuration-dashboard/configuration-dashboard.component.html2
-rw-r--r--cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/metadata-tab/metadata-tab.component.html2
-rw-r--r--cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/package-creation.component.html2
-rw-r--r--cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/template-mapping/templ-mapp-creation/templ-mapp-creation.component.html16
-rw-r--r--cds-ui/designer-client/src/app/modules/feature-modules/packages/packages-dashboard/package-list/package-list.component.html177
-rw-r--r--cds-ui/designer-client/src/app/modules/shared-modules/shared-modules.module.ts4
-rw-r--r--cds-ui/designer-client/src/styles.css110
-rwxr-xr-xms/blueprintsprocessor/application/src/main/resources/application-dev.properties12
-rwxr-xr-xms/blueprintsprocessor/application/src/main/resources/application.properties43
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt20
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt4
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt219
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt283
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt141
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt)27
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt)38
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt)15
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt60
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt85
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt72
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt (renamed from ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt)18
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties13
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.keystore.jksbin0 -> 4998 bytes
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.truststore.jksbin0 -> 1648 bytes
28 files changed, 1103 insertions, 319 deletions
diff --git a/cds-ui/designer-client/angular.json b/cds-ui/designer-client/angular.json
index a84b95e39..256c35c1e 100644
--- a/cds-ui/designer-client/angular.json
+++ b/cds-ui/designer-client/angular.json
@@ -61,10 +61,12 @@
},
"configurations": {
"production": {
- "fileReplacements": [{
- "replace": "src/environments/environment.ts",
- "with": "src/environments/environment.prod.ts"
- }],
+ "fileReplacements": [
+ {
+ "replace": "src/environments/environment.ts",
+ "with": "src/environments/environment.prod.ts"
+ }
+ ],
"optimization": true,
"outputHashing": "all",
"sourceMap": false,
@@ -74,7 +76,8 @@
"extractLicenses": true,
"vendorChunk": false,
"buildOptimizer": true,
- "budgets": [{
+ "budgets": [
+ {
"type": "initial",
"maximumWarning": "2mb",
"maximumError": "5mb"
@@ -122,10 +125,7 @@
"./node_modules/@angular/material/prebuilt-themes/purple-green.css",
"./node_modules/font-awesome/css/font-awesome.css"
],
- "scripts": [
-
-
- ]
+ "scripts": []
}
},
"lint": {
diff --git a/cds-ui/designer-client/package-lock.json b/cds-ui/designer-client/package-lock.json
index 9df5d033f..46997c4ab 100644
--- a/cds-ui/designer-client/package-lock.json
+++ b/cds-ui/designer-client/package-lock.json
@@ -361,6 +361,8 @@
"dev": true,
"optional": true,
"requires": {
+ "bindings": "^1.5.0",
+ "nan": "^2.12.1",
"node-pre-gyp": "*"
},
"dependencies": {
@@ -2998,6 +3000,16 @@
"integrity": "sha512-Phlt0plgpIIBOGTT/ehfFnbNlfsDEiqmzE2KRXoX1bLIlir4X/MR+zSyBEkL05ffWgnRSf/DXv+WrUAVr93/ow==",
"dev": true
},
+ "bindings": {
+ "version": "1.5.0",
+ "resolved": "https://registry.npmjs.org/bindings/-/bindings-1.5.0.tgz",
+ "integrity": "sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==",
+ "dev": true,
+ "optional": true,
+ "requires": {
+ "file-uri-to-path": "1.0.0"
+ }
+ },
"blob": {
"version": "0.0.5",
"resolved": "https://registry.npmjs.org/blob/-/blob-0.0.5.tgz",
@@ -5142,6 +5154,13 @@
"resolved": "https://registry.npmjs.org/file-saver/-/file-saver-2.0.2.tgz",
"integrity": "sha512-Wz3c3XQ5xroCxd1G8b7yL0Ehkf0TC9oYC6buPFkNnU9EnaPlifeAFCyCh+iewXTyFRcg0a6j3J7FmJsIhlhBdw=="
},
+ "file-uri-to-path": {
+ "version": "1.0.0",
+ "resolved": "https://registry.npmjs.org/file-uri-to-path/-/file-uri-to-path-1.0.0.tgz",
+ "integrity": "sha512-0Zt+s3L7Vf1biwWZ29aARiVYLx7iMGnEUl9x33fbB/j3jR81u/O2LbqK+Bm1CDSNDKVtJ/YjwY7TUd5SkeLQLw==",
+ "dev": true,
+ "optional": true
+ },
"fileset": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/fileset/-/fileset-2.0.3.tgz",
@@ -6751,6 +6770,8 @@
"dev": true,
"optional": true,
"requires": {
+ "bindings": "^1.5.0",
+ "nan": "^2.12.1",
"node-pre-gyp": "*"
},
"dependencies": {
@@ -8015,6 +8036,13 @@
"resolved": "https://registry.npmjs.org/mute-stream/-/mute-stream-0.0.8.tgz",
"integrity": "sha512-nnbWWOkoWyUsTjKrhgD0dcz22mdkSnpYqbEjIm2nhwhuxlSkpywJmBo8h0ZqJdkp73mb90SssHkN4rsRaBAfAA=="
},
+ "nan": {
+ "version": "2.14.1",
+ "resolved": "https://registry.npmjs.org/nan/-/nan-2.14.1.tgz",
+ "integrity": "sha512-isWHgVjnFjh2x2yuJ/tj3JbwoHu3UC2dX5G/88Cm24yB6YopVgxvBObDY7n5xW6ExmFhJpSEQqFPvq9zaXc8Jw==",
+ "dev": true,
+ "optional": true
+ },
"nanomatch": {
"version": "1.2.13",
"resolved": "https://registry.npmjs.org/nanomatch/-/nanomatch-1.2.13.tgz",
@@ -8060,6 +8088,11 @@
"brace": "^0.11.1"
}
},
+ "ngx-bootstrap": {
+ "version": "5.6.1",
+ "resolved": "https://registry.npmjs.org/ngx-bootstrap/-/ngx-bootstrap-5.6.1.tgz",
+ "integrity": "sha512-8fDs3VaaWgKpupakPKS0QaUc+1E/JMBGJDxUUODjyIkLtFr1A8vH4cjXiV3AfrPvhK27GH0oyTPyKWKcCjEtVg=="
+ },
"ngx-file-drop": {
"version": "8.0.8",
"resolved": "https://registry.npmjs.org/ngx-file-drop/-/ngx-file-drop-8.0.8.tgz",
@@ -11575,6 +11608,8 @@
"dev": true,
"optional": true,
"requires": {
+ "bindings": "^1.5.0",
+ "nan": "^2.12.1",
"node-pre-gyp": "*"
},
"dependencies": {
@@ -12405,6 +12440,8 @@
"dev": true,
"optional": true,
"requires": {
+ "bindings": "^1.5.0",
+ "nan": "^2.12.1",
"node-pre-gyp": "*"
},
"dependencies": {
diff --git a/cds-ui/designer-client/package.json b/cds-ui/designer-client/package.json
index ffad818c0..ec968ba7a 100644
--- a/cds-ui/designer-client/package.json
+++ b/cds-ui/designer-client/package.json
@@ -41,6 +41,7 @@
"lodash": "^4.17.15",
"ng-sidebar": "^9.1.1",
"ng2-ace-editor": "^0.3.9",
+ "ngx-bootstrap": "^5.6.1",
"ngx-file-drop": "^8.0.8",
"rxjs": "~6.4.0",
"stream": "0.0.2",
diff --git a/cds-ui/designer-client/src/app/app.module.ts b/cds-ui/designer-client/src/app/app.module.ts
index d6e9b36df..da7ddfbd0 100644
--- a/cds-ui/designer-client/src/app/app.module.ts
+++ b/cds-ui/designer-client/src/app/app.module.ts
@@ -35,6 +35,7 @@ import {SharedModulesModule} from './modules/shared-modules/shared-modules.modul
import {NgxFileDropModule} from 'ngx-file-drop';
import {ResourceDictionaryModule} from './modules/feature-modules/resource-dictionary/resource-dictionary.module';
+
@NgModule({
declarations: [
AppComponent,
@@ -51,7 +52,7 @@ import {ResourceDictionaryModule} from './modules/feature-modules/resource-dicti
SharedModulesModule,
NgxFileDropModule,
ResourceDictionaryModule,
- SidebarModule
+ SidebarModule,
],
providers: [ApiService],
diff --git a/cds-ui/designer-client/src/app/modules/feature-modules/packages/configuration-dashboard/configuration-dashboard.component.html b/cds-ui/designer-client/src/app/modules/feature-modules/packages/configuration-dashboard/configuration-dashboard.component.html
index 1ff03b221..ceb2bd497 100644
--- a/cds-ui/designer-client/src/app/modules/feature-modules/packages/configuration-dashboard/configuration-dashboard.component.html
+++ b/cds-ui/designer-client/src/app/modules/feature-modules/packages/configuration-dashboard/configuration-dashboard.component.html
@@ -23,7 +23,7 @@
<div class="container">
<div class="creat-action-container">
- <a href="#" class="action-button" (click)="editBluePrint()">
+ <a href="#" class="action-button save" (click)="editBluePrint()">
<i class="icon-save-sm" aria-hidden="true"></i>
<span>Save</span>
</a>
diff --git a/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/metadata-tab/metadata-tab.component.html b/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/metadata-tab/metadata-tab.component.html
index de2b3831d..6622f6700 100644
--- a/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/metadata-tab/metadata-tab.component.html
+++ b/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/metadata-tab/metadata-tab.component.html
@@ -62,7 +62,7 @@
<input type="input" (keyup.enter)="addTag($event)" [(ngModel)]="metaDataTab.tags"
placeholder="Ex., vDNS-CDS">
</div>
- <div class="model-note-container tag-notes">Seprate tags with comma or space</div>
+ <div class="model-note-container tag-notes">Use ENTER to add tag</div>
<div class="model-note-container tages-container">
<span *ngFor="let tag of tags" class="single-tage">{{tag}}
<i (click)="removeTag(tag)" class="fa fa-times-circle"></i>
diff --git a/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/package-creation.component.html b/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/package-creation.component.html
index 1253dc10b..ee4368711 100644
--- a/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/package-creation.component.html
+++ b/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/package-creation.component.html
@@ -20,7 +20,7 @@
<div class="container-fluid body-container">
<div class="container">
<div class="creat-action-container">
- <a href="#" class="action-button" (click)="saveBluePrint()">
+ <a href="#" class="action-button save" (click)="saveBluePrint()">
<i class="icon-save-sm" aria-hidden="true"></i>
<span>Save</span>
</a>
diff --git a/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/template-mapping/templ-mapp-creation/templ-mapp-creation.component.html b/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/template-mapping/templ-mapp-creation/templ-mapp-creation.component.html
index a33087461..16c3101f2 100644
--- a/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/template-mapping/templ-mapp-creation/templ-mapp-creation.component.html
+++ b/cds-ui/designer-client/src/app/modules/feature-modules/packages/package-creation/template-mapping/templ-mapp-creation/templ-mapp-creation.component.html
@@ -58,7 +58,7 @@
<a href="#" data-toggle="modal" (click)="allowedExt=[getFileExtension()]"
data-target="#templateModal"><b>Import
File</b></a></div>
- <div class="editor-container">
+ <div class="editor-container mb-4">
<app-source-editor (textChange)="textChanges($event,templateInfo.fileName)"
[(text)]="templateFileContent"></app-source-editor>
</div>
@@ -87,7 +87,7 @@
class="mapping-source-load">
<i class="icon-upload-attributes"></i>
<br />
- <div>Upload attribute list</div>
+ <div>Upload Attributes List</div>
<div class="source-load-note">(Should be comma delimited file)</div>
</a>
<!-- <a href="#" class="mapping-source-load">
@@ -119,8 +119,8 @@
<tbody>
<tr *ngFor="let dict of resourceDictionaryRes">
<td>
- <img *ngIf="dict.definition?.property?.required" src="/assets/img/icon-required-no.svg">
- <img *ngIf="!dict.definition?.property?.required" src="/assets/img/icon-required-yes.svg">
+ <img *ngIf="dict.definition?.property?.required" src="/assets/img/icon-required-yes.svg">
+ <img *ngIf="!dict.definition?.property?.required" src="/assets/img/icon-required-no.svg">
</td>
<td>{{ dict.name }}</td>
<td>{{ dict.name }}</td>
@@ -166,8 +166,8 @@
<tbody>
<tr *ngFor="let dict of mappingRes">
<td>
- <i *ngIf="dict.definition?.property?.required" class="fa fa-check-square mx-2"></i>
- <i *ngIf="!dict.definition?.property?.required" class="fa fa-square mx-2"></i>
+ <img *ngIf="dict.definition?.property?.required" src="/assets/img/icon-required-yes.svg">
+ <img *ngIf="!dict.definition?.property?.required" src="/assets/img/icon-required-no.svg">
</td>
<td>{{ dict['name'] }}</td>
<td>{{ dict['name'] }}</td>
@@ -194,8 +194,8 @@
</div>
<div class="template-mapping-action">
- <button class="btn btn-sm btn-outline-secondary">Cancel</button>
- <button (click)="saveToStore()" class="btn btn-sm btn-primary">Submit</button>
+ <button class="btn btn-outline-secondary">Cancel</button>
+ <button (click)="saveToStore()" class="btn btn-primary">Submit</button>
</div>
</div>
</div>
diff --git a/cds-ui/designer-client/src/app/modules/feature-modules/packages/packages-dashboard/package-list/package-list.component.html b/cds-ui/designer-client/src/app/modules/feature-modules/packages/packages-dashboard/package-list/package-list.component.html
index ed67dff40..49c80d3ac 100644
--- a/cds-ui/designer-client/src/app/modules/feature-modules/packages/packages-dashboard/package-list/package-list.component.html
+++ b/cds-ui/designer-client/src/app/modules/feature-modules/packages/packages-dashboard/package-list/package-list.component.html
@@ -8,10 +8,12 @@
<div class="card-footer row">
<div class="col text-center">
<a routerLink="/packages/createPackage" role="button" aria-pressed="true"
- class="btn-create-package float"><i class="icon-create-white" aria-hidden="true"></i>Create Package
+ class="btn-create-package float"><i class="icon-create-white" aria-hidden="true"></i>Create
+ Package
</a>
- <br/>
- <a href="#" role="button" aria-pressed="true" class="btn-import-package float mb-3"><i class="icon-import-blue" aria-hidden="true"></i>Import Package
+ <br />
+ <a href="#" role="button" aria-pressed="true" class="btn-import-package float mb-3"><i
+ class="icon-import-blue" aria-hidden="true"></i>Import Package
</a>
</div>
</div>
@@ -19,102 +21,105 @@
</div>
<div class="col-lg-3 col-md-6 d-flex" *ngFor="let bluePrint of viewedPackages">
<!--Card 1-->
- <div class="card">
- <div class="card-body">
- <div class="row">
- <div class="col-9 pr-0">
- <a class="card-title" [routerLink]="['/packages/package', bluePrint.id]"
- (click)="testDispatch(bluePrint)">
- <!-- <img class="icon-deployed" src="/assets/img/icon-deploy.svg"> -->
- {{bluePrint.artifactName}}<span>.vLB.CDS</span>
- <span class="package-version">Version 1.0.2</span>
- </a>
-
- </div>
- <div class="col-3">
+ <div class="card">
+ <div class="card-body">
+ <div class="row">
+ <div class="col-10 pr-0">
+ <a class="card-title" [routerLink]="['/packages/package', bluePrint.id]"
+ (click)="testDispatch(bluePrint)">
+ <!-- <img class="icon-deployed" src="/assets/img/icon-deploy.svg"> -->
+ <p class="packageName" tooltip="{{bluePrint.artifactName}}" placement="bottom">
+ {{bluePrint.artifactName}}<span>.vLB.CDS</span></p>
+ <span class="package-version">V 1.0.2</span>
+ </a>
- <div class="dropdown">
- <input class="dropdown-toggle" type="text">
- <div class="dropdown-text">
- <!-- <img src="/assets/img/icon-menuDots.svg" title="Actions"> -->
- <i class="icon-menuDots" aria-hidden="true"></i>
- </div>
- <ul class="dropdown-content">
- <li class="action-clone">
- <a href="#">
- <i class="icon-clone-sm" aria-hidden="true"></i>
- Clone
- </a>
- </li>
- <li class="action-archive">
- <a href="#">
- <i class="icon-archive-sm" aria-hidden="true"></i>
- Archive
- </a>
- </li>
- <li class="action-archive">
- <a href="#">
- <i class="icon-download" aria-hidden="true"></i>
- Download
- </a>
- </li>
- <li class="action-delete">
- <a href="#">
- <i class="icon-delete-sm" aria-hidden="true"></i>
- Delete
- </a>
- </li>
- </ul>
- </div>
-
- </div>
</div>
- <div class="row">
- <div class="col">
- <p class="mb-0 mt-1">Last modified {{ bluePrint.createdDate | date:'short' }}
- </p>
- <p class="mb-2">By {{bluePrint.updatedBy}}</p>
- <p class="package-desc tooltip-bottom" data-tooltip="DESCRIPTION:
- The standard chunk of Lorem Ipsum used since the 1500s is reproduced below for those interested. Sections 1.10.32 and 1.10.33 from de Finibus Bonorum et Malorum by Cicero are also reproduced in their exact original form, accompanied by English versions from the 1914 translation by H. Rackham.">Description text quisquam est dolorem, velit...</p>
-
- <ul class="package-contributers">
- <li>
- <button type="button" class="border-fade" data-toggle="tooltip"
- data-placement="bottom" title="User name">
- <img src="/assets/img/img-user1.jpeg">
- </button>
+ <div class="col-2">
+ <div class="dropdown">
+ <input class="dropdown-toggle" type="text">
+ <div class="dropdown-text">
+ <!-- <img src="/assets/img/icon-menuDots.svg" title="Actions"> -->
+ <i class="icon-menuDots" aria-hidden="true"></i>
+ </div>
+ <ul class="dropdown-content">
+ <li class="action-clone">
+ <a href="#">
+ <i class="icon-clone-sm" aria-hidden="true"></i>
+ Clone
+ </a>
</li>
- <li>
- <button type="button" data-toggle="tooltip" data-placement="bottom"
- title="User name">
- <img src="/assets/img/img-user2.jpg">
- </button>
+ <li class="action-archive">
+ <a href="#">
+ <i class="icon-archive-sm" aria-hidden="true"></i>
+ Archive
+ </a>
</li>
- <li>
- <button type="button" data-toggle="tooltip" data-placement="bottom"
- title="User name">
- <img src="/assets/img/img-user3.jpg">
- </button>
+ <li class="action-archive">
+ <a href="#">
+ <i class="icon-download" aria-hidden="true"></i>
+ Download
+ </a>
</li>
- <li>
- <a href="">5 contributors</a>
+ <li class="action-delete">
+ <a href="#">
+ <i class="icon-delete-sm" aria-hidden="true"></i>
+ Delete
+ </a>
</li>
</ul>
</div>
+
</div>
- <div class="card-footer">
- <div class="row">
- <div class="col">
- <button type="button" class="btn btn-card-topology"><i class="icon-btn-card-topology" aria-hidden="true"></i>Designer Mode
+ </div>
+ <div class="row">
+ <div class="col">
+ <p class="mb-0 mt-1">Last modified {{ bluePrint.createdDate | date:'short' }}
+ </p>
+ <p class="mb-2">By {{bluePrint.updatedBy}}</p>
+ <p class="package-desc" [delay]="300"
+ tooltip="DESCRIPTION:
+ The standard chunk of Lorem Ipsum used since the 1500s is reproduced below for those interested. Sections 1.10.32 and 1.10.33 from de Finibus Bonorum et Malorum by Cicero are also reproduced in their exact original form, accompanied by English versions from the 1914 translation by H. Rackham."
+ placement="auto">Description text quisquam est dolorem, velitThe standard chunk of Lorem
+ Ipsum used since the 1500s is reproduced below for those interested. Sections 1.10.32 and
+ 1.10.33 from de Finibus Bonorum et</p>
+ <ul class="package-contributers">
+ <li>
+ <button type="button" class="border-fade" data-toggle="tooltip" data-placement="bottom"
+ title="User name">
+ <img src="/assets/img/img-user1.jpeg">
</button>
- </div>
- <div class="col">
- <button type="button" (click)="view(bluePrint.id)" class="btn btn-card-config"><i class="icon-btn-card-config" aria-hidden="true"></i>Configuration</button>
- </div>
+ </li>
+ <li>
+ <button type="button" data-toggle="tooltip" data-placement="bottom" title="User name">
+ <img src="/assets/img/img-user2.jpg">
+ </button>
+ </li>
+ <li>
+ <button type="button" data-toggle="tooltip" data-placement="bottom" title="User name">
+ <img src="/assets/img/img-user3.jpg">
+ </button>
+ </li>
+ <li>
+ <a href="">5 contributors</a>
+ </li>
+ </ul>
+ </div>
+ </div>
+ <div class="card-footer">
+ <div class="row">
+ <div class="col">
+ <button type="button" class="btn btn-card-topology"><i class="icon-btn-card-topology"
+ aria-hidden="true"></i>Designer Mode
+ </button>
+ </div>
+ <div class="col">
+ <button type="button" (click)="view(bluePrint.id)" class="btn btn-card-config"><i
+ class="icon-btn-card-config" aria-hidden="true"></i>Configuration</button>
</div>
</div>
</div>
-
</div>
+
+ </div>
</div>
</div> \ No newline at end of file
diff --git a/cds-ui/designer-client/src/app/modules/shared-modules/shared-modules.module.ts b/cds-ui/designer-client/src/app/modules/shared-modules/shared-modules.module.ts
index 7229891f5..7663fb8dc 100644
--- a/cds-ui/designer-client/src/app/modules/shared-modules/shared-modules.module.ts
+++ b/cds-ui/designer-client/src/app/modules/shared-modules/shared-modules.module.ts
@@ -2,13 +2,15 @@ import { NgModule } from '@angular/core';
import { CommonModule } from '@angular/common';
import { HeaderComponent } from './header/header.component';
import { RouterModule } from '@angular/router';
+import { TooltipModule } from 'ngx-bootstrap/tooltip';
@NgModule({
declarations: [HeaderComponent],
imports: [
CommonModule,
RouterModule,
+ TooltipModule.forRoot(),
],
- exports : [HeaderComponent]
+ exports : [HeaderComponent, TooltipModule, ]
})
export class SharedModulesModule { }
diff --git a/cds-ui/designer-client/src/styles.css b/cds-ui/designer-client/src/styles.css
index b44a36235..0f79bc1b7 100644
--- a/cds-ui/designer-client/src/styles.css
+++ b/cds-ui/designer-client/src/styles.css
@@ -470,13 +470,14 @@ height: 40px;
}
.packages-card .card-title{
margin-bottom: 0 !important;
- font-size: 15px;
+ font-size: 13px;
font-weight: bold;
color: #1B3E6F;
}
.packages-card .card-title span{
color: #1B3E6F;
font-size: 11px;
+ vertical-align: bottom;
}
.packages-card p{
font-size: 12px;
@@ -485,6 +486,10 @@ height: 40px;
.packages-card p.package-desc{
font-size: 13px;
text-align: left;
+ display: -webkit-box;
+ -webkit-line-clamp: 1;
+ -webkit-box-orient: vertical;
+ overflow: hidden;
}
.packages-card p.package-desc:hover{
color: #1B3E6F !important;
@@ -1046,10 +1051,23 @@ height: 40px;
}
.packages-card .card-title{
margin-bottom: 0 !important;
- font-size: 15px;
+ font-size: 14px;
font-weight: bold;
}
-.packages-card .card-title:hover{
+.packages-card .card-title .packageName{
+ margin-bottom: 0;
+ display: inline-block;
+ width: auto;
+ max-width: 76%;
+ white-space: nowrap;
+ overflow: hidden;
+ text-overflow: ellipsis;
+ font-size: 13px;
+ vertical-align: bottom;
+ color: #1B3E6F;
+}
+.packages-card .card-title:hover,
+.packages-card .card-title:hover .packageName{
text-decoration: none;
color: #1273EB;
}
@@ -1157,10 +1175,13 @@ height: 40px;
background: #1B3E6F;
box-shadow: none;
}
+.package-version{
+ color: #C3CDDB !important;
+}
.package-version::before{
content: "|";
- margin-left: 12px;
- margin-right: 10px;
+ margin-left: 8px;
+ margin-right: 6px;
vertical-align: text-bottom;
}
@@ -1579,6 +1600,32 @@ ul.package-contributers{
margin-left: 0;
margin-bottom: -16px;
}
+
+/*TooltipModule - Shady*/
+.tooltip:before,
+.tooltip:after{
+ width: 100% !important;
+}
+.tooltip .tooltip-inner{
+ max-width: 280px !important;
+ width: 100% !important;
+ text-align: left!important;
+ color:#1B3E6F;
+ background-color: #F4F9FE;
+ border: solid 1px #E6EDF5;
+ border-radius: 3px !important;
+ font-size: 11px;
+}
+.bs-tooltip-bottom .arrow::before{
+ border-bottom-color: #E6EDF5 !important;
+}

+.bs-tooltip-top .arrow::before{
+ border-top-color: #E6EDF5 !important;
+}

+
+
+
+
.btn{
padding-right: 20px !important;
padding-left: 20px !important;
@@ -1911,6 +1958,9 @@ hr{
font-size: 11px;
color: #C3CDDB;
}
+.action-button.save{
+ color: #1273EB;
+}
.action-button.delete{
color: #BABBC3;
}
@@ -2494,6 +2544,8 @@ margin-right: 5px;
}
.template-mapping-action button{
border-radius: 50px;
+ padding: 6px 20px;
+ font-size: 14px;
}
.template-mapping-action .btn-primary{
background:#5DBDBA !important ;
@@ -2521,22 +2573,22 @@ margin-right: 5px;
.dataTables_empty {
display: none;
}
-#DataTables_Table_0_length,
-#DataTables_Table_0_filter{
+.dataTables_length,
+.dataTables_filter{
margin-bottom: 6px;
- color: #1B3E6F;
+ color: #1B3E6F !important;
font-size: 13px;
font-weight: bold;
}
-#DataTables_Table_0_filter input{
+.dataTables_filter input{
color: #1B3E6F;
background: url(../src/assets/img/icon-search.svg) 9px center no-repeat;
padding: 4px 9px 4px 24px;
border: solid 1px #ECEDF2;
border-radius: 4px;
}
-#DataTables_Table_0_filter input:focus{
+.dataTables_filter input:focus{
box-shadow: 0 2px 6px 0 rgba(47, 83, 151, 0.1);
}
#mapping-table th,
@@ -2565,41 +2617,45 @@ table.dataTable.no-footer{
margin-bottom: 9px;
border-bottom: solid 1px #ECEDF2 !important;
}
-#DataTables_Table_0_info{
+.dataTables_info{
padding-top: 12px;
- color: #1B3E6F;
+ color: #1B3E6F !important;
font-size: 13px;
font-weight: bold;
}
-#DataTables_Table_0_paginate,
-#DataTables_Table_0_paginate a{
+.dataTables_wrapper .dataTables_paginate,
+.dataTables_wrapper .dataTables_paginate a.paginate_button,
+.dataTables_wrapper .dataTables_paginate a.paginate_button.current,
+.dataTables_wrapper .dataTables_paginate a.paginate_button.disabled{
margin-bottom: 3px;
color: #1B3E6F !important;
font-size: 13px;
font-weight: bold;
+ border: 0 !important;
}
-#DataTables_Table_0_paginate .paginate_button,
-#DataTables_Table_0_paginate .paginate_button:hover{
+.dataTables_wrapper .dataTables_paginate a.paginate_button:hover{
border: 0;
- background: none;
+ color: #1B3E6F !important;
+ background: none !important;
}
-.dataTables_wrapper .dataTables_paginate .paginate_button{
+.dataTables_wrapper .dataTables_paginate a.paginate_button{
padding: 0.4em .9em !important;
}
-#DataTables_Table_0_paginate .paginate_button.current{
+.dataTables_wrapper .dataTables_paginate a.paginate_button.current{
color: #1B3E6F !important;
background: #F4F9FE !important;
- border: solid 1px #EEF4F9;
+ border: solid 1px #EEF4F9 !important;
border-radius: 100% !important;
box-shadow: 0 2px 6px 0 rgba(47, 83, 151, 0.1);
outline: 0;
}
-#DataTables_Table_0_paginate a.paginate_button.disabled{
+.dataTables_wrapper .dataTables_paginate a.paginate_button.disabled{
opacity: .6;
-
}
-#DataTables_Table_0_paginate a.paginate_button.disabled:hover{
+.dataTables_wrapper .dataTables_paginate a.paginate_button.disabled:hover{
cursor: default;
+ background: none !important;
+ border: 0 !important;
}
#mapping-table .form-control,
#mapping-table .custom-select{
@@ -2620,7 +2676,13 @@ table.dataTable.no-footer{
background-color: #F4F9FE;
color: #1B3E6F;
}
-
+#mapping-table .form-control:disabled{
+ padding-left: 0;
+ box-shadow: none;
+ border: 0;
+ background: transparent;
+ color: #1B3E6F;
+}
/* Extra small devices (portrait phones, less than 576px) */
@media (max-width: 575.98px) {
.page-title{
diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
index ad38883f7..bf5e23bc9 100755
--- a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
+++ b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties
@@ -131,19 +131,31 @@ blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+#### Security settings
+#### SSL
+#blueprintsprocessor.messageconsumer.self-service-api.truststore=/path/to/truststore.jks
+#blueprintsprocessor.messageconsumer.self-service-api.truststorePassword=truststorePassword
+#blueprintsprocessor.messageconsumer.self-service-api.keystore=/path/to/keystore.jks
+#blueprintsprocessor.messageconsumer.self-service-api.keystorePassword=keystorePassword
+#### SCRAM
+#blueprintsprocessor.messageconsumer.self-service-api.scramUsername=test-user
+#blueprintsprocessor.messageconsumer.self-service-api.scramPassword=testUserPassword
# Kafka audit service Configurations
+## Audit request
blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
+## Audit response
blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t
+
# Message prioritization kakfa properties, Enable if Prioritization service is needed
# Deploy message-prioritization function along with blueprintsprocessor application.
#blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth
diff --git a/ms/blueprintsprocessor/application/src/main/resources/application.properties b/ms/blueprintsprocessor/application/src/main/resources/application.properties
index 74549b0ae..6fb737edc 100755
--- a/ms/blueprintsprocessor/application/src/main/resources/application.properties
+++ b/ms/blueprintsprocessor/application/src/main/resources/application.properties
@@ -103,20 +103,35 @@ blueprintsprocessor.restclient.aai-data.additionalHeaders.X-TransactionId=cds-tr
blueprintsprocessor.restclient.aai-data.additionalHeaders.X-FromAppId=cds-app-id
blueprintsprocessor.restclient.aai-data.additionalHeaders.Accept=application/json
-# Kafka-message-lib Configuration
-blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
-blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
-blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
-blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
-
-blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth
-blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092
-blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id
-blueprintsprocessor.messageproducer.self-service-api.topic=producer.t
-
+# Kafka audit service Configurations
+## Audit request
+blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
+blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
+#### Security settings
+#### SSL
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.truststore=/path/to/truststore.jks
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.truststorePassword=truststorePassword
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.keystore=/path/to/keystore.jks
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.keystorePassword=keystorePassword
+#### SCRAM
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.scramUsername=test-user
+#blueprintsprocessor.messageproducer.self-service-api.audit.request.scramPassword=testUserPassword
+
+## Audit response
+blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
+blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
+blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
+blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t
+
+# Message prioritization kakfa properties, Enable if Prioritization service is needed
+# Deploy message-prioritization function along with blueprintsprocessor application.
+#blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth
+#blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092
+#blueprintsprocessor.messageconsumer.prioritize-input.applicationId=cds-controller
+#blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic
blueprintprocessor.remoteScriptCommand.enabled=true
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 35bc49402..7e6bf68be 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -38,7 +38,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.s
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaMessageProducerService
import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
@@ -72,15 +72,27 @@ import kotlin.test.assertNotNull
"spring.jpa.properties.hibernate.show_sql=false",
"spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
- "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-scram-ssl-auth",
"blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
"blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+ "blueprintsprocessor.messageconsumer.prioritize-input.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageconsumer.prioritize-input.truststorePassword=truststorePassword",
+ "blueprintsprocessor.messageconsumer.prioritize-input.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageconsumer.prioritize-input.keystorePassword=keystorePassword",
+ "blueprintsprocessor.messageconsumer.prioritize-input.scramUsername=test-user",
+ "blueprintsprocessor.messageconsumer.prioritize-input.scramPassword=testUserPassword",
// To send initial test message
- "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+ "blueprintsprocessor.messageproducer.prioritize-input.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.prioritize-input.truststorePassword=truststorePassword",
+ "blueprintsprocessor.messageproducer.prioritize-input.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.prioritize-input.keystorePassword=keystorePassword",
+ "blueprintsprocessor.messageproducer.prioritize-input.scramUsername=test-user",
+ "blueprintsprocessor.messageproducer.prioritize-input.scramPassword=testUserPassword",
"blueprintsprocessor.nats.cds-controller.type=token-auth",
"blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
@@ -241,7 +253,7 @@ open class MessagePrioritizationConsumerTest {
/** Send sample message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("prioritize-input") as KafkaMessageProducerService
launch {
MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
delay(100)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
index cc4c7fa4a..c6587c799 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
@@ -61,6 +61,10 @@ class MessageLibConstants {
const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
+ const val TYPE_KAFKA_SCRAM_SSL_AUTH = "kafka-scram-ssl-auth"
+ const val TYPE_KAFKA_SSL_AUTH = "kafka-ssl-auth"
const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
+ const val TYPE_KAFKA_STREAMS_SSL_AUTH = "kafka-streams-ssl-auth"
+ const val TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH = "kafka-streams-scram-ssl-auth"
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index 005223d9b..ac35fbf2c 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -17,49 +17,238 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.streams.StreamsConfig
-/** Producer Properties **/
-open class MessageProducerProperties {
+/** Common Properties **/
+abstract class CommonProperties {
lateinit var type: String
+ lateinit var topic: String
+ lateinit var bootstrapServers: String
+
+ open fun getConfig(): HashMap<String, Any> {
+ val configProps = hashMapOf<String, Any>()
+ configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+ return configProps
+ }
}
+/** Message Producer */
+/** Message Producer Properties **/
+abstract class MessageProducerProperties : CommonProperties()
+
+/** Basic Auth */
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
- lateinit var bootstrapServers: String
- var topic: String? = null
+
var clientId: String? = null
// strongest producing guarantee
var acks: String = "all"
var retries: Int = 0
// ensure we don't push duplicates
var enableIdempotence: Boolean = true
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
+ configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
+ configProps[ProducerConfig.ACKS_CONFIG] = acks
+ configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
+ if (clientId != null) {
+ configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
+ }
+ return configProps
+ }
}
-/** Consumer Properties **/
+/** SSL Auth */
+open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducerProperties() {
+ lateinit var truststore: String
+ lateinit var truststorePassword: String
+ var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+ var keystore: String? = null
+ var keystorePassword: String? = null
+ var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+ var sslEndpointIdentificationAlgorithm: String = ""
-open class MessageConsumerProperties {
- lateinit var type: String
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+ configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ if (keystore != null) {
+ configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+ configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+ configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+ }
+ configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+
+ return configProps
+ }
}
-open class KafkaStreamsConsumerProperties : MessageConsumerProperties() {
- lateinit var bootstrapServers: String
+/** (SASL) SCRAM SSL Auth */
+class KafkaScramSslAuthMessageProducerProperties : KafkaSslAuthMessageProducerProperties() {
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
+
+/** Consumer */
+abstract class MessageConsumerProperties : CommonProperties()
+/** Kafka Streams */
+/** Streams properties */
+
+/** Basic Auth */
+open class KafkaStreamsBasicAuthConsumerProperties : MessageConsumerProperties() {
lateinit var applicationId: String
- lateinit var topic: String
var autoOffsetReset: String = "latest"
var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProperties = super.getConfig()
+ configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
+ configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = processingGuarantee
+ return configProperties
+ }
}
-open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties()
+/** SSL Auth */
+open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumerProperties() {
+ lateinit var truststore: String
+ lateinit var truststorePassword: String
+ var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+ var keystore: String? = null
+ var keystorePassword: String? = null
+ var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+ var sslEndpointIdentificationAlgorithm: String = ""
-open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
- lateinit var bootstrapServers: String
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+ configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ if (keystore != null) {
+ configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+ configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+ configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+ }
+ configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+ return configProps
+ }
+}
+
+/** (SASL) SCRAM SSL Auth */
+class KafkaStreamsScramSslAuthConsumerProperties : KafkaStreamsSslAuthConsumerProperties() {
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
+
+/** Message Consumer */
+/** Message Consumer Properties **/
+/** Basic Auth */
+open class KafkaBasicAuthMessageConsumerProperties : MessageConsumerProperties() {
lateinit var groupId: String
lateinit var clientId: String
- var topic: String? = null
var autoCommit: Boolean = true
var autoOffsetReset: String = "latest"
var pollMillSec: Long = 1000
var pollRecords: Int = -1
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProperties = super.getConfig()
+ configProperties[ConsumerConfig.GROUP_ID_CONFIG] = groupId
+ configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
+ /**
+ * earliest: automatically reset the offset to the earliest offset
+ * latest: automatically reset the offset to the latest offset
+ */
+ configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
+ configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
+ configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
+ configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = clientId
+
+ /** To handle Back pressure, Get only configured record for processing */
+ if (pollRecords > 0) {
+ configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = pollRecords
+ }
+
+ return configProperties
+ }
}
-open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
+/** SSL Auth */
+open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumerProperties() {
+ lateinit var truststore: String
+ lateinit var truststorePassword: String
+ var truststoreType: String = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
+ var keystore: String? = null
+ var keystorePassword: String? = null
+ var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
+ var sslEndpointIdentificationAlgorithm: String = ""
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SSL.toString()
+ configProps[SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG] = truststoreType
+ configProps[SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG] = truststore!!
+ configProps[SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG] = truststorePassword!!
+ if (keystore != null) {
+ configProps[SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG] = keystore!!
+ configProps[SslConfigs.SSL_KEYSTORE_TYPE_CONFIG] = keystoreType
+ configProps[SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG] = keystorePassword!!
+ }
+ configProps[SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG] = sslEndpointIdentificationAlgorithm
+ return configProps
+ }
+}
+
+/** (SASL) SCRAM SSL Auth */
+class KafkaScramSslAuthMessageConsumerProperties : KafkaSslAuthMessageConsumerProperties() {
+ var saslMechanism: String = "SCRAM-SHA-512"
+ lateinit var scramUsername: String
+ lateinit var scramPassword: String
+
+ override fun getConfig(): HashMap<String, Any> {
+ val configProps = super.getConfig()
+ configProps[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = SecurityProtocol.SASL_SSL.toString()
+ configProps[SaslConfigs.SASL_MECHANISM] = saslMechanism
+ configProps[SaslConfigs.SASL_JAAS_CONFIG] = "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"${scramUsername}\" " +
+ "password=\"${scramPassword}\";"
+ return configProps
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
index 88039466d..c659fdb8b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
@@ -99,6 +99,20 @@ class MessageProducerRelationshipTemplateBuilder(name: String, description: Stri
BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block)
)
}
+
+ fun kafkaSslAuth(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaSslAuthMessageProducerProperties(block)
+ )
+ }
+
+ fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block)
+ )
+ }
}
fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
@@ -108,9 +122,23 @@ fun BluePrintTypes.kafkaBasicAuthMessageProducerProperties(block: KafkaBasicAuth
return assignments.asJsonType()
}
+fun BluePrintTypes.kafkaSslAuthMessageProducerProperties(block: KafkaSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaSslAuthMessageProducerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaScramSslAuthMessageProducerProperties(block: KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaScramSslAuthMessageProducerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
open class MessageProducerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder()
-class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() {
+open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducerPropertiesAssignmentBuilder() {
fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
@@ -141,6 +169,61 @@ class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessageProducer
property(KafkaBasicAuthMessageProducerProperties::enableIdempotence, enableIdempotence)
}
+open class KafkaSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder() {
+ fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+ fun truststore(truststore: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::truststore, truststore)
+
+ fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+ fun truststorePassword(truststorePassword: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::truststorePassword, truststorePassword)
+
+ fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+ fun truststoreType(truststoreType: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::truststoreType, truststoreType)
+
+ fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+ fun keystore(keystore: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+ fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+ fun keystorePassword(keystorePassword: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+ fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+ fun keystoreType(keystoreType: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaScramSslAuthMessageProducerPropertiesAssignmentBuilder : KafkaSslAuthMessageProducerPropertiesAssignmentBuilder() {
+ fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+ fun saslMechanism(saslMechanism: JsonNode) =
+ property(KafkaScramSslAuthMessageProducerProperties::saslMechanism, saslMechanism)
+
+ fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+ fun scramUsername(scramUsername: JsonNode) =
+ property(KafkaScramSslAuthMessageProducerProperties::scramUsername, scramUsername)
+
+ fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+ fun scramPassword(scramPassword: JsonNode) =
+ property(KafkaScramSslAuthMessageProducerProperties::scramPassword, scramPassword)
+}
+
/** Relationships Templates DSL for Message Consumer */
fun TopologyTemplateBuilder.relationshipTemplateMessageConsumer(
name: String,
@@ -166,12 +249,40 @@ class MessageConsumerRelationshipTemplateBuilder(name: String, description: Stri
)
}
+ fun kafkaSslAuth(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block)
+ )
+ }
+
+ fun kafkaScramSslAuth(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block)
+ )
+ }
+
fun kafkaStreamsBasicAuth(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
property(
BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block)
)
}
+
+ fun kafkaStreamsSslAuth(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block)
+ )
+ }
+
+ fun kafkaStreamsScramSslAuth(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit) {
+ property(
+ BluePrintConstants.PROPERTY_CONNECTION_CONFIG,
+ BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block)
+ )
+ }
}
fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
@@ -181,6 +292,20 @@ fun BluePrintTypes.kafkaBasicAuthMessageConsumerProperties(block: KafkaBasicAuth
return assignments.asJsonType()
}
+fun BluePrintTypes.kafkaSslAuthMessageConsumerProperties(block: KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaSslAuthMessageConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaScramSslAuthMessageConsumerProperties(block: KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaScramSslAuthMessageConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
val assignments = KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
assignments[KafkaStreamsBasicAuthConsumerProperties::type.name] =
@@ -188,81 +313,201 @@ fun BluePrintTypes.kafkaStreamsBasicAuthConsumerProperties(block: KafkaStreamsBa
return assignments.asJsonType()
}
+fun BluePrintTypes.kafkaStreamsSslAuthConsumerProperties(block: KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaStreamsSslAuthConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
+fun BluePrintTypes.kafkaStreamsScramSslAuthConsumerProperties(block: KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder.() -> Unit): JsonNode {
+ val assignments = KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder().apply(block).build()
+ assignments[KafkaStreamsScramSslAuthConsumerProperties::type.name] =
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH.asJsonPrimitive()
+ return assignments.asJsonType()
+}
+
open class MessageConsumerPropertiesAssignmentBuilder : PropertiesAssignmentBuilder()
-open class KafkaMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
+/** KafkaBasicAuthMessageConsumerProperties assignment builder */
+open class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
fun bootstrapServers(bootstrapServers: JsonNode) =
- property(KafkaMessageConsumerProperties::bootstrapServers, bootstrapServers)
+ property(KafkaBasicAuthMessageConsumerProperties::bootstrapServers, bootstrapServers)
fun groupId(groupId: String) = groupId(groupId.asJsonPrimitive())
fun groupId(groupId: JsonNode) =
- property(KafkaMessageConsumerProperties::groupId, groupId)
+ property(KafkaBasicAuthMessageConsumerProperties::groupId, groupId)
fun clientId(clientId: String) = clientId(clientId.asJsonPrimitive())
fun clientId(clientId: JsonNode) =
- property(KafkaMessageConsumerProperties::clientId, clientId)
+ property(KafkaBasicAuthMessageConsumerProperties::clientId, clientId)
fun topic(topic: String) = topic(topic.asJsonPrimitive())
fun topic(topic: JsonNode) =
- property(KafkaMessageConsumerProperties::topic, topic)
+ property(KafkaBasicAuthMessageConsumerProperties::topic, topic)
fun autoCommit(autoCommit: Boolean) = autoCommit(autoCommit.asJsonPrimitive())
fun autoCommit(autoCommit: JsonNode) =
- property(KafkaMessageConsumerProperties::autoCommit, autoCommit)
+ property(KafkaBasicAuthMessageConsumerProperties::autoCommit, autoCommit)
fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive())
fun autoOffsetReset(autoOffsetReset: JsonNode) =
- property(KafkaMessageConsumerProperties::autoOffsetReset, autoOffsetReset)
+ property(KafkaBasicAuthMessageConsumerProperties::autoOffsetReset, autoOffsetReset)
fun pollMillSec(pollMillSec: Int) = pollMillSec(pollMillSec.asJsonPrimitive())
fun pollMillSec(pollMillSec: JsonNode) =
- property(KafkaMessageConsumerProperties::pollMillSec, pollMillSec)
+ property(KafkaBasicAuthMessageConsumerProperties::pollMillSec, pollMillSec)
fun pollRecords(pollRecords: Int) = pollRecords(pollRecords.asJsonPrimitive())
fun pollRecords(pollRecords: JsonNode) =
- property(KafkaMessageConsumerProperties::pollRecords, pollRecords)
+ property(KafkaBasicAuthMessageConsumerProperties::pollRecords, pollRecords)
}
-/** KafkaBasicAuthMessageConsumerProperties assignment builder */
-class KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder : KafkaMessageConsumerPropertiesAssignmentBuilder()
+open class KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaBasicAuthMessageConsumerPropertiesAssignmentBuilder() {
+ fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+ fun truststore(truststore: JsonNode) =
+ property(KafkaSslAuthMessageConsumerProperties::truststore, truststore)
+
+ fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+ fun truststorePassword(truststorePassword: JsonNode) =
+ property(KafkaSslAuthMessageConsumerProperties::truststorePassword, truststorePassword)
+
+ fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+ fun truststoreType(truststoreType: JsonNode) =
+ property(KafkaSslAuthMessageConsumerProperties::truststoreType, truststoreType)
+
+ fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+ fun keystore(keystore: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+ fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+ fun keystorePassword(keystorePassword: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+ fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+ fun keystoreType(keystoreType: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+ property(KafkaSslAuthMessageConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaScramSslAuthMessageConsumerPropertiesAssignmentBuilder : KafkaSslAuthMessageConsumerPropertiesAssignmentBuilder() {
+ fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+ fun saslMechanism(saslMechanism: JsonNode) =
+ property(KafkaScramSslAuthMessageConsumerProperties::saslMechanism, saslMechanism)
+
+ fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+ fun scramUsername(scramUsername: JsonNode) =
+ property(KafkaScramSslAuthMessageConsumerProperties::scramUsername, scramUsername)
+
+ fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+ fun scramPassword(scramPassword: JsonNode) =
+ property(KafkaScramSslAuthMessageConsumerProperties::scramPassword, scramPassword)
+}
/** KafkaStreamsConsumerProperties assignment builder */
-open class KafkaStreamsConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
+open class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : MessageConsumerPropertiesAssignmentBuilder() {
fun bootstrapServers(bootstrapServers: String) = bootstrapServers(bootstrapServers.asJsonPrimitive())
fun bootstrapServers(bootstrapServers: JsonNode) =
- property(KafkaStreamsConsumerProperties::bootstrapServers, bootstrapServers)
+ property(KafkaStreamsBasicAuthConsumerProperties::bootstrapServers, bootstrapServers)
fun applicationId(applicationId: String) = bootstrapServers(applicationId.asJsonPrimitive())
fun applicationId(applicationId: JsonNode) =
- property(KafkaStreamsConsumerProperties::applicationId, applicationId)
+ property(KafkaStreamsBasicAuthConsumerProperties::applicationId, applicationId)
fun topic(topic: String) = topic(topic.asJsonPrimitive())
fun topic(topic: JsonNode) =
- property(KafkaStreamsConsumerProperties::topic, topic)
+ property(KafkaStreamsBasicAuthConsumerProperties::topic, topic)
fun autoOffsetReset(autoOffsetReset: String) = autoOffsetReset(autoOffsetReset.asJsonPrimitive())
fun autoOffsetReset(autoOffsetReset: JsonNode) =
- property(KafkaStreamsConsumerProperties::autoOffsetReset, autoOffsetReset)
+ property(KafkaStreamsBasicAuthConsumerProperties::autoOffsetReset, autoOffsetReset)
fun processingGuarantee(processingGuarantee: String) = processingGuarantee(processingGuarantee.asJsonPrimitive())
fun processingGuarantee(processingGuarantee: JsonNode) =
- property(KafkaStreamsConsumerProperties::processingGuarantee, processingGuarantee)
+ property(KafkaStreamsBasicAuthConsumerProperties::processingGuarantee, processingGuarantee)
}
-class KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsConsumerPropertiesAssignmentBuilder()
+open class KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsBasicAuthConsumerPropertiesAssignmentBuilder() {
+ fun truststore(truststore: String) = truststore(truststore.asJsonPrimitive())
+
+ fun truststore(truststore: JsonNode) =
+ property(KafkaStreamsSslAuthConsumerProperties::truststore, truststore)
+
+ fun truststorePassword(truststorePassword: String) = truststorePassword(truststorePassword.asJsonPrimitive())
+
+ fun truststorePassword(truststorePassword: JsonNode) =
+ property(KafkaStreamsSslAuthConsumerProperties::truststorePassword, truststorePassword)
+
+ fun truststoreType(truststoreType: String) = truststoreType(truststoreType.asJsonPrimitive())
+
+ fun truststoreType(truststoreType: JsonNode) =
+ property(KafkaStreamsSslAuthConsumerProperties::truststoreType, truststoreType)
+
+ fun keystore(keystore: String) = keystore(keystore.asJsonPrimitive())
+
+ fun keystore(keystore: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystore, keystore)
+
+ fun keystorePassword(keystorePassword: String) = keystorePassword(keystorePassword.asJsonPrimitive())
+
+ fun keystorePassword(keystorePassword: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystorePassword, keystorePassword)
+
+ fun keystoreType(keystoreType: String) = keystoreType(keystoreType.asJsonPrimitive())
+
+ fun keystoreType(keystoreType: JsonNode) =
+ property(KafkaSslAuthMessageProducerProperties::keystoreType, keystoreType)
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: String) =
+ sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm.asJsonPrimitive())
+
+ fun sslEndpointIdentificationAlgorithm(sslEndpointIdentificationAlgorithm: JsonNode) =
+ property(KafkaStreamsSslAuthConsumerProperties::sslEndpointIdentificationAlgorithm, sslEndpointIdentificationAlgorithm)
+}
+
+class KafkaStreamsScramSslAuthConsumerPropertiesAssignmentBuilder : KafkaStreamsSslAuthConsumerPropertiesAssignmentBuilder() {
+ fun saslMechanism(saslMechanism: String) = saslMechanism(saslMechanism.asJsonPrimitive())
+
+ fun saslMechanism(saslMechanism: JsonNode) =
+ property(KafkaStreamsScramSslAuthConsumerProperties::saslMechanism, saslMechanism)
+
+ fun scramUsername(scramUsername: String) = scramUsername(scramUsername.asJsonPrimitive())
+
+ fun scramUsername(scramUsername: JsonNode) =
+ property(KafkaStreamsScramSslAuthConsumerProperties::scramUsername, scramUsername)
+
+ fun scramPassword(scramPassword: String) = scramPassword(scramPassword.asJsonPrimitive())
+
+ fun scramPassword(scramPassword: JsonNode) =
+ property(KafkaStreamsScramSslAuthConsumerProperties::scramPassword, scramPassword)
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
index 44b50af44..67fbef580 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
@@ -21,7 +21,13 @@ import com.fasterxml.jackson.databind.JsonNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaScramSslAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaSslAuthMessageProducerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsScramSslAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsSslAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
@@ -34,20 +40,32 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
val messageClientProperties = messageProducerProperties(jsonNode)
- return blueprintMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties)
}
fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
val messageClientProperties = messageProducerProperties(prefix)
- return blueprintMessageProducerService(messageClientProperties)
+ return KafkaMessageProducerService(messageClientProperties)
}
fun messageProducerProperties(prefix: String): MessageProducerProperties {
val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- kafkaBasicAuthMessageProducerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaBasicAuthMessageProducerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaSslAuthMessageProducerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramSslAuthMessageProducerProperties::class.java
+ )
}
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -61,31 +79,18 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageProducerProperties::class.java)!!
}
- else -> {
- throw BluePrintProcessorException("Message adaptor($type) is not supported")
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageProducerProperties::class.java)!!
}
- }
- }
-
- private fun blueprintMessageProducerService(MessageProducerProperties: MessageProducerProperties):
- BlueprintMessageProducerService {
-
- when (MessageProducerProperties) {
- is KafkaBasicAuthMessageProducerProperties -> {
- return KafkaBasicAuthMessageProducerService(MessageProducerProperties)
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageProducerProperties::class.java)!!
}
else -> {
- throw BluePrintProcessorException("couldn't get Message client service for")
+ throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
}
}
- private fun kafkaBasicAuthMessageProducerProperties(prefix: String): KafkaBasicAuthMessageProducerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageProducerProperties::class.java
- )
- }
-
/** Consumer Property Lib Service Implementation **/
/** Return Message Consumer Service for [jsonNode] definitions. */
@@ -105,11 +110,37 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
fun messageConsumerProperties(prefix: String): MessageConsumerProperties {
val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
return when (type) {
+ /** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
- kafkaBasicAuthMessageConsumerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaBasicAuthMessageConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaSslAuthMessageConsumerProperties::class.java
+ )
}
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaScramSslAuthMessageConsumerProperties::class.java
+ )
+ }
+ /** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
- kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsSslAuthConsumerProperties::class.java
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ bluePrintPropertiesService.propertyBeanType(
+ prefix, KafkaStreamsScramSslAuthConsumerProperties::class.java
+ )
}
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
@@ -120,12 +151,26 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
fun messageConsumerProperties(jsonNode: JsonNode): MessageConsumerProperties {
val type = jsonNode.get("type").textValue()
return when (type) {
+ /** Message Consumer */
MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaSslAuthMessageConsumerProperties::class.java)!!
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaScramSslAuthMessageConsumerProperties::class.java)!!
+ }
+ /** Stream Consumer */
MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
}
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaStreamsSslAuthConsumerProperties::class.java)!!
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ JacksonUtils.readValue(jsonNode, KafkaStreamsScramSslAuthConsumerProperties::class.java)!!
+ }
else -> {
throw BluePrintProcessorException("Message adaptor($type) is not supported")
}
@@ -135,28 +180,42 @@ open class BluePrintMessageLibPropertyService(private var bluePrintPropertiesSer
private fun blueprintMessageConsumerService(messageConsumerProperties: MessageConsumerProperties):
BlueprintMessageConsumerService {
- when (messageConsumerProperties) {
- is KafkaBasicAuthMessageConsumerProperties -> {
- return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
+ when (messageConsumerProperties.type) {
+ /** Message Consumer */
+ MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
+ return KafkaMessageConsumerService(
+ messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
+ )
}
- is KafkaStreamsBasicAuthConsumerProperties -> {
- return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
+ /** Stream Consumer */
+ MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsSslAuthConsumerProperties
+ )
+ }
+ MessageLibConstants.TYPE_KAFKA_STREAMS_SCRAM_SSL_AUTH -> {
+ return KafkaStreamsConsumerService(
+ messageConsumerProperties as KafkaStreamsScramSslAuthConsumerProperties
+ )
}
else -> {
- throw BluePrintProcessorException("couldn't get Message client service for")
+ throw BluePrintProcessorException("couldn't get message client service for ${messageConsumerProperties.type}")
}
}
}
-
- private fun kafkaBasicAuthMessageConsumerProperties(prefix: String): KafkaBasicAuthMessageConsumerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaBasicAuthMessageConsumerProperties::class.java
- )
- }
-
- private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
- return bluePrintPropertiesService.propertyBeanType(
- prefix, KafkaStreamsBasicAuthConsumerProperties::class.java
- )
- }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index 3415c8d0d..cdcd4197c 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -21,24 +21,20 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
-import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
-import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
-import org.apache.kafka.common.serialization.StringDeserializer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import java.nio.charset.Charset
import java.time.Duration
import kotlin.concurrent.thread
-open class KafkaBasicAuthMessageConsumerService(
+open class KafkaMessageConsumerService(
private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
) :
BlueprintMessageConsumerService {
- val log = logger(KafkaBasicAuthMessageConsumerService::class)
+ val log = logger(KafkaMessageConsumerService::class)
val channel = Channel<String>()
var kafkaConsumer: Consumer<String, ByteArray>? = null
@@ -46,24 +42,7 @@ open class KafkaBasicAuthMessageConsumerService(
var keepGoing = true
fun kafkaConsumer(additionalConfig: Map<String, Any>? = null): Consumer<String, ByteArray> {
- val configProperties = hashMapOf<String, Any>()
- configProperties[CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
- configProperties[ConsumerConfig.GROUP_ID_CONFIG] = messageConsumerProperties.groupId
- configProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = messageConsumerProperties.autoCommit
- /**
- * earliest: automatically reset the offset to the earliest offset
- * latest: automatically reset the offset to the latest offset
- */
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
- configProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
- configProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
- configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId
-
- /** To handle Back pressure, Get only configured record for processing */
- if (messageConsumerProperties.pollRecords > 0) {
- configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
- }
- // TODO("Security Implementation based on type")
+ val configProperties = messageConsumerProperties.getConfig()
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
/** Create Kafka consumer */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 8416282af..931f052ed 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -20,28 +20,20 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import org.apache.commons.lang.builder.ToStringBuilder
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.serialization.StringSerializer
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
import java.nio.charset.Charset
-class KafkaBasicAuthMessageProducerService(
- private val messageProducerProperties: KafkaBasicAuthMessageProducerProperties
+class KafkaMessageProducerService(
+ private val messageProducerProperties: MessageProducerProperties
) :
BlueprintMessageProducerService {
- private val log = LoggerFactory.getLogger(KafkaBasicAuthMessageProducerService::class.java)!!
+ private val log = LoggerFactory.getLogger(KafkaMessageProducerService::class.java)!!
private var kafkaProducer: KafkaProducer<String, ByteArray>? = null
@@ -81,26 +73,16 @@ class KafkaBasicAuthMessageProducerService(
}
fun messageTemplate(additionalConfig: Map<String, ByteArray>? = null): KafkaProducer<String, ByteArray> {
- log.trace("Client Properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
- val configProps = hashMapOf<String, Any>()
- configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
- configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
- configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
- configProps[ACKS_CONFIG] = messageProducerProperties.acks
- configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence
- if (messageProducerProperties.clientId != null) {
- configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
- }
- // TODO("Security Implementation based on type")
+ log.trace("Producer client properties : ${ToStringBuilder.reflectionToString(messageProducerProperties)}")
+ val configProps = messageProducerProperties.getConfig()
- // Add additional Properties
- if (additionalConfig != null) {
+ /** Add additional Properties */
+ if (additionalConfig != null)
configProps.putAll(additionalConfig)
- }
- if (kafkaProducer == null) {
+ if (kafkaProducer == null)
kafkaProducer = KafkaProducer(configProps)
- }
+
return kafkaProducer!!
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
index 0b353d58b..60f2dfa05 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
@@ -17,27 +17,22 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
-import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.streams.KafkaStreams
-import org.apache.kafka.streams.StreamsConfig
-import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import java.util.Properties
-open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties) :
+open class KafkaStreamsConsumerService(private val messageConsumerProperties: MessageConsumerProperties) :
BlueprintMessageConsumerService {
- val log = logger(KafkaStreamsBasicAuthConsumerService::class)
+ val log = logger(KafkaStreamsConsumerService::class)
lateinit var kafkaStreams: KafkaStreams
private fun streamsConfig(additionalConfig: Map<String, Any>? = null): Properties {
val configProperties = Properties()
- configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId
- configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
- configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
- configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee
- // TODO("Security Implementation based on type")
+ /** set consumer properties */
+ messageConsumerProperties.getConfig().let { configProperties.putAll(it) }
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
/** Create Kafka consumer */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
index b10e1023b..612a57d23 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
@@ -27,17 +27,27 @@ import kotlin.test.assertNotNull
class MessagePropertiesDSLTest {
@Test
- fun testMessageProducerDSL() {
+ fun testScramSslMessageProducerDSL() {
val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
topologyTemplate {
- relationshipTemplateMessageProducer("sample-basic-auth", "Message Producer") {
- kafkaBasicAuth {
+ relationshipTemplateMessageProducer("sample-scram-ssl-auth", "Message Producer") {
+ kafkaScramSslAuth {
bootstrapServers("sample-bootstrapServers")
clientId("sample-client-id")
acks("all")
retries(3)
enableIdempotence(true)
topic("sample-topic")
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
}
@@ -50,27 +60,27 @@ class MessagePropertiesDSLTest {
val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
assertNotNull(relationshipTemplates, "failed to get relationship templates")
assertEquals(1, relationshipTemplates.size, "relationshipTemplates doesn't match")
- assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
+ assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth")
val relationshipTypes = serviceTemplate.relationshipTypes
assertNotNull(relationshipTypes, "failed to get relationship types")
assertEquals(2, relationshipTypes.size, "relationshipTypes doesn't match")
assertNotNull(
- relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
- "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
+ relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO],
+ "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO}"
)
assertNotNull(
- relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
- "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
+ relationshipTypes[BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER],
+ "failed to get ${BluePrintConstants.MODEL_TYPE_RELATIONSHIPS_CONNECTS_TO_MESSAGE_PRODUCER}"
)
}
@Test
- fun testMessageConsumerDSL() {
+ fun testScramSslAuthMessageConsumerDSL() {
val serviceTemplate = serviceTemplate("message-properties-test", "1.0.0", "xxx.@xx.com", "message") {
topologyTemplate {
- relationshipTemplateMessageConsumer("sample-basic-auth", "Message Consumer") {
- kafkaBasicAuth {
+ relationshipTemplateMessageConsumer("sample-scram-ssl-auth", "Message Consumer") {
+ kafkaScramSslAuth {
bootstrapServers("sample-bootstrapServers")
clientId("sample-client-id")
groupId("sample-group-id")
@@ -79,15 +89,35 @@ class MessagePropertiesDSLTest {
autoOffsetReset("latest")
pollMillSec(5000)
pollRecords(20)
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
- relationshipTemplateMessageConsumer("sample-stream-basic-auth", "Message Consumer") {
- kafkaStreamsBasicAuth {
+ relationshipTemplateMessageConsumer("sample-stream-scram-ssl-auth", "Message Consumer") {
+ kafkaStreamsScramSslAuth {
bootstrapServers("sample-bootstrapServers")
applicationId("sample-application-id")
autoOffsetReset("latest")
processingGuarantee(StreamsConfig.EXACTLY_ONCE)
topic("sample-streaming-topic")
+ truststore("/path/to/truststore.jks")
+ truststorePassword("secretpassword")
+ truststoreType("JKS")
+ keystore("/path/to/keystore.jks")
+ keystorePassword("secretpassword")
+ keystoreType("JKS")
+ sslEndpointIdentificationAlgorithm("")
+ saslMechanism("SCRAM-SHA-512")
+ scramUsername("sample-user")
+ scramPassword("secretpassword")
}
}
}
@@ -100,8 +130,8 @@ class MessagePropertiesDSLTest {
val relationshipTemplates = serviceTemplate.topologyTemplate?.relationshipTemplates
assertNotNull(relationshipTemplates, "failed to get relationship templates")
assertEquals(2, relationshipTemplates.size, "relationshipTemplates doesn't match")
- assertNotNull(relationshipTemplates["sample-basic-auth"], "failed to get sample-basic-auth")
- assertNotNull(relationshipTemplates["sample-stream-basic-auth"], "failed to get sample-stream-basic-auth")
+ assertNotNull(relationshipTemplates["sample-scram-ssl-auth"], "failed to get sample-scram-ssl-auth")
+ assertNotNull(relationshipTemplates["sample-stream-scram-ssl-auth"], "failed to get sample-stream-scram-ssl-auth")
val relationshipTypes = serviceTemplate.relationshipTypes
assertNotNull(relationshipTypes, "failed to get relationship types")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 823ba7dee..ac08dc7b7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -23,24 +23,35 @@ import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.MockConsumer
import org.apache.kafka.clients.consumer.OffsetResetStrategy
+import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.Test
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@@ -52,18 +63,30 @@ import kotlin.test.assertTrue
)
@TestPropertySource(
properties =
- ["blueprintsprocessor.messageconsumer.sample.type=kafka-basic-auth",
+ ["blueprintsprocessor.messageconsumer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageconsumer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageconsumer.sample.groupId=sample-group",
"blueprintsprocessor.messageconsumer.sample.topic=default-topic",
"blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
"blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
"blueprintsprocessor.messageconsumer.sample.pollRecords=1",
+ "blueprintsprocessor.messageconsumer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageconsumer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageconsumer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageconsumer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageconsumer.sample.scramPassword=secretpassword",
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
]
)
open class BlueprintMessageConsumerServiceTest {
@@ -77,7 +100,7 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaBasicAuthConsumerService() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -124,7 +147,7 @@ open class BlueprintMessageConsumerServiceTest {
fun testKafkaBasicAuthConsumerWithDynamicFunction() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val spyBlueprintMessageConsumerService = spyk(blueprintMessageConsumerService, recordPrivateCalls = true)
@@ -173,12 +196,60 @@ open class BlueprintMessageConsumerServiceTest {
}
}
+ @Test
+ fun testKafkaScramSslAuthConfig() {
+
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ConsumerConfig.GROUP_ID_CONFIG to "sample-group",
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+ ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+ SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+ SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"sample-user\" " +
+ "password=\"secretpassword\";"
+ )
+
+ val messageConsumerProperties = bluePrintMessageLibPropertyService
+ .messageConsumerProperties("${MessageLibConstants.PROPERTY_MESSAGE_CONSUMER_PREFIX}sample")
+
+ val configProps = messageConsumerProperties.getConfig()
+
+ assertEquals(messageConsumerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(messageConsumerProperties.type,
+ "kafka-scram-ssl-auth",
+ "Authentication type doesn't match the expected value")
+
+ expectedConfig.forEach {
+ assertTrue(configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}")
+ assertEquals(configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
+
/** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
// @Test
fun testKafkaIntegration() {
runBlocking {
val blueprintMessageConsumerService = bluePrintMessageLibPropertyService
- .blueprintMessageConsumerService("sample") as KafkaBasicAuthMessageConsumerService
+ .blueprintMessageConsumerService("sample") as KafkaMessageConsumerService
assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageConsumerService")
val channel = blueprintMessageConsumerService.subscribe(null)
@@ -190,7 +261,7 @@ open class BlueprintMessageConsumerServiceTest {
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
launch {
repeat(5) {
delay(100)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index b824189d2..72a47ed56 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -20,12 +20,22 @@ import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.RecordMetadata
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.config.SslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.ScramLoginModule
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.apache.kafka.common.serialization.StringSerializer
import org.junit.runner.RunWith
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
@@ -33,6 +43,7 @@ import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
import java.util.concurrent.Future
import kotlin.test.Test
+import kotlin.test.assertEquals
import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@@ -43,10 +54,16 @@ import kotlin.test.assertTrue
)
@TestPropertySource(
properties =
- ["blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ ["blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-topic",
- "blueprintsprocessor.messageproducer.sample.clientId=default-client-id"
+ "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.keystore=/path/to/keystore.jks",
+ "blueprintsprocessor.messageproducer.sample.keystorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword"
]
)
open class BlueprintMessageProducerServiceTest {
@@ -55,10 +72,10 @@ open class BlueprintMessageProducerServiceTest {
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@Test
- fun testKafkaBasicAuthProducertService() {
+ fun testKafkaScramSslAuthProducerService() {
runBlocking {
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
val mockKafkaTemplate = mockk<KafkaProducer<String, ByteArray>>()
@@ -75,4 +92,51 @@ open class BlueprintMessageProducerServiceTest {
assertTrue(response, "failed to get command response")
}
}
+
+ @Test
+ fun testKafkaScramSslAuthConfig() {
+ val expectedConfig = mapOf<String, Any>(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
+ ProducerConfig.ACKS_CONFIG to "all",
+ ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
+ ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
+ CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),
+ SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG to "/path/to/truststore.jks",
+ SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
+ SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
+ SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+ SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
+ SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
+ "username=\"sample-user\" " +
+ "password=\"secretpassword\";"
+ )
+
+ val messageProducerProperties = bluePrintMessageLibPropertyService
+ .messageProducerProperties("${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}sample")
+
+ val configProps = messageProducerProperties.getConfig()
+
+ assertEquals(messageProducerProperties.topic,
+ "default-topic",
+ "Topic doesn't match the expected value"
+ )
+ assertEquals(messageProducerProperties.type,
+ "kafka-scram-ssl-auth",
+ "Authentication type doesn't match the expected value")
+
+ expectedConfig.forEach {
+ assertTrue(configProps.containsKey(it.key),
+ "Missing expected kafka config key : ${it.key}"
+ )
+ assertEquals(configProps[it.key],
+ it.value,
+ "Unexpected value for ${it.key} got ${configProps[it.key]} instead of ${it.value}"
+ )
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index 1657d70b4..c30ab9b02 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -47,19 +47,27 @@ import kotlin.test.assertNotNull
@TestPropertySource(
properties =
[
- "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+ "blueprintsprocessor.messageproducer.sample.type=kafka-scram-ssl-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
"blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+ "blueprintsprocessor.messageproducer.sample.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.sample.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.sample.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.sample.scramPassword=secretpassword",
- "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+ "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-scram-ssl-auth",
"blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
"blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
- "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+ "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic",
+ "blueprintsprocessor.messageproducer.stream-consumer.truststore=/path/to/truststore.jks",
+ "blueprintsprocessor.messageproducer.stream-consumer.truststorePassword=secretpassword",
+ "blueprintsprocessor.messageproducer.stream-consumer.scramUsername=sample-user",
+ "blueprintsprocessor.messageproducer.stream-consumer.scramPassword=secretpassword"
]
)
-class KafkaStreamsBasicAuthConsumerServiceTest {
+class KafkaStreamsConsumerServiceTest {
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
@@ -117,7 +125,7 @@ class KafkaStreamsBasicAuthConsumerServiceTest {
/** Send message with every 1 sec */
val blueprintMessageProducerService = bluePrintMessageLibPropertyService
- .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+ .blueprintMessageProducerService("sample") as KafkaMessageProducerService
launch {
repeat(5) {
delay(1000)
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
index fb2189ffb..77b61a421 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties
@@ -39,20 +39,31 @@ blueprints.processor.functions.python.executor.modulePaths=./../../../../compone
# Kafka-message-lib Configurations
blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false
-blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth
+blueprintsprocessor.messageconsumer.self-service-api.type=kafka-scram-ssl-auth
blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092
blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id
blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t
blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id
blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000
+### Security settings
+### SSL
+blueprintsprocessor.messageconsumer.self-service-api.truststore=src/test/resources/test.truststore.jks
+blueprintsprocessor.messageconsumer.self-service-api.truststorePassword=secretpassword
+blueprintsprocessor.messageconsumer.self-service-api.keystore=src/test/resources/test.keystore.jks
+blueprintsprocessor.messageconsumer.self-service-api.keystorePassword=secretpassword
+### SCRAM
+blueprintsprocessor.messageconsumer.self-service-api.scramUsername=test-user
+blueprintsprocessor.messageconsumer.self-service-api.scramPassword=testUserPassword
# Kafka audit service Configurations
+## Audit request
blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false
blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth
blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092
blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id
blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t
+## Audit response
blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth
blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092
blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.keystore.jks b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.keystore.jks
new file mode 100644
index 000000000..1a4150952
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.keystore.jks
Binary files differ
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.truststore.jks b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.truststore.jks
new file mode 100644
index 000000000..b094a1f8a
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/test.truststore.jks
Binary files differ