summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authory.busko <y.busko@partner.samsung.com>2021-04-14 16:13:09 +0200
committerk.kedron <k.kedron@partner.samsung.com>2021-04-16 17:43:35 +0200
commit8f21c92d8104ea9bcaa49998cfb0787514a6ddc9 (patch)
treed6c30e8ee411ac1cdcbab8d303400eb895ace35d
parent16c9954de0a4fdcb5324859b21ac928169792037 (diff)
Initial code for DataCollector RAPP
Updated pom files Added swagger templates at /doc/templates Change-Id: I2fb7b26993fa13f07c3cf013aec9df06bd2dca8b Issue-ID: INT-1887 Signed-off-by: Krystian Kedron <k.kedron@partner.samsung.com> Signed-off-by: yauheni busko <y.busko3@partner.samsung.com>
-rw-r--r--.gitignore5
-rw-r--r--datacollector/README.md214
-rw-r--r--datacollector/doc/swagger/templates/markdown.hbs108
-rw-r--r--datacollector/doc/swagger/templates/operation.hbs73
-rw-r--r--datacollector/doc/swagger/templates/security.hbs88
-rw-r--r--datacollector/doc/swagger/templates/strapdown.html.hbs10
-rw-r--r--datacollector/docker/.maven-dockerinclude1
-rw-r--r--datacollector/docker/Dockerfile14
-rw-r--r--datacollector/docker/init.sql61
-rw-r--r--datacollector/docker/start.sh10
-rw-r--r--datacollector/pom.xml131
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/DataCollectorApplication.java33
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/controller/PMController.java77
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/controller/UEController.java48
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/DataAggregationInfo.java25
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/UEInfo.java28
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/AggregatedPM.java29
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PMData.java29
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PerformanceData.java28
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValues.java48
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurements.java84
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java66
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/Event.java62
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/EventAPI.java46
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java48
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/RawPayload.java57
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/DataAggregationService.java123
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/DmaapRestReader.java21
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/MariadbDialectResolver.java46
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java90
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepository.java24
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepositoryAPI.java31
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/UEHolder.java32
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java21
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java99
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java35
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java85
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java99
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DatabaseProperties.java30
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java33
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java165
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/SwaggerConfig.java50
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/WebConfig.java27
-rw-r--r--datacollector/src/main/resources/META-INF/spring.factories1
-rw-r--r--datacollector/src/main/resources/application.yml27
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/controller/PMControllerTest.java303
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValuesTest.java29
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementsTest.java53
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeaderTest.java89
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/EventTest.java62
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/MeasurementFieldsTest.java54
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/service/DataAggregationServiceTest.java133
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java57
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/service/VesParserImplTest.java56
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/service/VesPersisterSqlImplTest.java47
-rw-r--r--datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java113
-rw-r--r--datacollector/src/test/resources/sample-pm.json53
-rw-r--r--datacollector/src/test/resources/sample-ves.json36
-rw-r--r--pom.xml90
59 files changed, 3636 insertions, 1 deletions
diff --git a/.gitignore b/.gitignore
index 3a61301..281b36d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -106,4 +106,7 @@ local.properties
#.project
rapp.iml
datacollector.iml
-sleepingcelldetector.iml \ No newline at end of file
+sleepingcelldetector.iml
+
+datacollector/doc/swagger/html/*
+datacollector/doc/swagger/*json \ No newline at end of file
diff --git a/datacollector/README.md b/datacollector/README.md
new file mode 100644
index 0000000..ecd7878
--- /dev/null
+++ b/datacollector/README.md
@@ -0,0 +1,214 @@
+# DataCollector R-APP
+
+DataCollector R-APP is a Java SpringBoot application for consuming PM VES data from DMaaP and store it into DB. Also
+DataCollector R-APP provides API to query aggregated metrics from PM VES data and User Equipment info.
+
+## Use of DataCollector R-APP
+
+### Configuration
+
+DataCollector R-APP needs several parameters to be defined before start. All parameters are passed through environment
+variables
+
+To customize DB connection you need to create database schema. To do this use `init.sql` script under **src/main/docker/**.
+Additionally, you can use `start.sh` after you build DataCollector R-APP (please see **Build DataCollector R-APP** chapter). Those actions will create database schema and run the application.
+Also, you will need to set the following environment variables:
+
+- DATABASE_URL
+- DATABASE_USERNAME
+- DATABASE_PASSWORD
+
+To customize VES connectivity you need to set the following:
+
+- DMAAP_HOST
+- DMAAP_POR
+- DMAAP_MEASUREMENTS_TOPIC
+
+Example configuration in environment variables in application.yml:
+
+```
+server:
+ port: 8087
+dmaap:
+ prtocol: "http"
+ host: "localhost"
+ port: 8181
+ measurements-topic: "measurements"
+database:
+ host: mariadb-host
+ port: 3306
+ name: "ves"
+ username: ves
+ driver-class-name: "org.mariadb.jdbc.Driver"
+logging:
+ level:
+ org:
+ springframework: DEBUG
+ logging.file.name: logs/rapp-datacollector.log
+ pattern:
+ console: "%d %-5level %logger : %msg%n"
+ file: "%d %-5level [%thread] %logger : %msg%n"
+spring:
+ autoconfigure:
+ exclude:
+ - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
+ main:
+ allow-bean-definition-overriding: true
+
+```
+
+After startup DataCollector R-APP is ready to read VES Events from DMaaP and stores them in database if they are flowing
+into DMaaP.
+
+VES Event example
+
+```json
+{
+ "event": {
+ "commonEventHeader": {
+ "version": "4.0.1",
+ "vesEventListenerVersion": "7.0.1",
+ "sourceId": "de305d54-75b4-431b-adb2-eb6b9e546014",
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "startEpochMicrosec": 1603293000000000,
+ "eventId": "measurement0000259",
+ "lastEpochMicrosec": 1603292917149000,
+ "priority": "Normal",
+ "sequence": 3,
+ "sourceName": "ibcx0001vm002ssc001",
+ "domain": "measurement",
+ "eventName": "Measurement_vIsbcMmc",
+ "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234",
+ "nfcNamingCode": "ssc",
+ "nfNamingCode": "ibcx"
+ },
+ "measurementFields": {
+ "measurementInterval": 5,
+ "measurementFieldsVersion": "4.0",
+ "additionalMeasurements": [
+ {
+ "name": "latency",
+ "hashMap": {
+ "value": "86"
+ }
+ },
+ {
+ "name": "throughput",
+ "hashMap": {
+ "value": "25"
+ }
+ },
+ {
+ "name": "identifier",
+ "hashMap": {
+ "value": "Cell1"
+ }
+ },
+ {
+ "name": "trafficModel",
+ "hashMap": {
+ "mobile_samsung_s10_01": "10"
+ }
+ }
+ ]
+ }
+ }
+}
+```
+
+Information about performance is situated in "additionalMeasurements" section, consisting of latency and throughput
+parameters of performance, identifier of event producer and trafficModel with user equipment information. DataCollector
+R-APP provides two endpoints to access data of stored VES Events:
+
+1. `{datacollectorhost}/v1/pm/ues`
+
+Returns list of user equipments from VES Events stored into database:
+
+```json
+{
+ "ues": [
+ "mobile_samsung_s10_01"
+ ]
+}
+```
+
+2. `{datacollectorhost}/v1/pm/events/aggregatedmetrics`
+
+To call the endpoint you have to define 3 query parameters:
+
+1.Slot - aggregation period (in seconds) for which an average performance metrics are calculated
+
+2.Count - number of aggregated performance metrics that should be returned by the method, one aggergated performance
+metric per each slot. The first performance metrics is average metrics for (startTime, startTime +slot)
+
+3.startTime - ISO 8601 time format as string (e.g., 2020-10-26T06:52:54.01+00:00) for which aggregated performance
+metrics are calculated with the pm ves data starting from startTime. "+" and "." signs must be properly encoded in url
+
+Example
+URL: `{{datacollectorhost}}/v1/pm/events/aggregatedmetrics?slot=10&count=12&startTime=2021-03-31T15%3A00%3A00.0Z`
+This request will return aggregated metrics from two minutes starting from 2021.03.31-15:30:00.0
+
+Aggregated Metrics response example:
+
+```json
+{
+ "pm": [
+ {
+ "cellId": "Cell1",
+ "performance": [
+ {
+ "latency": 50,
+ "throughput": 80
+ },
+ {
+ "latency": 50,
+ "throughput": 80
+ },
+ {
+ "latency": 50,
+ "throughput": 80
+ }
+ ]
+ }
+ ],
+ "itemsLength": 1
+}
+```
+
+# API
+
+The API is documented by the Swagger tool.
+
+## Swagger
+
+JSON file that can be imported to Swagger GUI can be found in *doc/swagger*. Those files are regenerated in each maven
+build, so to generate this file please see **Build DataCollector R-APP** chapter.
+
+# Developer Guide
+
+## Build DataCollector R-APP
+
+Following mvn command (in the current directory) will build DataCollector R-APP:
+
+```bash
+mvn clean install
+```
+
+## Run DataCollector R-APP
+
+Following command will run DataCollector R-APP:
+
+```bash
+java -jar datacollector-0.0.1-SNAPSHOT.jar org.onap.rapp.DataCollectorApplication
+```
+
+## Logging
+
+The log file will be created in the /log path. Parameters of logging are in application.yml file.
+After DataCollector R-APP starts successfully, log/rapp-datacollector.log should start to contain the logs:
+
+```
+.
+└──log
+ └── rapp-datacollector.log
+```
diff --git a/datacollector/doc/swagger/templates/markdown.hbs b/datacollector/doc/swagger/templates/markdown.hbs
new file mode 100644
index 0000000..546f673
--- /dev/null
+++ b/datacollector/doc/swagger/templates/markdown.hbs
@@ -0,0 +1,108 @@
+#{{#info}}{{title}}
+
+
+## {{join schemes " | "}}://{{host}}{{basePath}}
+
+
+{{description}}
+
+{{#contact}}
+[**Contact the developer**](mailto:{{email}})
+{{/contact}}
+
+**Version** {{version}}
+
+[**Terms of Service**]({{termsOfService}})
+
+{{#license}}[**{{name}}**]({{url}}){{/license}}
+
+{{/info}}
+
+{{#if consumes}}**Consumes:** {{join consumes ", "}}{{/if}}
+
+{{#if produces}}**Produces:** {{join produces ", "}}{{/if}}
+
+{{#if securityDefinitions}}
+# Security Definitions
+{{/if}}
+{{> security}}
+
+# APIs
+
+{{#each paths}}
+## {{@key}}
+{{#this}}
+{{#get}}
+### GET
+{{> operation}}
+{{/get}}
+
+{{#put}}
+### PUT
+{{> operation}}
+{{/put}}
+
+{{#post}}
+### POST
+
+{{> operation}}
+
+{{/post}}
+
+{{#delete}}
+### DELETE
+{{> operation}}
+{{/delete}}
+
+{{#option}}
+### OPTION
+{{> operation}}
+{{/option}}
+
+{{#patch}}
+### PATCH
+{{> operation}}
+{{/patch}}
+
+{{#head}}
+### HEAD
+{{> operation}}
+{{/head}}
+
+{{/this}}
+{{/each}}
+
+# Definitions
+{{#each definitions}}
+## <a name="/definitions/{{key}}">{{@key}}</a>
+
+<table border="1">
+ <tr>
+ <th>name</th>
+ <th>type</th>
+ <th>required</th>
+ <th>description</th>
+ <th>example</th>
+ </tr>
+ {{#each this.properties}}
+ <tr>
+ <td>{{@key}}</td>
+ <td>
+ {{#ifeq type "array"}}
+ {{#items.$ref}}
+ {{type}}[<a href="{{items.$ref}}">{{basename items.$ref}}</a>]
+ {{/items.$ref}}
+ {{^items.$ref}}{{type}}[{{items.type}}]{{/items.$ref}}
+ {{else}}
+ {{#$ref}}<a href="{{$ref}}">{{basename $ref}}</a>{{/$ref}}
+ {{^$ref}}{{type}}{{#format}} ({{format}}){{/format}}{{/$ref}}
+ {{/ifeq}}
+ </td>
+ <td>{{#required}}required{{/required}}{{^required}}optional{{/required}}</td>
+ <td>{{#description}}{{{description}}}{{/description}}{{^description}}-{{/description}}</td>
+ <td>{{example}}</td>
+ </tr>
+ {{/each}}
+</table>
+{{/each}}
+
diff --git a/datacollector/doc/swagger/templates/operation.hbs b/datacollector/doc/swagger/templates/operation.hbs
new file mode 100644
index 0000000..a581961
--- /dev/null
+++ b/datacollector/doc/swagger/templates/operation.hbs
@@ -0,0 +1,73 @@
+{{#deprecated}}-deprecated-{{/deprecated}}
+<a id="{{operationId}}">{{summary}}</a>
+
+{{description}}
+
+{{#if externalDocs.url}}{{externalDocs.description}}. [See external documents for more details]({{externalDocs.url}})
+{{/if}}
+
+{{#if security}}
+#### Security
+{{/if}}
+
+{{#security}}
+{{#each this}}
+* {{@key}}
+{{#this}} * {{this}}
+{{/this}}
+{{/each}}
+{{/security}}
+
+#### Request
+
+{{#if consumes}}
+**Content-Type: ** {{join consumes ", "}}{{/if}}
+
+##### Parameters
+{{#if parameters}}
+<table border="1">
+ <tr>
+ <th>Name</th>
+ <th>Located in</th>
+ <th>Required</th>
+ <th>Description</th>
+ <th>Default</th>
+ <th>Schema</th>
+ </tr>
+{{/if}}
+
+{{#parameters}}
+<tr>
+ <th>{{name}}</th>
+ <td>{{in}}</td>
+ <td>{{#if required}}yes{{else}}no{{/if}}</td>
+ <td>{{description}}{{#if pattern}} (**Pattern**: `{{pattern}}`){{/if}}</td>
+ <td> - </td>
+{{#ifeq in "body"}}
+ <td>
+ {{#ifeq schema.type "array"}}Array[<a href="{{schema.items.$ref}}">{{basename schema.items.$ref}}</a>]{{/ifeq}}
+ {{#schema.$ref}}<a href="{{schema.$ref}}">{{basename schema.$ref}}</a> {{/schema.$ref}}
+ </td>
+{{else}}
+ {{#ifeq type "array"}}
+ <td>Array[{{items.type}}] ({{collectionFormat}})</td>
+ {{else}}
+ <td>{{type}} {{#format}}({{format}}){{/format}}</td>
+ {{/ifeq}}
+{{/ifeq}}
+</tr>
+{{/parameters}}
+{{#if parameters}}
+</table>
+{{/if}}
+
+
+#### Response
+
+{{#if produces}}**Content-Type: ** {{join produces ", "}}{{/if}}
+
+
+| Status Code | Reason | Response Model |
+|-------------|-------------|----------------|
+{{#each responses}}| {{@key}} | {{description}} | {{#schema.$ref}}<a href="{{schema.$ref}}">{{basename schema.$ref}}</a>{{/schema.$ref}}{{#ifeq schema.type "array"}}Array[<a href="{{schema.items.$ref}}">{{basename schema.items.$ref}}</a>]{{/ifeq}}{{^schema}} - {{/schema}}|
+{{/each}}
diff --git a/datacollector/doc/swagger/templates/security.hbs b/datacollector/doc/swagger/templates/security.hbs
new file mode 100644
index 0000000..04f86e8
--- /dev/null
+++ b/datacollector/doc/swagger/templates/security.hbs
@@ -0,0 +1,88 @@
+{{#each securityDefinitions}}
+### {{@key}}
+{{#this}}
+{{#ifeq type "oauth2"}}
+<table>
+ <tr>
+ <th>type</th>
+ <th colspan="2">{{type}}</th>
+ </tr>
+{{#if description}}
+ <tr>
+ <th>description</th>
+ <th colspan="2">{{description}}</th>
+ </tr>
+{{/if}}
+{{#if authorizationUrl}}
+ <tr>
+ <th>authorizationUrl</th>
+ <th colspan="2">{{authorizationUrl}}</th>
+ </tr>
+{{/if}}
+{{#if flow}}
+ <tr>
+ <th>flow</th>
+ <th colspan="2">{{flow}}</th>
+ </tr>
+{{/if}}
+{{#if tokenUrl}}
+ <tr>
+ <th>tokenUrl</th>
+ <th colspan="2">{{tokenUrl}}</th>
+ </tr>
+{{/if}}
+{{#if scopes}}
+ <tr>
+ <td rowspan="3">scopes</td>
+{{#each scopes}}
+ <td>{{@key}}</td>
+ <td>{{this}}</td>
+ </tr>
+ <tr>
+{{/each}}
+ </tr>
+{{/if}}
+</table>
+{{/ifeq}}
+{{#ifeq type "apiKey"}}
+<table>
+ <tr>
+ <th>type</th>
+ <th colspan="2">{{type}}</th>
+ </tr>
+{{#if description}}
+ <tr>
+ <th>description</th>
+ <th colspan="2">{{description}}</th>
+ </tr>
+{{/if}}
+{{#if name}}
+ <tr>
+ <th>name</th>
+ <th colspan="2">{{name}}</th>
+ </tr>
+{{/if}}
+{{#if in}}
+ <tr>
+ <th>in</th>
+ <th colspan="2">{{in}}</th>
+ </tr>
+{{/if}}
+</table>
+{{/ifeq}}
+{{#ifeq type "basic"}}
+<table>
+ <tr>
+ <th>type</th>
+ <th colspan="2">{{type}}</th>
+ </tr>
+{{#if description}}
+ <tr>
+ <th>description</th>
+ <th colspan="2">{{description}}</th>
+ </tr>
+{{/if}}
+</table>
+{{/ifeq}}
+{{/this}}
+{{/each}} \ No newline at end of file
diff --git a/datacollector/doc/swagger/templates/strapdown.html.hbs b/datacollector/doc/swagger/templates/strapdown.html.hbs
new file mode 100644
index 0000000..ec02669
--- /dev/null
+++ b/datacollector/doc/swagger/templates/strapdown.html.hbs
@@ -0,0 +1,10 @@
+<!DOCTYPE html>
+<html>
+<title>API Document</title>
+
+<xmp theme="united" style="display:none;">
+{{>markdown}}
+</xmp>
+
+<script src="http://strapdownjs.com/v/0.2/strapdown.js"></script>
+</html> \ No newline at end of file
diff --git a/datacollector/docker/.maven-dockerinclude b/datacollector/docker/.maven-dockerinclude
new file mode 100644
index 0000000..fd6cecd
--- /dev/null
+++ b/datacollector/docker/.maven-dockerinclude
@@ -0,0 +1 @@
+target/*.jar
diff --git a/datacollector/docker/Dockerfile b/datacollector/docker/Dockerfile
new file mode 100644
index 0000000..251673d
--- /dev/null
+++ b/datacollector/docker/Dockerfile
@@ -0,0 +1,14 @@
+FROM nexus3.onap.org:10001/onap/integration-java11:8.0.0
+ARG VERSION=""
+
+USER root
+
+RUN apk add --no-cache mysql-client
+
+USER onap
+
+EXPOSE 8087
+
+COPY *-${VERSION}.jar /app/service.jar
+COPY init.sql start.sh /app/
+ENTRYPOINT ["/bin/sh", "/app/start.sh"]
diff --git a/datacollector/docker/init.sql b/datacollector/docker/init.sql
new file mode 100644
index 0000000..0022f8a
--- /dev/null
+++ b/datacollector/docker/init.sql
@@ -0,0 +1,61 @@
+create database if not exists ves;
+use ves;
+
+create table if not exists ves_measurement (
+ -- our id
+ id INTEGER NOT NULL AUTO_INCREMENT,
+ -- common header
+ event_type TEXT,
+ version TEXT,
+ source_id TEXT,
+ reporting_entity_name TEXT,
+ start_epoch_microsec BIGINT,
+ event_id TEXT,
+ last_epoch_microsec BIGINT,
+ priority TEXT,
+ sequence INT,
+ source_name TEXT,
+ domain TEXT,
+ event_name TEXT,
+ reporting_entity_id TEXT,
+ nfc_naming_code TEXT,
+ nf_naming_code TEXT,
+ time_zone_offset TEXT,
+ rawdata TEXT NOT NULL,
+ CONSTRAINT ves_measurement_pk PRIMARY KEY(id)
+
+);
+
+create table if not exists ves_measurement_fields (
+ event_id INTEGER NOT NULL,
+ measurement_interval LONG,
+ measurement_fields_version VARCHAR(32),
+ CONSTRAINT ves_measurement_fields_pk PRIMARY KEY (event_id),
+ CONSTRAINT ves_measurement_fields_fk1 FOREIGN KEY (event_id) REFERENCES ves_measurement(id) ON UPDATE CASCADE ON DELETE CASCADE
+);
+
+
+create table if not exists additional_measurement (
+ event_id INTEGER NOT NULL,
+ am_name VARCHAR(128) NOT NULL,
+ ves_measurement_fields_key INTEGER,
+ CONSTRAINT additional_measurement_pk PRIMARY KEY(event_id, am_name),
+ CONSTRAINT additional_measurement_fk1 FOREIGN KEY(event_id) REFERENCES ves_measurement(id) ON UPDATE CASCADE ON DELETE CASCADE
+);
+
+create table if not exists additional_measurement_value (
+ event INTEGER NOT NULL,
+ am_name VARCHAR(128) NOT NULL,
+ additional_measurement_key INTEGER,
+ ves_measurement_fields_key INTEGER,
+ am_key TEXT NOT NULL,
+ am_value TEXT,
+ CONSTRAINT additional_measurement_value_fk1 FOREIGN KEY(event, am_name) REFERENCES additional_measurement(event_id, am_name) ON UPDATE CASCADE ON DELETE CASCADE
+);
+
+-- to store raw payload i.e. without any parsing
+create table if not exists payload (
+ event_id INTEGER NOT NULL,
+ payload TEXT NOT NULL
+);
+
diff --git a/datacollector/docker/start.sh b/datacollector/docker/start.sh
new file mode 100644
index 0000000..94ae32e
--- /dev/null
+++ b/datacollector/docker/start.sh
@@ -0,0 +1,10 @@
+#!/usr/bin/env bash
+
+# Initialize needed databases and tables if not already exist
+while ! mysqladmin ping -u$DATABASE_USERNAME -p$DATABASE_PASSWORD -h$DATABASE_HOST --silent; do
+ echo "$DATABASE_HOST not up, wait"
+ sleep 1
+done
+mysql -u$DATABASE_USERNAME -p$DATABASE_PASSWORD -h$DATABASE_HOST < init.sql
+
+exec java $JAVA_SEC_OPTS $JAVA_OPTS -jar /app/service.jar
diff --git a/datacollector/pom.xml b/datacollector/pom.xml
new file mode 100644
index 0000000..141d212
--- /dev/null
+++ b/datacollector/pom.xml
@@ -0,0 +1,131 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (C) 2021 Samsung Electronics
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License
+ -->
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>datacollector</artifactId>
+ <groupId>org.onap.rapp.datacollector</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <parent>
+ <groupId>org.onap.rapp</groupId>
+ <artifactId>rapp</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <docker-image.name.prefix>org.onap.rapp.datacollector</docker-image.name.prefix>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>20190722</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jdbc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-jpa</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>${mariadb.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger2</artifactId>
+ <version>${swagger.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.springfox</groupId>
+ <artifactId>springfox-swagger-ui</artifactId>
+ <version>${swagger.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>${javax.xml.bind.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.github.kongchen</groupId>
+ <artifactId>swagger-maven-plugin</artifactId>
+ <version>${swagger.maven.plugin.version}</version>
+ <configuration>
+ <apiSources>
+ <apiSource>
+ <springmvc>true</springmvc>
+ <locations>org.onap.rapp.datacollector</locations>
+ <schemes>http</schemes>
+ <host>localhost:8087</host>
+ <basePath>/</basePath>
+ <info>
+ <title>DataCollector's R-APP REST APIs (Policy Enforcement PoC)</title>
+ <version>${project.version}</version>
+ <description>DataCollector's R-APP REST APIs provides access to PM and RAN elements data</description>
+ <license>
+ <name>(C) Copyright Samsung</name>
+ </license>
+ </info>
+ <swaggerDirectory>${basedir}/doc/swagger/</swaggerDirectory>
+ <swaggerFileName>rapp-datacollector-spec</swaggerFileName>
+ <templatePath>${basedir}/doc/swagger/templates/strapdown.html.hbs</templatePath>
+ <outputPath>${basedir}/doc/swagger/html/rapp-datacollector-spec.html</outputPath>
+ </apiSource>
+ </apiSources>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
+
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/DataCollectorApplication.java b/datacollector/src/main/java/org/onap/rapp/datacollector/DataCollectorApplication.java
new file mode 100644
index 0000000..39f4f2d
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/DataCollectorApplication.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector;
+
+import org.onap.rapp.datacollector.service.configuration.DatabaseProperties;
+import org.onap.rapp.datacollector.service.configuration.DmaapProperties;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+
+@SpringBootApplication
+@EnableScheduling
+@EnableConfigurationProperties({DmaapProperties.class, DatabaseProperties.class})
+@EnableTransactionManagement
+public class DataCollectorApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(DataCollectorApplication.class);
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/controller/PMController.java b/datacollector/src/main/java/org/onap/rapp/datacollector/controller/PMController.java
new file mode 100644
index 0000000..e79ccb8
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/controller/PMController.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import org.onap.rapp.datacollector.entity.pm.AggregatedPM;
+import org.onap.rapp.datacollector.service.PMService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.server.ResponseStatusException;
+
+@Controller("pmController")
+@Api(tags = {"RESTful APIs for DataCollector (current is PM DataCollector) R-APP mS"})
+public class PMController {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final PMService pmService;
+
+ public PMController(PMService pmService) {
+ this.pmService = pmService;
+ }
+
+ @ApiOperation(value = "Get the latest aggregated pm ves events from database.",
+ notes = "Returns the latest aggregated pm ves events from database between "
+ + "startTime and now, together with the itemsLength "
+ + "(i.e., total items in the returned pm array, i.e., active cells count)",
+ httpMethod = "GET",
+ produces = "application/json",
+ response = AggregatedPM.class
+ )
+ @GetMapping(value = "/v1/pm/events/aggregatedmetrics")
+ public @ResponseBody
+ AggregatedPM retrievePMData(
+ @ApiParam(value = "aggregation period (in seconds) for which an average performance "
+ + "metrics are calculated", required = true) @RequestParam("slot") int slot,
+ @ApiParam(value = "number of aggregated performance metrics that should be returned by the method, "
+ + "one aggergated performance metric per each slot. The first performance metrics is avarage "
+ + "metrics for (startTime, startTime +slot)", required = true) @RequestParam("count") int count,
+ @ApiParam(value = "ISO 8601 time format as string (e.g., 2020-10-26T06:52:54.01+00:00) for which aggregated "
+ + "performance metrics are calculated with the pm ves data starting from startTime. "
+ + "\"+\" and \".\" signs must be properly encoded in url",
+ required = true) @RequestParam("startTime") String startTime) {
+ OffsetDateTime time = getOffsetDateTime(startTime);
+ logger.debug("Getting {} aggregated metrics for {} second slot, start time {}", count, slot, startTime);
+
+ return pmService.getAggregatedPMDataForTimeInterval(slot, count, time);
+ }
+
+ private OffsetDateTime getOffsetDateTime(String startTime) {
+ OffsetDateTime time = OffsetDateTime.parse(startTime);
+ if (time.toEpochSecond() > Instant.now().getEpochSecond()) {
+ throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Start time can't be from future.");
+ }
+ return time;
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/controller/UEController.java b/datacollector/src/main/java/org/onap/rapp/datacollector/controller/UEController.java
new file mode 100644
index 0000000..343de0d
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/controller/UEController.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.Set;
+import org.onap.rapp.datacollector.entity.UEInfo;
+import org.onap.rapp.datacollector.service.UEHolder;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+@Api(tags = {"RESTful APIs for DataCollector (current is PM DataCollector) R-APP mS"})
+public class UEController {
+
+ private final UEHolder holder;
+
+ public UEController(UEHolder holder) {
+ this.holder = holder;
+ }
+
+ @ApiOperation(value = "Get all user equipment from topology.",
+ notes = "Returns all user equipment from topology.",
+ httpMethod = "GET",
+ produces = "application/json",
+ response = UEInfo.class
+ )
+ @GetMapping(value = "/v1/pm/ues")
+ public @ResponseBody
+ UEInfo getUserEquipments() {
+ Set<String> ues = holder.getUes();
+ return new UEInfo(ues);
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/DataAggregationInfo.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/DataAggregationInfo.java
new file mode 100644
index 0000000..8f16d9f
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/DataAggregationInfo.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity;
+
+import lombok.Builder;
+import lombok.Getter;
+
+@Builder
+@Getter
+public class DataAggregationInfo {
+ int slot;
+ long startTime;
+ long endTime;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/UEInfo.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/UEInfo.java
new file mode 100644
index 0000000..0b2c48a
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/UEInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity;
+
+import java.util.Set;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class UEInfo {
+ Set<String> ues;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/AggregatedPM.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/AggregatedPM.java
new file mode 100644
index 0000000..840a6c6
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/AggregatedPM.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.pm;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class AggregatedPM {
+ List<PMData> pm;
+ int itemsLength;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PMData.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PMData.java
new file mode 100644
index 0000000..7edea87
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PMData.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.pm;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class PMData {
+ String cellId;
+ List<PerformanceData> performance;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PerformanceData.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PerformanceData.java
new file mode 100644
index 0000000..c49f81f
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PerformanceData.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.pm;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class PerformanceData {
+ Integer latency;
+ Integer throughput;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValues.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValues.java
new file mode 100644
index 0000000..2add40f
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValues.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.ToString;
+import org.springframework.data.relational.core.mapping.Column;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Data
+@Builder
+@ToString
+@Table("additional_measurement_value")
+public class AdditionalMeasurementValues {
+ @Column("am_name")
+ public final String name;
+ @Column("am_key")
+ public final String parameterName;
+ @Column("am_value")
+ public final String parameterValue;
+
+ public AdditionalMeasurementValues(String name, String parameterName, String parameterValue) {
+ this.name = name;
+ this.parameterName = parameterName;
+ this.parameterValue = parameterValue;
+ }
+
+ public static AdditionalMeasurementValues of(String name, String parameterName, String parameterValue) {
+ return AdditionalMeasurementValues.builder()
+ .name(name)
+ .parameterName(parameterName)
+ .parameterValue(parameterValue)
+ .build();
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurements.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurements.java
new file mode 100644
index 0000000..10d49ff
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurements.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.Data;
+import lombok.ToString;
+import org.springframework.data.relational.core.mapping.Column;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Data
+@ToString
+@Table("additional_measurement")
+public class AdditionalMeasurements implements Serializable {
+ final Long eventId;
+ @Column("am_name")
+ public final String name;
+ public final List<AdditionalMeasurementValues> values;
+
+ private AdditionalMeasurements(Long eventId, String name, Map<String, String> hashMap) {
+ this.eventId = eventId;
+ this.name = name;
+ this.values = hashMap.keySet()
+ .stream()
+ .map(key -> AdditionalMeasurementValues.of(this.name, key, hashMap.getOrDefault(key, ""))
+ ).collect(Collectors.toUnmodifiableList());
+ }
+
+ @JsonCreator
+ public static AdditionalMeasurements of(@JsonProperty("name") String name, @JsonProperty("hashMap") Map<String, String> hashMap) {
+ return new AdditionalMeasurements(null, name, hashMap);
+ }
+
+ public static AdditionalMeasurements of(Long eventId, String name, Map<String, String> hashMap) {
+ return new AdditionalMeasurements(eventId, name, hashMap);
+ }
+
+ public static AdditionalMeasurementsBuilder builder() {
+ return new AdditionalMeasurementsBuilder();
+ }
+
+ public static class AdditionalMeasurementsBuilder {
+ private long eventId;
+ private String name;
+ private Map<String, String> hashMap;
+
+ public AdditionalMeasurementsBuilder withEventId(long id) {
+ this.eventId = id;
+ return this;
+ }
+
+ public AdditionalMeasurementsBuilder withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public AdditionalMeasurementsBuilder withHashMap(Map<String, String> hashMap) {
+ this.hashMap = Collections.unmodifiableMap(hashMap);
+ return this;
+ }
+
+ public AdditionalMeasurements build() {
+ return new AdditionalMeasurements(eventId, name, hashMap);
+ }
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java
new file mode 100644
index 0000000..e9d3f75
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.springframework.data.annotation.Transient;
+
+@Data
+@EqualsAndHashCode
+@ToString
+@Builder
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CommonEventHeader {
+ public final String eventType;
+ public final String version;
+ public final String sourceId;
+ public final String reportingEntityName;
+ public final Long startEpochMicrosec;
+ public final String eventId;
+ public final Long lastEpochMicrosec;
+ public final String priority;
+ public final Integer sequence;
+ public final String sourceName;
+ public final String domain;
+ public final String eventName;
+ public final String reportingEntityId;
+ public final String nfcNamingCode;
+ public final String nfNamingCode;
+ @Transient
+ public final String timeZoneOffset;
+
+ protected CommonEventHeader(String eventType, String version, String sourceId, String reportingEntityName, Long startEpochMicrosec, String eventId, Long lastEpochMicrosec, String priority, Integer sequence, String sourceName, String domain, String eventName, String reportingEntityId, String nfcNamingCode, String nfNamingCode, String timeZone) {
+ this.eventType = eventType;
+ this.version = version;
+ this.sourceId = sourceId;
+ this.reportingEntityName = reportingEntityName;
+ this.startEpochMicrosec = startEpochMicrosec;
+ this.eventId = eventId;
+ this.lastEpochMicrosec = lastEpochMicrosec;
+ this.priority = priority;
+ this.sequence = sequence;
+ this.sourceName = sourceName;
+ this.domain = domain;
+ this.eventName = eventName;
+ this.reportingEntityId = reportingEntityId;
+ this.nfcNamingCode = nfcNamingCode;
+ this.nfNamingCode = nfNamingCode;
+ this.timeZoneOffset = timeZone;
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/Event.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/Event.java
new file mode 100644
index 0000000..2d00636
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/Event.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import lombok.Getter;
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.relational.core.mapping.Column;
+import org.springframework.data.relational.core.mapping.Embedded;
+import org.springframework.data.relational.core.mapping.Table;
+
+@JsonTypeName("event")
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Table("ves_measurement")
+@ToString
+@Getter
+public class Event {
+ @Id
+ Long id;
+
+ @Column("rawdata")
+ public volatile String raw;
+
+ @Embedded(onEmpty = Embedded.OnEmpty.USE_NULL)
+ public final CommonEventHeader commonEventHeader;
+
+ @Column("event_id")
+ public final MeasurementFields measurementFields;
+
+ protected Event(final Long id, CommonEventHeader header, MeasurementFields fields, String raw) {
+ this.id = id;
+ this.commonEventHeader = header;
+ this.measurementFields = fields;
+ this.raw = raw;
+ }
+
+ public static Event of(CommonEventHeader header, MeasurementFields fields) {
+ return new Event(null, header, fields, "");
+ }
+
+ public static Event of(final Long id, CommonEventHeader header, MeasurementFields fields) {
+ return new Event(id, header, fields, "");
+ }
+
+ public static Event of(final Long id, CommonEventHeader header, MeasurementFields fields, String raw) {
+ return new Event(id, header, fields, raw);
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/EventAPI.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/EventAPI.java
new file mode 100644
index 0000000..95e9bfb
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/EventAPI.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+@EqualsAndHashCode
+@Builder
+@Entity
+@Table(name = "ves_measurement")
+public class EventAPI {
+
+ @Id
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ private long id;
+
+ private String rawdata;
+
+ private Long lastEpochMicrosec;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java
new file mode 100644
index 0000000..802aace
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import java.util.Collections;
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.springframework.data.relational.core.mapping.Column;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Data
+@ToString
+@EqualsAndHashCode
+@Builder
+@Table("ves_measurement_fields")
+public class MeasurementFields {
+ public static final MeasurementFields EMPTY = new MeasurementFields(-1L, -1L, Collections.emptyList());
+ public final Long eventId;
+ public final long measurementInterval;
+ public final String measurementFieldsVersion = "4.0";
+
+ @Column("event_id")
+ public final List<AdditionalMeasurements> additionalMeasurements;
+
+ private MeasurementFields(Long eventId, long measurementInterval, List<AdditionalMeasurements> additionalMeasurements) {
+ this.eventId = eventId;
+ this.measurementInterval = measurementInterval;
+ this.additionalMeasurements = Collections.unmodifiableList(additionalMeasurements);
+ }
+
+ public static MeasurementFields of(Long eventId) {
+ return new MeasurementFields(eventId, -1, Collections.emptyList());
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/RawPayload.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/RawPayload.java
new file mode 100644
index 0000000..0a7fe03
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/RawPayload.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.relational.core.mapping.Table;
+
+@ToString
+@EqualsAndHashCode
+@Table("payload")
+public class RawPayload {
+ @Id
+ public final Long eventId;
+ public final String payload;
+
+ private RawPayload(Long eventId, String payload) {
+ this.eventId = eventId;
+ this.payload = payload;
+ }
+
+ public static class RawPayloadBuilder {
+ @Id
+ private Long eventId;
+ private String payload;
+
+ public RawPayloadBuilder withEvent(Long event) {
+ this.eventId = event;
+ return this;
+ }
+
+ public RawPayloadBuilder withPayload(String payload) {
+ this.payload = payload;
+ return this;
+ }
+
+ public RawPayload build() {
+ return new RawPayload(eventId, payload);
+ }
+ }
+
+ public static RawPayloadBuilder builder() {
+ return new RawPayloadBuilder();
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/DataAggregationService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/DataAggregationService.java
new file mode 100644
index 0000000..aebdd4c
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/DataAggregationService.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.OptionalDouble;
+import java.util.stream.Collectors;
+import org.onap.rapp.datacollector.entity.DataAggregationInfo;
+import org.onap.rapp.datacollector.entity.pm.PMData;
+import org.onap.rapp.datacollector.entity.pm.PerformanceData;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DataAggregationService {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private static final String LATENCY_FIELD_NAME = "latency";
+ private static final String THROUGHPUT_FIELD_NAME = "throughput";
+ public static final int MEASUREMENT_INDEX = 0;
+
+ public PMData getAggregatedDataFromEventsForCell(String cellId, List<Event> events, DataAggregationInfo dataAggregationInfo) {
+ logger.info("Cell {}, events size {}", cellId, events.size());
+ Collection<List<Event>> eventsByTime = groupEventsTimeSlots(events, dataAggregationInfo);
+
+ List<PerformanceData> pmDataList = new ArrayList<>();
+ eventsByTime.forEach(slotOfEvents -> {
+ List<Integer> latencyList = getPerformanceData(slotOfEvents, LATENCY_FIELD_NAME);
+ List<Integer> throughputList = getPerformanceData(slotOfEvents, THROUGHPUT_FIELD_NAME);
+
+ Integer latencyAggregatedData = getAverage(latencyList);
+ Integer throughputAggregatedData = getAverage(throughputList);
+
+ PerformanceData pm = new PerformanceData(latencyAggregatedData, throughputAggregatedData);
+ pmDataList.add(pm);
+ });
+
+ return new PMData(cellId, pmDataList);
+ }
+
+ private Collection<List<Event>> groupEventsTimeSlots(List<Event> events, DataAggregationInfo aggregationInfo) {
+ long slotStartTime = aggregationInfo.getStartTime();
+ long slotEndTime = slotStartTime + aggregationInfo.getSlot();
+
+ List<List<Event>> eventsByTime = new ArrayList<>();
+ List<Event> eventsOfSlot = new ArrayList<>();
+
+ for (Event event : events) {
+ if (isInNextSlot(slotEndTime, event)) {
+ eventsByTime.add(eventsOfSlot);
+ eventsOfSlot = new ArrayList<>();
+
+ slotStartTime = slotEndTime;
+ slotEndTime = slotStartTime + aggregationInfo.getSlot();
+
+ while (isInNextSlot(slotEndTime, event)){
+ eventsByTime.add(Collections.emptyList());
+ slotStartTime = slotEndTime;
+ slotEndTime = slotStartTime + aggregationInfo.getSlot();
+ }
+ }
+ eventsOfSlot.add(event);
+ }
+
+ eventsByTime.add(eventsOfSlot);
+ fillEmptyEndIfNeeded(aggregationInfo, slotEndTime, eventsByTime);
+ return eventsByTime;
+ }
+
+ private boolean isInNextSlot(long slotEndTime, Event event) {
+ return event.getCommonEventHeader().getLastEpochMicrosec() > slotEndTime;
+ }
+
+ private void fillEmptyEndIfNeeded(DataAggregationInfo aggregationInfo, long slotEndTime, List<List<Event>> eventsByTime) {
+ while (slotEndTime < aggregationInfo.getEndTime()){
+ eventsByTime.add(Collections.emptyList());
+ slotEndTime = slotEndTime + aggregationInfo.getSlot();
+ }
+ }
+
+ private List<Integer> getPerformanceData(List<Event> events, String measurement) {
+ return events.stream().map(e -> getPerformanceDataFromEvent(e, measurement))
+ .collect(Collectors.toList());
+ }
+
+ private int getPerformanceDataFromEvent(Event event, String name) {
+ AdditionalMeasurements performance = event.getMeasurementFields().getAdditionalMeasurements()
+ .stream().filter(am -> am.getName().equals(name)).findAny().orElseThrow();
+ return Integer.parseInt(performance.getValues().get(MEASUREMENT_INDEX).getParameterValue());
+ }
+
+ private Integer getAverage(List<Integer> values) {
+ if (values.isEmpty()) {
+ return null;
+ }
+ OptionalDouble average = values.stream().mapToDouble(l -> l).average();
+
+ if (average.isPresent()) {
+ return (int) average.getAsDouble();
+ } else {
+ return null;
+ }
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/DmaapRestReader.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/DmaapRestReader.java
new file mode 100644
index 0000000..a6c0fa4
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/DmaapRestReader.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.Collection;
+
+interface DmaapRestReader {
+ Collection<String> retrieveEvents();
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/MariadbDialectResolver.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/MariadbDialectResolver.java
new file mode 100644
index 0000000..8259ac7
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/MariadbDialectResolver.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Locale;
+import java.util.Optional;
+import org.springframework.data.jdbc.repository.config.DialectResolver;
+import org.springframework.data.relational.core.dialect.Dialect;
+import org.springframework.data.relational.core.dialect.MySqlDialect;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcOperations;
+
+public class MariadbDialectResolver implements DialectResolver.JdbcDialectProvider {
+
+ @Override
+ public Optional<Dialect> getDialect(JdbcOperations operations) {
+ return Optional.ofNullable(
+ operations.execute((ConnectionCallback<Dialect>) MariadbDialectResolver::getDialect));
+ }
+
+ private static Dialect getDialect(Connection connection) throws SQLException {
+ DatabaseMetaData metaData = connection.getMetaData();
+ String name = metaData.getDatabaseProductName().toLowerCase(Locale.ROOT);
+ if (name.contains("mariadb")) {
+ return MySqlDialect.INSTANCE;
+ }
+ return null;
+ }
+}
+
+
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java
new file mode 100644
index 0000000..7039d2e
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.onap.rapp.datacollector.entity.DataAggregationInfo;
+import org.onap.rapp.datacollector.entity.pm.AggregatedPM;
+import org.onap.rapp.datacollector.entity.pm.PMData;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.EventAPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service("pmService")
+public class PMService {
+
+ private static final Logger logger = LoggerFactory.getLogger(PMService.class);
+
+ public static final String CELL_FIELD_NAME = "identifier";
+ public static final int CELL_INDEX = 0;
+ private static final int MICRO_SECONDS_OF_SECOND = 1_000_000;
+
+ private final VesPersisterSqlImpl vesPersisterSql;
+ private final DataAggregationService aggregationService;
+ private final VesParser parser;
+
+ public PMService(VesPersisterSqlImpl vesPersisterSql, DataAggregationService aggregationService, VesParser parser) {
+ this.vesPersisterSql = vesPersisterSql;
+ this.aggregationService = aggregationService;
+ this.parser = parser;
+ }
+
+ public AggregatedPM getAggregatedPMDataForTimeInterval(int slot, int count, OffsetDateTime startTime) {
+ DataAggregationInfo aggregationInfo = buildDataAggregationInfo(slot, count, startTime);
+ logger.info("Start Time: {}, EndTime: {}", aggregationInfo.getStartTime(), aggregationInfo.getEndTime());
+ List<EventAPI> eventsOfInterval = vesPersisterSql.findEventsByTimeWindow(aggregationInfo.getStartTime(), aggregationInfo.getEndTime());
+ Map<String, List<Event>> eventsByCell = groupByCell(eventsOfInterval);
+ List<PMData> pmDataList = calculateAggregatedData(aggregationInfo, eventsByCell);
+ return new AggregatedPM(pmDataList, pmDataList.size());
+ }
+
+ private DataAggregationInfo buildDataAggregationInfo(int slot, int count, OffsetDateTime startTime) {
+ long timeIntervalStartTime = startTime.toEpochSecond() * MICRO_SECONDS_OF_SECOND;
+ long timeIntervalEndTime = getTimeIntervalEndTime(slot, count, timeIntervalStartTime);
+ return DataAggregationInfo.builder()
+ .startTime(timeIntervalStartTime)
+ .endTime(timeIntervalEndTime)
+ .slot(slot * MICRO_SECONDS_OF_SECOND)
+ .build();
+ }
+
+ private long getTimeIntervalEndTime(int slot, int count, long startDate) {
+ int timeIntervalMicrosec = slot * count * MICRO_SECONDS_OF_SECOND;
+ return startDate + timeIntervalMicrosec;
+ }
+
+ private Map<String, List<Event>> groupByCell(List<EventAPI> events) {
+ return events.stream().map(e -> parser.parse(e.getRawdata()))
+ .collect(Collectors.groupingBy(this::getCellFromVes));
+ }
+
+ private String getCellFromVes(Event event) {
+ AdditionalMeasurements cellField = event.getMeasurementFields().getAdditionalMeasurements()
+ .stream().filter(am -> am.getName().equals(CELL_FIELD_NAME)).findAny().orElseThrow();
+ return cellField.getValues().get(CELL_INDEX).getParameterValue();
+ }
+
+ private List<PMData> calculateAggregatedData(DataAggregationInfo dataAggregationInfo, Map<String, List<Event>> events) {
+ return events.entrySet().stream()
+ .map(e -> aggregationService.getAggregatedDataFromEventsForCell(e.getKey(), e.getValue(), dataAggregationInfo))
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepository.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepository.java
new file mode 100644
index 0000000..7d7f828
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.stereotype.Repository;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.springframework.transaction.annotation.Transactional;
+
+@Repository("repository")
+@Transactional
+public interface SqlRepository extends CrudRepository<Event, String> {
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepositoryAPI.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepositoryAPI.java
new file mode 100644
index 0000000..aeb0ec0
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepositoryAPI.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.List;
+import org.onap.rapp.datacollector.entity.ves.EventAPI;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
+
+@Repository("repositoryAPI")
+@Transactional
+public interface SqlRepositoryAPI extends CrudRepository<EventAPI, Long> {
+ @Query(value = "SELECT * FROM ves_measurement order by id desc limit :limit", nativeQuery = true)
+ List<EventAPI> findTopNVesEvent(@Param("limit") int limit);
+
+ List<EventAPI> findByLastEpochMicrosecBetweenOrderByLastEpochMicrosecAsc(Long startTime, Long endTime);
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/UEHolder.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/UEHolder.java
new file mode 100644
index 0000000..a9451ef
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/UEHolder.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import org.springframework.stereotype.Service;
+import java.util.HashSet;
+import java.util.Set;
+
+@Service
+public class UEHolder {
+
+ private Set<String> ues = new HashSet<>();
+
+ public void addUE(String ue) {
+ ues.add(ue);
+ }
+
+ public Set<String> getUes(){
+ return ues;
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java
new file mode 100644
index 0000000..9a453b4
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import org.onap.rapp.datacollector.entity.ves.Event;
+
+public interface VesParser {
+ Event parse(final String event);
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java
new file mode 100644
index 0000000..cf90bfd
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.CommonEventHeader;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.MeasurementFields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+public class VesParserImpl implements VesParser {
+ private static final Logger logger = LoggerFactory.getLogger(VesParserImpl.class);
+
+ private static class VesEventDeserializer implements JsonDeserializer<Event> {
+ private static class AdditionalMeasurementsRawValue {
+ String name;
+ Map<String, String> hashMap;
+ }
+
+ @Override
+ public Event deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
+ JsonObject obj = jsonElement.getAsJsonObject();
+ obj = obj.getAsJsonObject("event");
+ CommonEventHeader header;
+ Optional<MeasurementFields> measurementFields = Optional.empty();
+ List<AdditionalMeasurements> additionalMeasurements = Collections.emptyList();
+ if (obj.has("commonEventHeader")) {
+ JsonObject h = obj.getAsJsonObject("commonEventHeader");
+ header = jsonDeserializationContext.deserialize(h, CommonEventHeader.class);
+ } else {
+ throw new JsonParseException("Common header not found");
+ }
+ if (obj.has("measurementFields")) {
+ JsonObject h = obj.getAsJsonObject("measurementFields");
+ measurementFields = Optional.ofNullable(jsonDeserializationContext.deserialize(h, MeasurementFields.class));
+ if (h.has("additionalMeasurements")) {
+ JsonArray arr = h.getAsJsonArray("additionalMeasurements");
+ additionalMeasurements = new ArrayList<>();
+ for (int i = 0; i < arr.size(); i++) {
+ AdditionalMeasurementsRawValue tmp = jsonDeserializationContext.deserialize(arr.get(i).getAsJsonObject(), AdditionalMeasurementsRawValue.class);
+ additionalMeasurements.add(AdditionalMeasurements.builder()
+ .withName(tmp.name)
+ .withHashMap(tmp.hashMap)
+ .build());
+ }
+ }
+ }
+ logger.trace("measurement fields {}", measurementFields);
+ logger.trace("additional measurements {}", additionalMeasurements);
+ measurementFields = Optional.of(MeasurementFields.builder()
+ .measurementInterval(measurementFields.orElse(MeasurementFields.EMPTY).measurementInterval)
+ .additionalMeasurements(additionalMeasurements)
+ .build());
+
+ return Event.of(header, measurementFields.get());
+ }
+ }
+
+ private final Gson gson = new GsonBuilder()
+ .registerTypeAdapter(Event.class, new VesEventDeserializer())
+ .create();
+
+ public Event parse(final String event) {
+ logger.debug("parsing ves event {}", event);
+ final Event result = gson.fromJson(event, Event.class);
+ result.raw = event;
+ return result;
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java
new file mode 100644
index 0000000..391f762
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.List;
+import java.util.Optional;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.EventAPI;
+
+public interface VesPersister {
+ void persists(Event event);
+
+ List<EventAPI> findTopNVesEvent(int n);
+
+ List<EventAPI> findAll();
+
+ Optional<EventAPI> findById(Long id);
+
+ void create(Event event);
+
+ void update(Event event, Long id);
+
+ List<EventAPI> findEventsByTimeWindow(long startTime, long endTime);
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java
new file mode 100644
index 0000000..c30ff41
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.List;
+import java.util.Optional;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.EventAPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+@Service("vesPersisterSqlImpl")
+@Transactional
+public class VesPersisterSqlImpl implements VesPersister {
+ private static final Logger logger = LoggerFactory.getLogger(VesPersisterSqlImpl.class);
+
+ private final SqlRepository repository;
+ private final SqlRepositoryAPI repositoryAPI;
+
+ @Autowired
+ public VesPersisterSqlImpl(SqlRepository repository, SqlRepositoryAPI repositoryAPI) {
+ this.repository = repository;
+ this.repositoryAPI = repositoryAPI;
+ }
+
+ @Override
+ public void persists(Event event) {
+ logger.debug("persisting event {}", event);
+ repository.save(event);
+ }
+
+ @Override
+ public List<EventAPI> findTopNVesEvent(int n) {
+ logger.debug("finding top {} events", n);
+ return repositoryAPI.findTopNVesEvent(n);
+ }
+
+ @Override
+ public List<EventAPI> findAll() {
+ logger.debug("finding all event");
+ return (List<EventAPI>)repositoryAPI.findAll();
+ }
+
+ @Override
+ public Optional<EventAPI> findById(Long id) {
+ logger.debug("finding event by id {}", id);
+ return repositoryAPI.findById(id);
+ }
+
+ @Override
+ public void create(Event event) {
+ logger.debug("creating event {}", event);
+ repository.save(event);
+ }
+
+ @Override
+ public void update(Event event, Long id) {
+ if (!repository.existsById(String.valueOf(id))) {
+ throw new RuntimeException("Event not found");
+ }
+ logger.debug("updating event {} by id {}", event, id);
+ repository.save(event);
+ }
+
+ @Override
+ public List<EventAPI> findEventsByTimeWindow(long startTime, long endTime) {
+ logger.debug("finding top {} events", startTime);
+ return repositoryAPI.findByLastEpochMicrosecBetweenOrderByLastEpochMicrosecAsc(startTime, endTime);
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
new file mode 100644
index 0000000..517bb8b
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.service.configuration.DmaapRestReaderConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+@Service
+public class VesRetrievalService implements DmaapRestReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(VesRetrievalService.class);
+ public static final String UE_FIELD_NAME = "trafficModel";
+
+ private final RestTemplate restTemplate;
+ private final DmaapRestReaderConfiguration config;
+ private final VesParser parser;
+ private final VesPersister persister;
+ private final UEHolder ueHolder;
+
+ @Autowired
+ public VesRetrievalService(RestTemplate restTemplate, VesParser parser, VesPersister persister,
+ DmaapRestReaderConfiguration configuration, UEHolder ueHolder) {
+ this.restTemplate = restTemplate;
+ this.parser = parser;
+ this.persister = persister;
+ this.config = configuration;
+ this.ueHolder = ueHolder;
+ }
+
+ @Override
+ public Collection<String> retrieveEvents() {
+ logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl());
+ try {
+ ResponseEntity<String[]> responseEntity =
+ restTemplate.getForEntity(config.getMeasurementsTopicUrl(), String[].class);
+ if (responseEntity.hasBody()) {
+ String[] events = responseEntity.getBody();
+ return Arrays.stream(events).collect(Collectors.toList());
+ }
+ } catch (RestClientException ex) {
+ logger.error("Failed to reach to dmaap", ex);
+ }
+ return Collections.emptyList();
+ }
+
+ @Scheduled(fixedRate = 5000)
+ public void retrieveAndStoreVesEvents() {
+ retrieveEvents().stream().map(parser::parse).forEach(this::saveEvent);
+ }
+
+ private void saveEvent(Event event) {
+ persister.persists(event);
+ saveUesOfVes(event);
+ }
+
+ private void saveUesOfVes(Event event){
+ Set<String> uesOfVes = getUserEquipmentData(event);
+ uesOfVes.forEach(ueHolder::addUE);
+ }
+
+ private Set<String> getUserEquipmentData(Event event) {
+ Optional<AdditionalMeasurements> ues = event.getMeasurementFields().getAdditionalMeasurements()
+ .stream().filter(am -> am.getName().equals(UE_FIELD_NAME)).findAny();
+ return ues.map(additionalMeasurements -> additionalMeasurements.getValues().stream()
+ .map(AdditionalMeasurementValues::getParameterName)
+ .collect(Collectors.toSet())).orElse(Collections.emptySet());
+ }
+
+}
+
+
+
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DatabaseProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DatabaseProperties.java
new file mode 100644
index 0000000..517666d
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DatabaseProperties.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "database")
+public class DatabaseProperties {
+ private String driverClassName;
+ private String host;
+ private String port;
+ private String name;
+ private String username;
+ private String password;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
new file mode 100644
index 0000000..adc3695
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "dmaap")
+public class DmaapProperties {
+ private String protocol;
+ private String host;
+ private int port;
+ private String measurementsTopic;
+
+ public String getMeasurementsTopicUrl() {
+ return String.format("%s://%s:%d/%s", protocol, host, port, measurementsTopic);
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
new file mode 100644
index 0000000..36dee70
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import javax.sql.DataSource;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.web.client.RestTemplate;
+
+@Configuration
+public class DmaapRestReaderConfiguration {
+
+ private final static class TrustAllSSLSocketFactory extends SSLSocketFactory {
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+
+ public TrustAllSSLSocketFactory() throws NoSuchAlgorithmException, KeyManagementException {
+ TrustManager tm = new X509TrustManager() {
+ public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ }
+
+ public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ }
+
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ };
+
+ sslContext.init(null, new TrustManager[] { tm }, null);
+ }
+
+ @Override
+ public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
+ return sslContext.getSocketFactory().createSocket(socket, host, port, autoClose);
+ }
+
+ @Override
+ public Socket createSocket() throws IOException {
+ return sslContext.getSocketFactory().createSocket();
+ }
+
+ @Override
+ public Socket createSocket(String s, int i) throws IOException, UnknownHostException {
+ return sslContext.getSocketFactory().createSocket(s, i);
+ }
+
+ @Override
+ public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException, UnknownHostException {
+ return sslContext.getSocketFactory().createSocket(s, i, inetAddress, i1);
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
+ return sslContext.getSocketFactory().createSocket(inetAddress, i);
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
+ return sslContext.getSocketFactory().createSocket(inetAddress, i , inetAddress1, i1);
+ }
+
+ @Override
+ public String[] getDefaultCipherSuites() {
+ return new String[] {"ALL"};
+ }
+
+ @Override
+ public String[] getSupportedCipherSuites() {
+ return new String[] {"ALL"};
+ }
+ }
+
+ private final DmaapProperties dmaapProperties;
+ private final DatabaseProperties databaseProperties;
+
+ @Autowired
+ public DmaapRestReaderConfiguration(DmaapProperties dmaapProperties, DatabaseProperties databaseProperties) {
+ this.dmaapProperties = dmaapProperties;
+ this.databaseProperties = databaseProperties;
+ }
+
+ public String getMeasurementsTopicUrl() {
+ return dmaapProperties.getMeasurementsTopicUrl();
+ }
+
+
+ @Bean
+ public DataSource dataSource() {
+ DriverManagerDataSource dataSource = new DriverManagerDataSource();
+ dataSource.setDriverClassName(databaseProperties.getDriverClassName());
+ dataSource.setUrl("jdbc:mysql://" + databaseProperties.getHost() + ":" + databaseProperties.getPort() + "/" + databaseProperties.getName());
+ dataSource.setUsername(databaseProperties.getUsername());
+ dataSource.setPassword(databaseProperties.getPassword());
+ return dataSource;
+ }
+
+ @Bean
+ public PlatformTransactionManager transactionManager(DataSource ds) {
+ return new DataSourceTransactionManager(dataSource());
+ }
+
+
+ @Bean
+ public RestTemplate restTemplate() {
+ SSLConnectionSocketFactory socketFactory = null;
+ try {
+ SSLContext sslContext = new SSLContextBuilder()
+ .loadTrustMaterial(null, new TrustAllStrategy())
+ .build();
+ HostnameVerifier trustAll = new HostnameVerifier() {
+ @Override
+ public boolean verify(String s, SSLSession sslSession) {
+ return true;
+ }
+ };
+ socketFactory = new SSLConnectionSocketFactory(sslContext, trustAll);
+
+ HttpClient httpClient = HttpClients.custom().setSSLSocketFactory(socketFactory).build();
+ HttpComponentsClientHttpRequestFactory httpClientFactory = new HttpComponentsClientHttpRequestFactory(httpClient);
+
+ RestTemplate template = new RestTemplate();
+ template.setRequestFactory(httpClientFactory);
+ return template;
+ } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/SwaggerConfig.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/SwaggerConfig.java
new file mode 100644
index 0000000..e449e50
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/SwaggerConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+import springfox.documentation.builders.ApiInfoBuilder;
+import springfox.documentation.builders.PathSelectors;
+import springfox.documentation.builders.RequestHandlerSelectors;
+import springfox.documentation.service.ApiInfo;
+import springfox.documentation.spi.DocumentationType;
+import springfox.documentation.spring.web.plugins.Docket;
+import springfox.documentation.swagger2.annotations.EnableSwagger2;
+
+@Configuration
+@EnableSwagger2
+public class SwaggerConfig implements WebMvcConfigurer {
+ @Bean
+ public Docket api() {
+ return new Docket(DocumentationType.SWAGGER_2)
+ .apiInfo(apiInfo())
+ .select()
+ .apis(RequestHandlerSelectors.basePackage("org.onap.rapp.datacollector"))
+ .paths(PathSelectors.any())
+ .build();
+ }
+
+ public ApiInfo apiInfo() {
+ final ApiInfoBuilder builder = new ApiInfoBuilder();
+ builder
+ .title("DataCollector's REST APIs (Policy Enforcement PoC)")
+ .description("DataCollector's REST interfaces for serving VES Data to other R-APPs")
+ .version("1.0.0")
+ .license("Copyright (C) 2021 Samsung Electronics");
+ return builder.build();
+ }
+} \ No newline at end of file
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/WebConfig.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/WebConfig.java
new file mode 100644
index 0000000..9196c0f
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/WebConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@Configuration
+public class WebConfig implements WebMvcConfigurer {
+
+ @Override
+ public void addCorsMappings(CorsRegistry registry) {
+ registry.addMapping("/**");
+ }
+}
diff --git a/datacollector/src/main/resources/META-INF/spring.factories b/datacollector/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..5b31f5a
--- /dev/null
+++ b/datacollector/src/main/resources/META-INF/spring.factories
@@ -0,0 +1 @@
+org.springframework.data.jdbc.repository.config.DialectResolver$JdbcDialectProvider=org.onap.rapp.datacollector.service.MariadbDialectResolver \ No newline at end of file
diff --git a/datacollector/src/main/resources/application.yml b/datacollector/src/main/resources/application.yml
new file mode 100644
index 0000000..0b8b661
--- /dev/null
+++ b/datacollector/src/main/resources/application.yml
@@ -0,0 +1,27 @@
+server:
+ port: 8087
+dmaap:
+ prtocol: "http"
+ host: "localhost"
+ port: 8181
+ measurements-topic: "measurements"
+database:
+ host: mariadb-host
+ port: 3306
+ name: "ves"
+ username: ves
+ driver-class-name: "org.mariadb.jdbc.Driver"
+logging:
+ level:
+ org:
+ springframework: DEBUG
+ logging.file.name: logs/rapp-datacollector.log
+ pattern:
+ console: "%d %-5level %logger : %msg%n"
+ file: "%d %-5level [%thread] %logger : %msg%n"
+spring:
+ autoconfigure:
+ exclude:
+ - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
+ main:
+ allow-bean-definition-overriding: true
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/controller/PMControllerTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/controller/PMControllerTest.java
new file mode 100644
index 0000000..1f23f48
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/controller/PMControllerTest.java
@@ -0,0 +1,303 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.controller;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.onap.rapp.datacollector.entity.pm.AggregatedPM;
+import org.onap.rapp.datacollector.entity.pm.PMData;
+import org.onap.rapp.datacollector.service.PMService;
+import org.hamcrest.Matchers;
+import org.hamcrest.core.IsNull;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.BDDMockito;
+import org.mockito.internal.verification.VerificationModeFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+import org.springframework.test.web.servlet.result.MockMvcResultHandlers;
+import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.OffsetDateTime;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static org.mockito.Mockito.when;
+
+@RunWith(SpringRunner.class)
+@WebMvcTest(PMController.class)
+@ActiveProfiles("test")
+public class PMControllerTest {
+
+ private static final int SLOT = 10;
+ private static final int COUNT = 12;
+ private static final String startTime = OffsetDateTime.now().minusSeconds(SLOT * COUNT).toString();
+
+ @Autowired
+ private MockMvc mockMvc;
+
+ @MockBean(name = "pmService")
+ private PMService pmService;
+
+ @Autowired
+ private ObjectMapper mapper;
+
+ protected <T> T mapFromJson(String json, Class<T> clazz) throws IOException {
+ return mapper.readValue(json, clazz);
+ }
+
+ private List<PMData> pmDataList;
+
+ @Before
+ public void setUp() throws Exception {
+ String testPmContent = getSamplePMData();
+ pmDataList = Collections.singletonList(this.mapFromJson(testPmContent, PMData.class));
+ }
+
+ @Test
+ public void retrievePMData() throws Exception {
+ when(pmService.getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime))).thenReturn(new AggregatedPM(pmDataList, pmDataList.size()));
+
+ BDDMockito
+ .given(pmService.getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime)))
+ .willReturn(new AggregatedPM(pmDataList, pmDataList.size()));
+
+ mockMvc
+ // when
+ .perform(
+ MockMvcRequestBuilders
+ .get("/v1/pm/events/aggregatedmetrics")
+ .param("slot", String.valueOf(SLOT))
+ .param("count", String.valueOf(COUNT))
+ .param("startTime", startTime)
+ .accept(MediaType.APPLICATION_JSON)
+ )
+ // then
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(
+ MockMvcResultMatchers.status().isOk()
+ )
+ .andExpect(
+ MockMvcResultMatchers
+ .content()
+ .contentTypeCompatibleWith(MediaType.APPLICATION_JSON)
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$", Matchers.notNullValue())
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].cellId", Matchers.is("Cell1"))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.itemsLength", Matchers.is(1))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance[0].latency", Matchers.is(20))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance[0].throughput", Matchers.is(80))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance.*", hasSize(12))
+ )
+ ;
+
+ // verify
+ BDDMockito
+ .verify(pmService, VerificationModeFactory.times(1))
+ .getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime));
+ }
+
+ @Test
+ public void retrievePMDataWithEmptySlotOnBeginning() throws Exception {
+ pmDataList.get(0).getPerformance().get(0).setLatency(null);
+ pmDataList.get(0).getPerformance().get(0).setThroughput(null);
+
+ when(pmService.getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime))).thenReturn(new AggregatedPM(pmDataList, pmDataList.size()));
+
+ BDDMockito
+ .given(pmService.getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime)))
+ .willReturn(new AggregatedPM(pmDataList, pmDataList.size()));
+
+ mockMvc
+ // when
+ .perform(
+ MockMvcRequestBuilders
+ .get("/v1/pm/events/aggregatedmetrics")
+ .param("slot", String.valueOf(SLOT))
+ .param("count", String.valueOf(COUNT))
+ .param("startTime", startTime)
+ .accept(MediaType.APPLICATION_JSON)
+ )
+ // then
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(
+ MockMvcResultMatchers.status().isOk()
+ )
+ .andExpect(
+ MockMvcResultMatchers
+ .content()
+ .contentTypeCompatibleWith(MediaType.APPLICATION_JSON)
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].cellId", Matchers.is("Cell1"))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.itemsLength", Matchers.is(1))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance[0].latency").value(IsNull.nullValue())
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance[0].throughput").value(IsNull.nullValue())
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance.*", hasSize(12))
+ )
+ ;
+
+ // verify
+ BDDMockito
+ .verify(pmService, VerificationModeFactory.times(1))
+ .getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime));
+ }
+
+ @Test
+ public void retrievePMDataWithEmptySlotOnEnd() throws Exception {
+ pmDataList.get(0).getPerformance().get(11).setLatency(null);
+ pmDataList.get(0).getPerformance().get(11).setThroughput(null);
+
+ when(pmService.getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime))).thenReturn(new AggregatedPM(pmDataList, pmDataList.size()));
+
+ BDDMockito
+ .given(pmService.getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime)))
+ .willReturn(new AggregatedPM(pmDataList, pmDataList.size()));
+
+ mockMvc
+ // when
+ .perform(
+ MockMvcRequestBuilders
+ .get("/v1/pm/events/aggregatedmetrics")
+ .param("slot", String.valueOf(SLOT))
+ .param("count", String.valueOf(COUNT))
+ .param("startTime", startTime)
+ .accept(MediaType.APPLICATION_JSON)
+ )
+ // then
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(
+ MockMvcResultMatchers.status().isOk()
+ )
+ .andExpect(
+ MockMvcResultMatchers
+ .content()
+ .contentTypeCompatibleWith(MediaType.APPLICATION_JSON)
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].cellId", Matchers.is("Cell1"))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.itemsLength", Matchers.is(1))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance[11].latency").value(IsNull.nullValue())
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance[11].throughput").value(IsNull.nullValue())
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance.*", hasSize(12))
+ )
+ ;
+
+ // verify
+ BDDMockito
+ .verify(pmService, VerificationModeFactory.times(1))
+ .getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime));
+ }
+
+ @Test
+ public void retrievePMDataWithEmptySlotInMiddle() throws Exception {
+ pmDataList.get(0).getPerformance().get(5).setLatency(null);
+ pmDataList.get(0).getPerformance().get(5).setThroughput(null);
+
+ when(pmService.getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime))).thenReturn(new AggregatedPM(pmDataList, pmDataList.size()));
+
+ BDDMockito
+ .given(pmService.getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime)))
+ .willReturn(new AggregatedPM(pmDataList, pmDataList.size()));
+
+ mockMvc
+ // when
+ .perform(
+ MockMvcRequestBuilders
+ .get("/v1/pm/events/aggregatedmetrics")
+ .param("slot", String.valueOf(SLOT))
+ .param("count", String.valueOf(COUNT))
+ .param("startTime", startTime)
+ .accept(MediaType.APPLICATION_JSON)
+ )
+ // then
+ .andDo(MockMvcResultHandlers.print())
+ .andExpect(
+ MockMvcResultMatchers.status().isOk()
+ )
+ .andExpect(
+ MockMvcResultMatchers
+ .content()
+ .contentTypeCompatibleWith(MediaType.APPLICATION_JSON)
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].cellId", Matchers.is("Cell1"))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.itemsLength", Matchers.is(1))
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance[5].latency").value(IsNull.nullValue())
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance[5].throughput").value(IsNull.nullValue())
+ )
+ .andExpect(
+ MockMvcResultMatchers.jsonPath("$.pm[0].performance.*", hasSize(12))
+ )
+ ;
+
+ // verify
+ BDDMockito
+ .verify(pmService, VerificationModeFactory.times(1))
+ .getAggregatedPMDataForTimeInterval(SLOT, COUNT, OffsetDateTime.parse(startTime));
+ }
+
+ private String getSamplePMData() throws Exception {
+ String testPmContent;
+ InputStream in = this.getClass().getResourceAsStream("/sample-pm.json");
+ try (in) {
+ BufferedReader inr = new BufferedReader(new InputStreamReader(in));
+ testPmContent = inr.lines().collect(Collectors.joining(" "));
+ }
+ return testPmContent;
+ }
+} \ No newline at end of file
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValuesTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValuesTest.java
new file mode 100644
index 0000000..b1de446
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValuesTest.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+public class AdditionalMeasurementValuesTest {
+
+ @Test
+ public void of() {
+ AdditionalMeasurementValues actual = AdditionalMeasurementValues.of("test-name", "test-parameter", "test-value");
+ assertEquals("test-name", actual.name);
+ assertEquals("test-parameter", actual.parameterName);
+ assertEquals("test-value", actual.parameterValue);
+ }
+}
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementsTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementsTest.java
new file mode 100644
index 0000000..14eacdc
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import static org.junit.Assert.assertEquals;
+import java.util.Map;
+import org.junit.Test;
+
+public class AdditionalMeasurementsTest {
+
+ final Map<String, String> m = Map.of("k1", "v1", "k2", "v2");
+
+ @Test
+ public void factories() {
+ AdditionalMeasurements actual = AdditionalMeasurements.of("test-measurements", m);
+ assertEquals("test-measurements", actual.name);
+ actual.values.forEach(v -> assertEquals(m.get(v.parameterName), v.parameterValue));
+
+ actual = AdditionalMeasurements.of(1234L, "test-measurements", m);
+ assertEquals("test-measurements", actual.name);
+ actual.values.forEach(v -> assertEquals(m.get(v.parameterName), v.parameterValue));
+ assertEquals(1234L, actual.eventId.longValue());
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void immutability() {
+ AdditionalMeasurements actual = AdditionalMeasurements.builder()
+ .withEventId(123L)
+ .withName("test-measurements")
+ .withHashMap(m)
+ .build();
+ assertEquals(123L, actual.eventId.longValue());
+ assertEquals("test-measurements", actual.name);
+ actual.values.forEach(v -> assertEquals(m.get(v.parameterName), v.parameterValue));
+
+ actual.values.add(
+ AdditionalMeasurementValues.of("test", "p1", "v1")
+ );
+ }
+
+} \ No newline at end of file
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeaderTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeaderTest.java
new file mode 100644
index 0000000..873ce1b
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeaderTest.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+public class CommonEventHeaderTest {
+
+ public static CommonEventHeader createDumyCommonEventHeader() {
+ return CommonEventHeader.builder()
+ .domain("domain")
+ .eventId("eventId")
+ .eventName("eventName")
+ .eventType("eventType")
+ .lastEpochMicrosec(12345L)
+ .nfcNamingCode("nfcNamingCode")
+ .nfNamingCode("nfNamingCode")
+ .priority("priority")
+ .reportingEntityId("entityId")
+ .reportingEntityName("reportingEntityName")
+ .sequence(567)
+ .sourceId("sourceId")
+ .sourceName("sourceName")
+ .startEpochMicrosec(123456789L)
+ .version("version")
+ .timeZoneOffset("UTC+2")
+ .build();
+
+ }
+
+ public static CommonEventHeader createDumyCommonEventHeaderWithLastEpochMicro(Long lastEpochMicro) {
+ return CommonEventHeader.builder()
+ .domain("domain")
+ .eventId("eventId")
+ .eventName("eventName")
+ .eventType("eventType")
+ .lastEpochMicrosec(lastEpochMicro)
+ .nfcNamingCode("nfcNamingCode")
+ .nfNamingCode("nfNamingCode")
+ .priority("priority")
+ .reportingEntityId("entityId")
+ .reportingEntityName("reportingEntityName")
+ .sequence(567)
+ .sourceId("sourceId")
+ .sourceName("sourceName")
+ .startEpochMicrosec(123456789L)
+ .version("version")
+ .timeZoneOffset("UTC+2")
+ .build();
+
+ }
+
+ @Test
+ public void builder() {
+ CommonEventHeader actual = createDumyCommonEventHeader();
+
+ assertEquals("version", actual.version);
+ assertEquals("domain", actual.domain);
+ assertEquals("eventId", actual.eventId);
+ assertEquals("eventName", actual.eventName);
+ assertEquals("eventType", actual.eventType);
+ assertEquals(12345L, actual.lastEpochMicrosec.longValue());
+ assertEquals("nfcNamingCode", actual.nfcNamingCode);
+ assertEquals("nfNamingCode", actual.nfNamingCode);
+ assertEquals("priority", actual.priority);
+ assertEquals("entityId", actual.reportingEntityId);
+ assertEquals("reportingEntityName", actual.reportingEntityName);
+ assertEquals(567, actual.sequence.intValue());
+ assertEquals("sourceId", actual.sourceId);
+ assertEquals("sourceName", actual.sourceName);
+ assertEquals(123456789L, actual.startEpochMicrosec.longValue());
+ assertEquals("UTC+2", actual.timeZoneOffset);
+
+
+ }
+} \ No newline at end of file
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/EventTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/EventTest.java
new file mode 100644
index 0000000..9b6700b
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/EventTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+public class EventTest {
+
+ public static Event createDumyEvent() {
+ CommonEventHeader header =
+ CommonEventHeaderTest.createDumyCommonEventHeader();
+ MeasurementFields fields =
+ MeasurementFieldsTest.createDummy(MeasurementFieldsTest.createDummyAdditionalMeasurements());
+ return Event.of(header, fields);
+ }
+
+ public static Event createDumyEventWithUe() {
+ CommonEventHeader header =
+ CommonEventHeaderTest.createDumyCommonEventHeader();
+ MeasurementFields fields =
+ MeasurementFieldsTest.createDummy(MeasurementFieldsTest.createDummyAdditionalMeasurementsWithTrafficModel());
+ return Event.of(header, fields);
+ }
+
+ @Test
+ public void of() {
+ CommonEventHeader header =
+ CommonEventHeaderTest.createDumyCommonEventHeader();
+ MeasurementFields fields =
+ MeasurementFieldsTest.createDummy(MeasurementFieldsTest.createDummyAdditionalMeasurements());
+ Event actual = Event.of(header, fields);
+
+ assertEquals(header, actual.commonEventHeader);
+ assertEquals(fields, actual.measurementFields);
+ }
+
+ @Test
+ public void testOf() {
+ CommonEventHeader header =
+ CommonEventHeaderTest.createDumyCommonEventHeader();
+ MeasurementFields fields =
+ MeasurementFieldsTest.createDummy(MeasurementFieldsTest.createDummyAdditionalMeasurements());
+ Event actual = Event.of(12345L, header, fields);
+
+ assertEquals(12345L, actual.id.longValue());
+ assertEquals(header, actual.commonEventHeader);
+ assertEquals(fields, actual.measurementFields);
+ }
+} \ No newline at end of file
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/MeasurementFieldsTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/MeasurementFieldsTest.java
new file mode 100644
index 0000000..b3ca614
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/MeasurementFieldsTest.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import static org.junit.Assert.assertEquals;
+import java.util.List;
+import java.util.Map;
+import org.junit.Test;
+
+public class MeasurementFieldsTest {
+ static AdditionalMeasurements createDummyAdditionalMeasurements() {
+ return AdditionalMeasurements.builder()
+ .withName("test-measurements")
+ .withHashMap(Map.of("k1", "v1", "k2", "v2", "k3", "v3"))
+ .build();
+ }
+
+ public static AdditionalMeasurements createDummyAdditionalMeasurementsWithTrafficModel() {
+ return AdditionalMeasurements.builder()
+ .withName("trafficModel")
+ .withHashMap(Map.of("emergency_samsung_01", "v1", "mobile_samsung_s10", "v2"))
+ .build();
+ }
+
+ static MeasurementFields createDummy(AdditionalMeasurements v) {
+ return MeasurementFields.builder()
+ .measurementInterval(1234567L)
+ .additionalMeasurements(List.of(v))
+ .build();
+
+ }
+
+ @Test
+ public void test() {
+ AdditionalMeasurements v = createDummyAdditionalMeasurements();
+ MeasurementFields actual = createDummy(v);
+
+ assertEquals(1234567L, actual.measurementInterval);
+ assertEquals("4.0", actual.measurementFieldsVersion);
+ assertEquals(List.of(v), actual.additionalMeasurements);
+ }
+}
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/DataAggregationServiceTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/DataAggregationServiceTest.java
new file mode 100644
index 0000000..01dbd0b
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/DataAggregationServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import org.onap.rapp.datacollector.entity.DataAggregationInfo;
+import org.onap.rapp.datacollector.entity.pm.PMData;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.CommonEventHeader;
+import org.onap.rapp.datacollector.entity.ves.CommonEventHeaderTest;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.MeasurementFields;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class DataAggregationServiceTest {
+
+ public static final int MICRO_SECONDS_OF_SECOND = 1_000_000;
+
+ List<Event> events;
+ DataAggregationInfo dataAggregationInfo;
+
+ DataAggregationService dataAggregationService;
+
+ @Before
+ public void init() {
+ events = new ArrayList<>();
+ dataAggregationService = new DataAggregationService();
+ dataAggregationInfo = createAggregationInfo();
+ long startTime = dataAggregationInfo.getStartTime();
+
+
+ for (int i = 0; i < 25; i++) {
+ CommonEventHeader header = CommonEventHeaderTest.createDumyCommonEventHeaderWithLastEpochMicro(startTime);
+ MeasurementFields measurements = createMeasurementFields();
+ events.add(Event.of(header, measurements));
+ startTime = startTime + MICRO_SECONDS_OF_SECOND;
+ }
+ }
+
+ private DataAggregationInfo createAggregationInfo() {
+ long startTime = Instant.now().getEpochSecond() * MICRO_SECONDS_OF_SECOND;
+ return DataAggregationInfo.builder()
+ .slot(5 * MICRO_SECONDS_OF_SECOND)
+ .startTime(startTime)
+ .endTime(startTime + 5 * 5 * MICRO_SECONDS_OF_SECOND)
+ .build();
+ }
+
+ private MeasurementFields createMeasurementFields() {
+ AdditionalMeasurements latency = AdditionalMeasurements.of("latency",
+ Map.of("latency", "20"));
+ AdditionalMeasurements throughput = AdditionalMeasurements.of("throughput",
+ Map.of("throughput", "80"));
+ return MeasurementFields.builder()
+ .additionalMeasurements(List.of(latency, throughput))
+ .build();
+ }
+
+ @Test
+ public void verifyAggregationData() {
+ PMData pmEntity = dataAggregationService.getAggregatedDataFromEventsForCell("Cell1", events, this.dataAggregationInfo);
+
+ pmEntity.getPerformance().forEach(pm ->{
+ Assert.assertEquals(Optional.of(pm.getLatency()), Optional.of(20));
+ Assert.assertEquals(Optional.of(pm.getThroughput()), Optional.of(80));
+ });
+ }
+
+ @Test
+ public void verifyAggregationDataWithEmptySlotOnBeginning() {
+ long startTime = this.dataAggregationInfo.getStartTime() - 10 * MICRO_SECONDS_OF_SECOND;
+ DataAggregationInfo dataAggregationInfo = DataAggregationInfo.builder()
+ .slot(5 * MICRO_SECONDS_OF_SECOND)
+ .startTime(startTime)
+ .endTime(startTime + 5 * 6 * MICRO_SECONDS_OF_SECOND)
+ .build();
+
+ PMData pmEntity = dataAggregationService.getAggregatedDataFromEventsForCell("Cell1", events, dataAggregationInfo);
+
+ Assert.assertNull(pmEntity.getPerformance().get(0).getLatency());
+ Assert.assertNull(pmEntity.getPerformance().get(0).getThroughput());
+ Assert.assertEquals(pmEntity.getCellId(), "Cell1");
+ }
+
+ @Test
+ public void verifyAggregationDataWithEmptySlotOnEnd() {
+ long startTime = this.dataAggregationInfo.getStartTime();
+ DataAggregationInfo dataAggregationInfo = DataAggregationInfo.builder()
+ .slot(5 * MICRO_SECONDS_OF_SECOND)
+ .startTime(this.dataAggregationInfo.getStartTime())
+ .endTime(startTime + 5 * 6 * MICRO_SECONDS_OF_SECOND)
+ .build();
+
+ PMData pmEntity = dataAggregationService.getAggregatedDataFromEventsForCell("Cell1", events, dataAggregationInfo);
+
+ Assert.assertNull(pmEntity.getPerformance().get(5).getLatency());
+ Assert.assertNull(pmEntity.getPerformance().get(5).getThroughput());
+ }
+
+ @Test
+ public void verifyAggregationDataWithEmptySlotInMiddle() {
+ removeSecondSlotEvents();
+ PMData pmEntity = dataAggregationService.getAggregatedDataFromEventsForCell("Cell1", events, dataAggregationInfo);
+
+ Assert.assertNull(pmEntity.getPerformance().get(1).getLatency());
+ Assert.assertNull(pmEntity.getPerformance().get(1).getThroughput());
+ }
+
+ private void removeSecondSlotEvents() {
+ for (int i = 0; i < 6; i++) {
+ events.remove(5);
+ }
+ }
+
+}
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java
new file mode 100644
index 0000000..5a593f8
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/DmaapRestReaderConfigurationTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.onap.rapp.datacollector.service.configuration.DatabaseProperties;
+import org.onap.rapp.datacollector.service.configuration.DmaapProperties;
+import org.onap.rapp.datacollector.service.configuration.DmaapRestReaderConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@ComponentScan("org.onap.rapp.datacollector.service.configuration")
+@EnableConfigurationProperties({DmaapProperties.class, DatabaseProperties.class})
+@ContextConfiguration(classes = {DmaapRestReaderConfiguration.class})
+@TestPropertySource(properties = {"dmaap.host=localhost",
+ "dmaap.protocol=http",
+ "dmaap.port=8080",
+ "dmaap.measurements-topic=a-topic",
+ "database.url=jdbc:mysql://172.17.0.2:3306/ves?createDatabaseIfNotExist=true",
+ "database.username=root",
+ "database.password=mypass",
+ "database.driver-class-name=org.mariadb.jdbc.Driver"
+})
+
+public class DmaapRestReaderConfigurationTest {
+
+ @Autowired
+ private DmaapRestReaderConfiguration config;
+
+ @Test
+ public void testUrlConstruction() {
+ final String actual = config.getMeasurementsTopicUrl();
+ final String expected = "http://localhost:8080/a-topic";
+
+ assertEquals(expected, actual);
+ }
+}
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesParserImplTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesParserImplTest.java
new file mode 100644
index 0000000..f6cd0e5
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesParserImplTest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.stream.Collectors;
+import org.junit.Before;
+import org.junit.Test;
+import com.google.gson.JsonParseException;
+import org.onap.rapp.datacollector.entity.ves.Event;
+
+public class VesParserImplTest {
+ String testVesContent;
+ VesParser parser = new VesParserImpl();
+
+ @Before
+ public void setUp() throws Exception {
+ InputStream in = this.getClass().getResourceAsStream("/sample-ves.json");
+ try (in) {
+ BufferedReader inr = new BufferedReader(new InputStreamReader(in));
+ testVesContent = inr.lines().collect(Collectors.joining(" "));
+ }
+ }
+
+ @Test
+ public void parse() {
+ Event actual = parser.parse(testVesContent);
+ assertEquals("4.0.1", actual.commonEventHeader.version);
+ assertEquals(1413378172000000L, (long) actual.commonEventHeader.lastEpochMicrosec);
+ assertEquals(1413378172000000L, (long) actual.commonEventHeader.startEpochMicrosec);
+ assertEquals(3, (int) actual.commonEventHeader.sequence);
+ assertEquals("measurement", actual.commonEventHeader.domain);
+ assertEquals("UTC-05:30", actual.commonEventHeader.timeZoneOffset);
+ }
+
+ @Test(expected = JsonParseException.class)
+ public void parseEmpty() {
+ Event actual = parser.parse("{\"event\":{}}");
+ }
+} \ No newline at end of file
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesPersisterSqlImplTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesPersisterSqlImplTest.java
new file mode 100644
index 0000000..72b31c9
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesPersisterSqlImplTest.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.EventTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.BDDMockito;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.internal.verification.VerificationModeFactory;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class VesPersisterSqlImplTest {
+
+ @InjectMocks
+ private VesPersisterSqlImpl vesPersisterSql;
+
+ @Mock
+ private SqlRepository repository;
+
+ final Event event = EventTest.createDumyEvent();
+
+ @Test
+ public void persists() {
+
+ vesPersisterSql.persists(event);
+
+ // verify
+ BDDMockito
+ .verify(repository, VerificationModeFactory.times(1))
+ .save(event);
+ }
+} \ No newline at end of file
diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java
new file mode 100644
index 0000000..0126019
--- /dev/null
+++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.service.configuration.DmaapRestReaderConfiguration;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+import org.onap.rapp.datacollector.entity.ves.EventTest;
+
+@RunWith(MockitoJUnitRunner.class)
+public class VesRetrievalServiceTest {
+
+ @Mock
+ private RestTemplate restTemplate;
+
+ @Mock
+ private DmaapRestReaderConfiguration config;
+
+ @Mock
+ private VesParser parser;
+
+ @Mock
+ private VesPersister persister;
+
+ @Mock
+ UEHolder ueHolder;
+
+ private VesRetrievalService service;
+
+ @Before
+ public void init() {
+ Mockito.when(config.getMeasurementsTopicUrl()).thenReturn("http://localhost/a-topic");
+ String[] response = new String[]{"a", "b"};
+ Mockito.when(restTemplate.getForEntity("http://localhost/a-topic", String[].class))
+ .thenReturn(new ResponseEntity<>(response, HttpStatus.OK));
+
+ service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder);
+ }
+
+ @Test
+ public void givenMockingIsDoneByMockRestServiceServer_whenGetIsCalled_thenReturnsMockedObject() {
+ HashSet<String> actual = new HashSet<>(service.retrieveEvents());
+ Set<String> expected = Set.of("a", "b");
+ Assert.assertEquals(actual, expected);
+ }
+
+ @Test
+ public void whenGetIsCalled_thenExceptionIsThrown() {
+ Mockito.when(config.getMeasurementsTopicUrl()).thenReturn("http://localhost/a-topic");
+ Mockito.when(restTemplate.getForEntity("http://localhost/a-topic", String[].class))
+ .thenThrow(new RestClientException("An test exception"));
+
+ service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder);
+ Collection<String> actual = service.retrieveEvents();
+ Assert.assertEquals(0, actual.size());
+ }
+
+ @Test
+ public void whenRetrievedThenAlsoStored() {
+ Mockito.when(config.getMeasurementsTopicUrl()).thenReturn("http://localhost/a-topic");
+ Mockito.when(restTemplate.getForEntity("http://localhost/a-topic", String[].class))
+ .thenReturn(new ResponseEntity<String[]>(new String[]{"dead", "beef"}, HttpStatus.OK));
+ Mockito.when(parser.parse(Mockito.any(String.class)))
+ .thenReturn(EventTest.createDumyEvent());
+
+ service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder);
+ service.retrieveAndStoreVesEvents();
+
+ Mockito.verify(persister, Mockito.times(2)).persists(Mockito.any(Event.class));
+ }
+
+ @Test
+ public void whenRetrievedThenAlsoStoredWithUE() {
+ Mockito.when(config.getMeasurementsTopicUrl()).thenReturn("http://localhost/a-topic");
+ Mockito.when(restTemplate.getForEntity("http://localhost/a-topic", String[].class))
+ .thenReturn(new ResponseEntity<String[]>(new String[]{"dead", "beef"}, HttpStatus.OK));
+ Mockito.when(parser.parse(Mockito.any(String.class)))
+ .thenReturn(EventTest.createDumyEventWithUe());
+
+ UEHolder ueHolder = new UEHolder();
+
+ service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder);
+ service.retrieveAndStoreVesEvents();
+
+ Mockito.verify(persister, Mockito.times(2)).persists(Mockito.any(Event.class));
+ Assert.assertEquals(ueHolder.getUes(), Set.of("emergency_samsung_01", "mobile_samsung_s10"));
+ }
+}
+
diff --git a/datacollector/src/test/resources/sample-pm.json b/datacollector/src/test/resources/sample-pm.json
new file mode 100644
index 0000000..d03c067
--- /dev/null
+++ b/datacollector/src/test/resources/sample-pm.json
@@ -0,0 +1,53 @@
+{
+ "cellId": "Cell1",
+ "performance": [
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ },
+ {
+ "latency": 20,
+ "throughput": 80
+ }
+ ]
+}
diff --git a/datacollector/src/test/resources/sample-ves.json b/datacollector/src/test/resources/sample-ves.json
new file mode 100644
index 0000000..114f592
--- /dev/null
+++ b/datacollector/src/test/resources/sample-ves.json
@@ -0,0 +1,36 @@
+{
+ "event": {
+ "commonEventHeader": {
+ "version": "4.0.1",
+ "vesEventListenerVersion": "7.0.1",
+ "domain": "measurement",
+ "eventName": "Measurement_vIsbcMmc",
+ "eventId": "measurement0000259",
+ "sequence": 3,
+ "priority": "Normal",
+ "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234",
+ "reportingEntityName": "ibcx0001vm002oam001",
+ "sourceId": "de305d54-75b4-431b-adb2-eb6b9e546014",
+ "sourceName": "ibcx0001vm002ssc001",
+ "nfVendorName": "Samsung",
+ "nfNamingCode": "ibcx",
+ "nfcNamingCode": "ssc",
+ "startEpochMicrosec": 1413378172000000,
+ "lastEpochMicrosec": 1413378172000000,
+ "timeZoneOffset": "UTC-05:30"
+ },
+ "measurementFields": {
+ "additionalMeasurements": [
+ {
+ "name": "UE-1",
+ "hashMap": {
+ "latency": "20",
+ "throughput": "100"
+ }
+ }
+ ],
+ "measurementInterval": 5,
+ "measurementFieldsVersion": "4.0"
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..3c9ee03
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (C) 2021 Samsung Electronics
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.onap.rapp</groupId>
+ <artifactId>rapp</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>pom</packaging>
+ <name>rapp</name>
+ <description>rapp applications</description>
+ <modules>
+ <module>datacollector</module>
+ </modules>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>2.3.0.RELEASE</version>
+ </parent>
+
+ <properties>
+ <java.version>11</java.version>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.build.timestamp.format>yyyyMMdd'T'HHmmss</maven.build.timestamp.format>
+
+ <lombok.version>1.18.8</lombok.version>
+ <mariadb.version>2.6.0</mariadb.version>
+ <gson.version>2.8.6</gson.version>
+ <javax.xml.bind.version>2.3.0</javax.xml.bind.version>
+ <swagger.version>2.9.2</swagger.version>
+ <swagger.maven.plugin.version>3.1.7</swagger.maven.plugin.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ <compilerArgs>
+ <arg>-sourcepath</arg>
+ <arg>
+ ${project.basedir}/src/main/java${path.separator}${project.basedir}/target/generated-sources/annotations${path.separator}/
+ </arg>
+ </compilerArgs>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>