summaryrefslogtreecommitdiffstats
path: root/datacollector/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'datacollector/src/main')
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/DataCollectorApplication.java33
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/controller/PMController.java77
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/controller/UEController.java48
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/DataAggregationInfo.java25
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/UEInfo.java28
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/AggregatedPM.java29
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PMData.java29
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PerformanceData.java28
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValues.java48
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurements.java84
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java66
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/Event.java62
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/EventAPI.java46
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java48
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/RawPayload.java57
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/DataAggregationService.java123
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/DmaapRestReader.java21
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/MariadbDialectResolver.java46
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java90
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepository.java24
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepositoryAPI.java31
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/UEHolder.java32
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java21
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java99
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java35
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java85
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java99
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DatabaseProperties.java30
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java33
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java165
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/SwaggerConfig.java50
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/WebConfig.java27
-rw-r--r--datacollector/src/main/resources/META-INF/spring.factories1
-rw-r--r--datacollector/src/main/resources/application.yml27
34 files changed, 1747 insertions, 0 deletions
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/DataCollectorApplication.java b/datacollector/src/main/java/org/onap/rapp/datacollector/DataCollectorApplication.java
new file mode 100644
index 0000000..39f4f2d
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/DataCollectorApplication.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector;
+
+import org.onap.rapp.datacollector.service.configuration.DatabaseProperties;
+import org.onap.rapp.datacollector.service.configuration.DmaapProperties;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+
+@SpringBootApplication
+@EnableScheduling
+@EnableConfigurationProperties({DmaapProperties.class, DatabaseProperties.class})
+@EnableTransactionManagement
+public class DataCollectorApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(DataCollectorApplication.class);
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/controller/PMController.java b/datacollector/src/main/java/org/onap/rapp/datacollector/controller/PMController.java
new file mode 100644
index 0000000..e79ccb8
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/controller/PMController.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import org.onap.rapp.datacollector.entity.pm.AggregatedPM;
+import org.onap.rapp.datacollector.service.PMService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.server.ResponseStatusException;
+
+@Controller("pmController")
+@Api(tags = {"RESTful APIs for DataCollector (current is PM DataCollector) R-APP mS"})
+public class PMController {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private final PMService pmService;
+
+ public PMController(PMService pmService) {
+ this.pmService = pmService;
+ }
+
+ @ApiOperation(value = "Get the latest aggregated pm ves events from database.",
+ notes = "Returns the latest aggregated pm ves events from database between "
+ + "startTime and now, together with the itemsLength "
+ + "(i.e., total items in the returned pm array, i.e., active cells count)",
+ httpMethod = "GET",
+ produces = "application/json",
+ response = AggregatedPM.class
+ )
+ @GetMapping(value = "/v1/pm/events/aggregatedmetrics")
+ public @ResponseBody
+ AggregatedPM retrievePMData(
+ @ApiParam(value = "aggregation period (in seconds) for which an average performance "
+ + "metrics are calculated", required = true) @RequestParam("slot") int slot,
+ @ApiParam(value = "number of aggregated performance metrics that should be returned by the method, "
+ + "one aggergated performance metric per each slot. The first performance metrics is avarage "
+ + "metrics for (startTime, startTime +slot)", required = true) @RequestParam("count") int count,
+ @ApiParam(value = "ISO 8601 time format as string (e.g., 2020-10-26T06:52:54.01+00:00) for which aggregated "
+ + "performance metrics are calculated with the pm ves data starting from startTime. "
+ + "\"+\" and \".\" signs must be properly encoded in url",
+ required = true) @RequestParam("startTime") String startTime) {
+ OffsetDateTime time = getOffsetDateTime(startTime);
+ logger.debug("Getting {} aggregated metrics for {} second slot, start time {}", count, slot, startTime);
+
+ return pmService.getAggregatedPMDataForTimeInterval(slot, count, time);
+ }
+
+ private OffsetDateTime getOffsetDateTime(String startTime) {
+ OffsetDateTime time = OffsetDateTime.parse(startTime);
+ if (time.toEpochSecond() > Instant.now().getEpochSecond()) {
+ throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Start time can't be from future.");
+ }
+ return time;
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/controller/UEController.java b/datacollector/src/main/java/org/onap/rapp/datacollector/controller/UEController.java
new file mode 100644
index 0000000..343de0d
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/controller/UEController.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.Set;
+import org.onap.rapp.datacollector.entity.UEInfo;
+import org.onap.rapp.datacollector.service.UEHolder;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+@Api(tags = {"RESTful APIs for DataCollector (current is PM DataCollector) R-APP mS"})
+public class UEController {
+
+ private final UEHolder holder;
+
+ public UEController(UEHolder holder) {
+ this.holder = holder;
+ }
+
+ @ApiOperation(value = "Get all user equipment from topology.",
+ notes = "Returns all user equipment from topology.",
+ httpMethod = "GET",
+ produces = "application/json",
+ response = UEInfo.class
+ )
+ @GetMapping(value = "/v1/pm/ues")
+ public @ResponseBody
+ UEInfo getUserEquipments() {
+ Set<String> ues = holder.getUes();
+ return new UEInfo(ues);
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/DataAggregationInfo.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/DataAggregationInfo.java
new file mode 100644
index 0000000..8f16d9f
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/DataAggregationInfo.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity;
+
+import lombok.Builder;
+import lombok.Getter;
+
+@Builder
+@Getter
+public class DataAggregationInfo {
+ int slot;
+ long startTime;
+ long endTime;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/UEInfo.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/UEInfo.java
new file mode 100644
index 0000000..0b2c48a
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/UEInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity;
+
+import java.util.Set;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class UEInfo {
+ Set<String> ues;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/AggregatedPM.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/AggregatedPM.java
new file mode 100644
index 0000000..840a6c6
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/AggregatedPM.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.pm;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class AggregatedPM {
+ List<PMData> pm;
+ int itemsLength;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PMData.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PMData.java
new file mode 100644
index 0000000..7edea87
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PMData.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.pm;
+
+import java.util.List;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class PMData {
+ String cellId;
+ List<PerformanceData> performance;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PerformanceData.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PerformanceData.java
new file mode 100644
index 0000000..c49f81f
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/pm/PerformanceData.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.pm;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class PerformanceData {
+ Integer latency;
+ Integer throughput;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValues.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValues.java
new file mode 100644
index 0000000..2add40f
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurementValues.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.ToString;
+import org.springframework.data.relational.core.mapping.Column;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Data
+@Builder
+@ToString
+@Table("additional_measurement_value")
+public class AdditionalMeasurementValues {
+ @Column("am_name")
+ public final String name;
+ @Column("am_key")
+ public final String parameterName;
+ @Column("am_value")
+ public final String parameterValue;
+
+ public AdditionalMeasurementValues(String name, String parameterName, String parameterValue) {
+ this.name = name;
+ this.parameterName = parameterName;
+ this.parameterValue = parameterValue;
+ }
+
+ public static AdditionalMeasurementValues of(String name, String parameterName, String parameterValue) {
+ return AdditionalMeasurementValues.builder()
+ .name(name)
+ .parameterName(parameterName)
+ .parameterValue(parameterValue)
+ .build();
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurements.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurements.java
new file mode 100644
index 0000000..10d49ff
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/AdditionalMeasurements.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.Data;
+import lombok.ToString;
+import org.springframework.data.relational.core.mapping.Column;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Data
+@ToString
+@Table("additional_measurement")
+public class AdditionalMeasurements implements Serializable {
+ final Long eventId;
+ @Column("am_name")
+ public final String name;
+ public final List<AdditionalMeasurementValues> values;
+
+ private AdditionalMeasurements(Long eventId, String name, Map<String, String> hashMap) {
+ this.eventId = eventId;
+ this.name = name;
+ this.values = hashMap.keySet()
+ .stream()
+ .map(key -> AdditionalMeasurementValues.of(this.name, key, hashMap.getOrDefault(key, ""))
+ ).collect(Collectors.toUnmodifiableList());
+ }
+
+ @JsonCreator
+ public static AdditionalMeasurements of(@JsonProperty("name") String name, @JsonProperty("hashMap") Map<String, String> hashMap) {
+ return new AdditionalMeasurements(null, name, hashMap);
+ }
+
+ public static AdditionalMeasurements of(Long eventId, String name, Map<String, String> hashMap) {
+ return new AdditionalMeasurements(eventId, name, hashMap);
+ }
+
+ public static AdditionalMeasurementsBuilder builder() {
+ return new AdditionalMeasurementsBuilder();
+ }
+
+ public static class AdditionalMeasurementsBuilder {
+ private long eventId;
+ private String name;
+ private Map<String, String> hashMap;
+
+ public AdditionalMeasurementsBuilder withEventId(long id) {
+ this.eventId = id;
+ return this;
+ }
+
+ public AdditionalMeasurementsBuilder withName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public AdditionalMeasurementsBuilder withHashMap(Map<String, String> hashMap) {
+ this.hashMap = Collections.unmodifiableMap(hashMap);
+ return this;
+ }
+
+ public AdditionalMeasurements build() {
+ return new AdditionalMeasurements(eventId, name, hashMap);
+ }
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java
new file mode 100644
index 0000000..e9d3f75
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.springframework.data.annotation.Transient;
+
+@Data
+@EqualsAndHashCode
+@ToString
+@Builder
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CommonEventHeader {
+ public final String eventType;
+ public final String version;
+ public final String sourceId;
+ public final String reportingEntityName;
+ public final Long startEpochMicrosec;
+ public final String eventId;
+ public final Long lastEpochMicrosec;
+ public final String priority;
+ public final Integer sequence;
+ public final String sourceName;
+ public final String domain;
+ public final String eventName;
+ public final String reportingEntityId;
+ public final String nfcNamingCode;
+ public final String nfNamingCode;
+ @Transient
+ public final String timeZoneOffset;
+
+ protected CommonEventHeader(String eventType, String version, String sourceId, String reportingEntityName, Long startEpochMicrosec, String eventId, Long lastEpochMicrosec, String priority, Integer sequence, String sourceName, String domain, String eventName, String reportingEntityId, String nfcNamingCode, String nfNamingCode, String timeZone) {
+ this.eventType = eventType;
+ this.version = version;
+ this.sourceId = sourceId;
+ this.reportingEntityName = reportingEntityName;
+ this.startEpochMicrosec = startEpochMicrosec;
+ this.eventId = eventId;
+ this.lastEpochMicrosec = lastEpochMicrosec;
+ this.priority = priority;
+ this.sequence = sequence;
+ this.sourceName = sourceName;
+ this.domain = domain;
+ this.eventName = eventName;
+ this.reportingEntityId = reportingEntityId;
+ this.nfcNamingCode = nfcNamingCode;
+ this.nfNamingCode = nfNamingCode;
+ this.timeZoneOffset = timeZone;
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/Event.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/Event.java
new file mode 100644
index 0000000..2d00636
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/Event.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import lombok.Getter;
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.relational.core.mapping.Column;
+import org.springframework.data.relational.core.mapping.Embedded;
+import org.springframework.data.relational.core.mapping.Table;
+
+@JsonTypeName("event")
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@Table("ves_measurement")
+@ToString
+@Getter
+public class Event {
+ @Id
+ Long id;
+
+ @Column("rawdata")
+ public volatile String raw;
+
+ @Embedded(onEmpty = Embedded.OnEmpty.USE_NULL)
+ public final CommonEventHeader commonEventHeader;
+
+ @Column("event_id")
+ public final MeasurementFields measurementFields;
+
+ protected Event(final Long id, CommonEventHeader header, MeasurementFields fields, String raw) {
+ this.id = id;
+ this.commonEventHeader = header;
+ this.measurementFields = fields;
+ this.raw = raw;
+ }
+
+ public static Event of(CommonEventHeader header, MeasurementFields fields) {
+ return new Event(null, header, fields, "");
+ }
+
+ public static Event of(final Long id, CommonEventHeader header, MeasurementFields fields) {
+ return new Event(id, header, fields, "");
+ }
+
+ public static Event of(final Long id, CommonEventHeader header, MeasurementFields fields, String raw) {
+ return new Event(id, header, fields, raw);
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/EventAPI.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/EventAPI.java
new file mode 100644
index 0000000..95e9bfb
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/EventAPI.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.GenerationType;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+@EqualsAndHashCode
+@Builder
+@Entity
+@Table(name = "ves_measurement")
+public class EventAPI {
+
+ @Id
+ @GeneratedValue(strategy = GenerationType.AUTO)
+ private long id;
+
+ private String rawdata;
+
+ private Long lastEpochMicrosec;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java
new file mode 100644
index 0000000..802aace
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import java.util.Collections;
+import java.util.List;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.springframework.data.relational.core.mapping.Column;
+import org.springframework.data.relational.core.mapping.Table;
+
+@Data
+@ToString
+@EqualsAndHashCode
+@Builder
+@Table("ves_measurement_fields")
+public class MeasurementFields {
+ public static final MeasurementFields EMPTY = new MeasurementFields(-1L, -1L, Collections.emptyList());
+ public final Long eventId;
+ public final long measurementInterval;
+ public final String measurementFieldsVersion = "4.0";
+
+ @Column("event_id")
+ public final List<AdditionalMeasurements> additionalMeasurements;
+
+ private MeasurementFields(Long eventId, long measurementInterval, List<AdditionalMeasurements> additionalMeasurements) {
+ this.eventId = eventId;
+ this.measurementInterval = measurementInterval;
+ this.additionalMeasurements = Collections.unmodifiableList(additionalMeasurements);
+ }
+
+ public static MeasurementFields of(Long eventId) {
+ return new MeasurementFields(eventId, -1, Collections.emptyList());
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/RawPayload.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/RawPayload.java
new file mode 100644
index 0000000..0a7fe03
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/RawPayload.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.entity.ves;
+
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.relational.core.mapping.Table;
+
+@ToString
+@EqualsAndHashCode
+@Table("payload")
+public class RawPayload {
+ @Id
+ public final Long eventId;
+ public final String payload;
+
+ private RawPayload(Long eventId, String payload) {
+ this.eventId = eventId;
+ this.payload = payload;
+ }
+
+ public static class RawPayloadBuilder {
+ @Id
+ private Long eventId;
+ private String payload;
+
+ public RawPayloadBuilder withEvent(Long event) {
+ this.eventId = event;
+ return this;
+ }
+
+ public RawPayloadBuilder withPayload(String payload) {
+ this.payload = payload;
+ return this;
+ }
+
+ public RawPayload build() {
+ return new RawPayload(eventId, payload);
+ }
+ }
+
+ public static RawPayloadBuilder builder() {
+ return new RawPayloadBuilder();
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/DataAggregationService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/DataAggregationService.java
new file mode 100644
index 0000000..aebdd4c
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/DataAggregationService.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.OptionalDouble;
+import java.util.stream.Collectors;
+import org.onap.rapp.datacollector.entity.DataAggregationInfo;
+import org.onap.rapp.datacollector.entity.pm.PMData;
+import org.onap.rapp.datacollector.entity.pm.PerformanceData;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+public class DataAggregationService {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private static final String LATENCY_FIELD_NAME = "latency";
+ private static final String THROUGHPUT_FIELD_NAME = "throughput";
+ public static final int MEASUREMENT_INDEX = 0;
+
+ public PMData getAggregatedDataFromEventsForCell(String cellId, List<Event> events, DataAggregationInfo dataAggregationInfo) {
+ logger.info("Cell {}, events size {}", cellId, events.size());
+ Collection<List<Event>> eventsByTime = groupEventsTimeSlots(events, dataAggregationInfo);
+
+ List<PerformanceData> pmDataList = new ArrayList<>();
+ eventsByTime.forEach(slotOfEvents -> {
+ List<Integer> latencyList = getPerformanceData(slotOfEvents, LATENCY_FIELD_NAME);
+ List<Integer> throughputList = getPerformanceData(slotOfEvents, THROUGHPUT_FIELD_NAME);
+
+ Integer latencyAggregatedData = getAverage(latencyList);
+ Integer throughputAggregatedData = getAverage(throughputList);
+
+ PerformanceData pm = new PerformanceData(latencyAggregatedData, throughputAggregatedData);
+ pmDataList.add(pm);
+ });
+
+ return new PMData(cellId, pmDataList);
+ }
+
+ private Collection<List<Event>> groupEventsTimeSlots(List<Event> events, DataAggregationInfo aggregationInfo) {
+ long slotStartTime = aggregationInfo.getStartTime();
+ long slotEndTime = slotStartTime + aggregationInfo.getSlot();
+
+ List<List<Event>> eventsByTime = new ArrayList<>();
+ List<Event> eventsOfSlot = new ArrayList<>();
+
+ for (Event event : events) {
+ if (isInNextSlot(slotEndTime, event)) {
+ eventsByTime.add(eventsOfSlot);
+ eventsOfSlot = new ArrayList<>();
+
+ slotStartTime = slotEndTime;
+ slotEndTime = slotStartTime + aggregationInfo.getSlot();
+
+ while (isInNextSlot(slotEndTime, event)){
+ eventsByTime.add(Collections.emptyList());
+ slotStartTime = slotEndTime;
+ slotEndTime = slotStartTime + aggregationInfo.getSlot();
+ }
+ }
+ eventsOfSlot.add(event);
+ }
+
+ eventsByTime.add(eventsOfSlot);
+ fillEmptyEndIfNeeded(aggregationInfo, slotEndTime, eventsByTime);
+ return eventsByTime;
+ }
+
+ private boolean isInNextSlot(long slotEndTime, Event event) {
+ return event.getCommonEventHeader().getLastEpochMicrosec() > slotEndTime;
+ }
+
+ private void fillEmptyEndIfNeeded(DataAggregationInfo aggregationInfo, long slotEndTime, List<List<Event>> eventsByTime) {
+ while (slotEndTime < aggregationInfo.getEndTime()){
+ eventsByTime.add(Collections.emptyList());
+ slotEndTime = slotEndTime + aggregationInfo.getSlot();
+ }
+ }
+
+ private List<Integer> getPerformanceData(List<Event> events, String measurement) {
+ return events.stream().map(e -> getPerformanceDataFromEvent(e, measurement))
+ .collect(Collectors.toList());
+ }
+
+ private int getPerformanceDataFromEvent(Event event, String name) {
+ AdditionalMeasurements performance = event.getMeasurementFields().getAdditionalMeasurements()
+ .stream().filter(am -> am.getName().equals(name)).findAny().orElseThrow();
+ return Integer.parseInt(performance.getValues().get(MEASUREMENT_INDEX).getParameterValue());
+ }
+
+ private Integer getAverage(List<Integer> values) {
+ if (values.isEmpty()) {
+ return null;
+ }
+ OptionalDouble average = values.stream().mapToDouble(l -> l).average();
+
+ if (average.isPresent()) {
+ return (int) average.getAsDouble();
+ } else {
+ return null;
+ }
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/DmaapRestReader.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/DmaapRestReader.java
new file mode 100644
index 0000000..a6c0fa4
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/DmaapRestReader.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.Collection;
+
+interface DmaapRestReader {
+ Collection<String> retrieveEvents();
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/MariadbDialectResolver.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/MariadbDialectResolver.java
new file mode 100644
index 0000000..8259ac7
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/MariadbDialectResolver.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Locale;
+import java.util.Optional;
+import org.springframework.data.jdbc.repository.config.DialectResolver;
+import org.springframework.data.relational.core.dialect.Dialect;
+import org.springframework.data.relational.core.dialect.MySqlDialect;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcOperations;
+
+public class MariadbDialectResolver implements DialectResolver.JdbcDialectProvider {
+
+ @Override
+ public Optional<Dialect> getDialect(JdbcOperations operations) {
+ return Optional.ofNullable(
+ operations.execute((ConnectionCallback<Dialect>) MariadbDialectResolver::getDialect));
+ }
+
+ private static Dialect getDialect(Connection connection) throws SQLException {
+ DatabaseMetaData metaData = connection.getMetaData();
+ String name = metaData.getDatabaseProductName().toLowerCase(Locale.ROOT);
+ if (name.contains("mariadb")) {
+ return MySqlDialect.INSTANCE;
+ }
+ return null;
+ }
+}
+
+
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java
new file mode 100644
index 0000000..7039d2e
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.onap.rapp.datacollector.entity.DataAggregationInfo;
+import org.onap.rapp.datacollector.entity.pm.AggregatedPM;
+import org.onap.rapp.datacollector.entity.pm.PMData;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.EventAPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service("pmService")
+public class PMService {
+
+ private static final Logger logger = LoggerFactory.getLogger(PMService.class);
+
+ public static final String CELL_FIELD_NAME = "identifier";
+ public static final int CELL_INDEX = 0;
+ private static final int MICRO_SECONDS_OF_SECOND = 1_000_000;
+
+ private final VesPersisterSqlImpl vesPersisterSql;
+ private final DataAggregationService aggregationService;
+ private final VesParser parser;
+
+ public PMService(VesPersisterSqlImpl vesPersisterSql, DataAggregationService aggregationService, VesParser parser) {
+ this.vesPersisterSql = vesPersisterSql;
+ this.aggregationService = aggregationService;
+ this.parser = parser;
+ }
+
+ public AggregatedPM getAggregatedPMDataForTimeInterval(int slot, int count, OffsetDateTime startTime) {
+ DataAggregationInfo aggregationInfo = buildDataAggregationInfo(slot, count, startTime);
+ logger.info("Start Time: {}, EndTime: {}", aggregationInfo.getStartTime(), aggregationInfo.getEndTime());
+ List<EventAPI> eventsOfInterval = vesPersisterSql.findEventsByTimeWindow(aggregationInfo.getStartTime(), aggregationInfo.getEndTime());
+ Map<String, List<Event>> eventsByCell = groupByCell(eventsOfInterval);
+ List<PMData> pmDataList = calculateAggregatedData(aggregationInfo, eventsByCell);
+ return new AggregatedPM(pmDataList, pmDataList.size());
+ }
+
+ private DataAggregationInfo buildDataAggregationInfo(int slot, int count, OffsetDateTime startTime) {
+ long timeIntervalStartTime = startTime.toEpochSecond() * MICRO_SECONDS_OF_SECOND;
+ long timeIntervalEndTime = getTimeIntervalEndTime(slot, count, timeIntervalStartTime);
+ return DataAggregationInfo.builder()
+ .startTime(timeIntervalStartTime)
+ .endTime(timeIntervalEndTime)
+ .slot(slot * MICRO_SECONDS_OF_SECOND)
+ .build();
+ }
+
+ private long getTimeIntervalEndTime(int slot, int count, long startDate) {
+ int timeIntervalMicrosec = slot * count * MICRO_SECONDS_OF_SECOND;
+ return startDate + timeIntervalMicrosec;
+ }
+
+ private Map<String, List<Event>> groupByCell(List<EventAPI> events) {
+ return events.stream().map(e -> parser.parse(e.getRawdata()))
+ .collect(Collectors.groupingBy(this::getCellFromVes));
+ }
+
+ private String getCellFromVes(Event event) {
+ AdditionalMeasurements cellField = event.getMeasurementFields().getAdditionalMeasurements()
+ .stream().filter(am -> am.getName().equals(CELL_FIELD_NAME)).findAny().orElseThrow();
+ return cellField.getValues().get(CELL_INDEX).getParameterValue();
+ }
+
+ private List<PMData> calculateAggregatedData(DataAggregationInfo dataAggregationInfo, Map<String, List<Event>> events) {
+ return events.entrySet().stream()
+ .map(e -> aggregationService.getAggregatedDataFromEventsForCell(e.getKey(), e.getValue(), dataAggregationInfo))
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepository.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepository.java
new file mode 100644
index 0000000..7d7f828
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepository.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.stereotype.Repository;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.springframework.transaction.annotation.Transactional;
+
+@Repository("repository")
+@Transactional
+public interface SqlRepository extends CrudRepository<Event, String> {
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepositoryAPI.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepositoryAPI.java
new file mode 100644
index 0000000..aeb0ec0
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/SqlRepositoryAPI.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.List;
+import org.onap.rapp.datacollector.entity.ves.EventAPI;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
+
+@Repository("repositoryAPI")
+@Transactional
+public interface SqlRepositoryAPI extends CrudRepository<EventAPI, Long> {
+ @Query(value = "SELECT * FROM ves_measurement order by id desc limit :limit", nativeQuery = true)
+ List<EventAPI> findTopNVesEvent(@Param("limit") int limit);
+
+ List<EventAPI> findByLastEpochMicrosecBetweenOrderByLastEpochMicrosecAsc(Long startTime, Long endTime);
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/UEHolder.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/UEHolder.java
new file mode 100644
index 0000000..a9451ef
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/UEHolder.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import org.springframework.stereotype.Service;
+import java.util.HashSet;
+import java.util.Set;
+
+@Service
+public class UEHolder {
+
+ private Set<String> ues = new HashSet<>();
+
+ public void addUE(String ue) {
+ ues.add(ue);
+ }
+
+ public Set<String> getUes(){
+ return ues;
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java
new file mode 100644
index 0000000..9a453b4
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import org.onap.rapp.datacollector.entity.ves.Event;
+
+public interface VesParser {
+ Event parse(final String event);
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java
new file mode 100644
index 0000000..cf90bfd
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.CommonEventHeader;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.MeasurementFields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+public class VesParserImpl implements VesParser {
+ private static final Logger logger = LoggerFactory.getLogger(VesParserImpl.class);
+
+ private static class VesEventDeserializer implements JsonDeserializer<Event> {
+ private static class AdditionalMeasurementsRawValue {
+ String name;
+ Map<String, String> hashMap;
+ }
+
+ @Override
+ public Event deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
+ JsonObject obj = jsonElement.getAsJsonObject();
+ obj = obj.getAsJsonObject("event");
+ CommonEventHeader header;
+ Optional<MeasurementFields> measurementFields = Optional.empty();
+ List<AdditionalMeasurements> additionalMeasurements = Collections.emptyList();
+ if (obj.has("commonEventHeader")) {
+ JsonObject h = obj.getAsJsonObject("commonEventHeader");
+ header = jsonDeserializationContext.deserialize(h, CommonEventHeader.class);
+ } else {
+ throw new JsonParseException("Common header not found");
+ }
+ if (obj.has("measurementFields")) {
+ JsonObject h = obj.getAsJsonObject("measurementFields");
+ measurementFields = Optional.ofNullable(jsonDeserializationContext.deserialize(h, MeasurementFields.class));
+ if (h.has("additionalMeasurements")) {
+ JsonArray arr = h.getAsJsonArray("additionalMeasurements");
+ additionalMeasurements = new ArrayList<>();
+ for (int i = 0; i < arr.size(); i++) {
+ AdditionalMeasurementsRawValue tmp = jsonDeserializationContext.deserialize(arr.get(i).getAsJsonObject(), AdditionalMeasurementsRawValue.class);
+ additionalMeasurements.add(AdditionalMeasurements.builder()
+ .withName(tmp.name)
+ .withHashMap(tmp.hashMap)
+ .build());
+ }
+ }
+ }
+ logger.trace("measurement fields {}", measurementFields);
+ logger.trace("additional measurements {}", additionalMeasurements);
+ measurementFields = Optional.of(MeasurementFields.builder()
+ .measurementInterval(measurementFields.orElse(MeasurementFields.EMPTY).measurementInterval)
+ .additionalMeasurements(additionalMeasurements)
+ .build());
+
+ return Event.of(header, measurementFields.get());
+ }
+ }
+
+ private final Gson gson = new GsonBuilder()
+ .registerTypeAdapter(Event.class, new VesEventDeserializer())
+ .create();
+
+ public Event parse(final String event) {
+ logger.debug("parsing ves event {}", event);
+ final Event result = gson.fromJson(event, Event.class);
+ result.raw = event;
+ return result;
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java
new file mode 100644
index 0000000..391f762
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.List;
+import java.util.Optional;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.EventAPI;
+
+public interface VesPersister {
+ void persists(Event event);
+
+ List<EventAPI> findTopNVesEvent(int n);
+
+ List<EventAPI> findAll();
+
+ Optional<EventAPI> findById(Long id);
+
+ void create(Event event);
+
+ void update(Event event, Long id);
+
+ List<EventAPI> findEventsByTimeWindow(long startTime, long endTime);
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java
new file mode 100644
index 0000000..c30ff41
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.List;
+import java.util.Optional;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.entity.ves.EventAPI;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+@Service("vesPersisterSqlImpl")
+@Transactional
+public class VesPersisterSqlImpl implements VesPersister {
+ private static final Logger logger = LoggerFactory.getLogger(VesPersisterSqlImpl.class);
+
+ private final SqlRepository repository;
+ private final SqlRepositoryAPI repositoryAPI;
+
+ @Autowired
+ public VesPersisterSqlImpl(SqlRepository repository, SqlRepositoryAPI repositoryAPI) {
+ this.repository = repository;
+ this.repositoryAPI = repositoryAPI;
+ }
+
+ @Override
+ public void persists(Event event) {
+ logger.debug("persisting event {}", event);
+ repository.save(event);
+ }
+
+ @Override
+ public List<EventAPI> findTopNVesEvent(int n) {
+ logger.debug("finding top {} events", n);
+ return repositoryAPI.findTopNVesEvent(n);
+ }
+
+ @Override
+ public List<EventAPI> findAll() {
+ logger.debug("finding all event");
+ return (List<EventAPI>)repositoryAPI.findAll();
+ }
+
+ @Override
+ public Optional<EventAPI> findById(Long id) {
+ logger.debug("finding event by id {}", id);
+ return repositoryAPI.findById(id);
+ }
+
+ @Override
+ public void create(Event event) {
+ logger.debug("creating event {}", event);
+ repository.save(event);
+ }
+
+ @Override
+ public void update(Event event, Long id) {
+ if (!repository.existsById(String.valueOf(id))) {
+ throw new RuntimeException("Event not found");
+ }
+ logger.debug("updating event {} by id {}", event, id);
+ repository.save(event);
+ }
+
+ @Override
+ public List<EventAPI> findEventsByTimeWindow(long startTime, long endTime) {
+ logger.debug("finding top {} events", startTime);
+ return repositoryAPI.findByLastEpochMicrosecBetweenOrderByLastEpochMicrosecAsc(startTime, endTime);
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
new file mode 100644
index 0000000..517bb8b
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues;
+import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
+import org.onap.rapp.datacollector.entity.ves.Event;
+import org.onap.rapp.datacollector.service.configuration.DmaapRestReaderConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+
+@Service
+public class VesRetrievalService implements DmaapRestReader {
+
+ private static final Logger logger = LoggerFactory.getLogger(VesRetrievalService.class);
+ public static final String UE_FIELD_NAME = "trafficModel";
+
+ private final RestTemplate restTemplate;
+ private final DmaapRestReaderConfiguration config;
+ private final VesParser parser;
+ private final VesPersister persister;
+ private final UEHolder ueHolder;
+
+ @Autowired
+ public VesRetrievalService(RestTemplate restTemplate, VesParser parser, VesPersister persister,
+ DmaapRestReaderConfiguration configuration, UEHolder ueHolder) {
+ this.restTemplate = restTemplate;
+ this.parser = parser;
+ this.persister = persister;
+ this.config = configuration;
+ this.ueHolder = ueHolder;
+ }
+
+ @Override
+ public Collection<String> retrieveEvents() {
+ logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl());
+ try {
+ ResponseEntity<String[]> responseEntity =
+ restTemplate.getForEntity(config.getMeasurementsTopicUrl(), String[].class);
+ if (responseEntity.hasBody()) {
+ String[] events = responseEntity.getBody();
+ return Arrays.stream(events).collect(Collectors.toList());
+ }
+ } catch (RestClientException ex) {
+ logger.error("Failed to reach to dmaap", ex);
+ }
+ return Collections.emptyList();
+ }
+
+ @Scheduled(fixedRate = 5000)
+ public void retrieveAndStoreVesEvents() {
+ retrieveEvents().stream().map(parser::parse).forEach(this::saveEvent);
+ }
+
+ private void saveEvent(Event event) {
+ persister.persists(event);
+ saveUesOfVes(event);
+ }
+
+ private void saveUesOfVes(Event event){
+ Set<String> uesOfVes = getUserEquipmentData(event);
+ uesOfVes.forEach(ueHolder::addUE);
+ }
+
+ private Set<String> getUserEquipmentData(Event event) {
+ Optional<AdditionalMeasurements> ues = event.getMeasurementFields().getAdditionalMeasurements()
+ .stream().filter(am -> am.getName().equals(UE_FIELD_NAME)).findAny();
+ return ues.map(additionalMeasurements -> additionalMeasurements.getValues().stream()
+ .map(AdditionalMeasurementValues::getParameterName)
+ .collect(Collectors.toSet())).orElse(Collections.emptySet());
+ }
+
+}
+
+
+
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DatabaseProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DatabaseProperties.java
new file mode 100644
index 0000000..517666d
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DatabaseProperties.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "database")
+public class DatabaseProperties {
+ private String driverClassName;
+ private String host;
+ private String port;
+ private String name;
+ private String username;
+ private String password;
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
new file mode 100644
index 0000000..adc3695
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "dmaap")
+public class DmaapProperties {
+ private String protocol;
+ private String host;
+ private int port;
+ private String measurementsTopic;
+
+ public String getMeasurementsTopicUrl() {
+ return String.format("%s://%s:%d/%s", protocol, host, port, measurementsTopic);
+ }
+
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
new file mode 100644
index 0000000..36dee70
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import javax.sql.DataSource;
+import org.apache.http.client.HttpClient;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.web.client.RestTemplate;
+
+@Configuration
+public class DmaapRestReaderConfiguration {
+
+ private final static class TrustAllSSLSocketFactory extends SSLSocketFactory {
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+
+ public TrustAllSSLSocketFactory() throws NoSuchAlgorithmException, KeyManagementException {
+ TrustManager tm = new X509TrustManager() {
+ public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ }
+
+ public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
+ }
+
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ };
+
+ sslContext.init(null, new TrustManager[] { tm }, null);
+ }
+
+ @Override
+ public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
+ return sslContext.getSocketFactory().createSocket(socket, host, port, autoClose);
+ }
+
+ @Override
+ public Socket createSocket() throws IOException {
+ return sslContext.getSocketFactory().createSocket();
+ }
+
+ @Override
+ public Socket createSocket(String s, int i) throws IOException, UnknownHostException {
+ return sslContext.getSocketFactory().createSocket(s, i);
+ }
+
+ @Override
+ public Socket createSocket(String s, int i, InetAddress inetAddress, int i1) throws IOException, UnknownHostException {
+ return sslContext.getSocketFactory().createSocket(s, i, inetAddress, i1);
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i) throws IOException {
+ return sslContext.getSocketFactory().createSocket(inetAddress, i);
+ }
+
+ @Override
+ public Socket createSocket(InetAddress inetAddress, int i, InetAddress inetAddress1, int i1) throws IOException {
+ return sslContext.getSocketFactory().createSocket(inetAddress, i , inetAddress1, i1);
+ }
+
+ @Override
+ public String[] getDefaultCipherSuites() {
+ return new String[] {"ALL"};
+ }
+
+ @Override
+ public String[] getSupportedCipherSuites() {
+ return new String[] {"ALL"};
+ }
+ }
+
+ private final DmaapProperties dmaapProperties;
+ private final DatabaseProperties databaseProperties;
+
+ @Autowired
+ public DmaapRestReaderConfiguration(DmaapProperties dmaapProperties, DatabaseProperties databaseProperties) {
+ this.dmaapProperties = dmaapProperties;
+ this.databaseProperties = databaseProperties;
+ }
+
+ public String getMeasurementsTopicUrl() {
+ return dmaapProperties.getMeasurementsTopicUrl();
+ }
+
+
+ @Bean
+ public DataSource dataSource() {
+ DriverManagerDataSource dataSource = new DriverManagerDataSource();
+ dataSource.setDriverClassName(databaseProperties.getDriverClassName());
+ dataSource.setUrl("jdbc:mysql://" + databaseProperties.getHost() + ":" + databaseProperties.getPort() + "/" + databaseProperties.getName());
+ dataSource.setUsername(databaseProperties.getUsername());
+ dataSource.setPassword(databaseProperties.getPassword());
+ return dataSource;
+ }
+
+ @Bean
+ public PlatformTransactionManager transactionManager(DataSource ds) {
+ return new DataSourceTransactionManager(dataSource());
+ }
+
+
+ @Bean
+ public RestTemplate restTemplate() {
+ SSLConnectionSocketFactory socketFactory = null;
+ try {
+ SSLContext sslContext = new SSLContextBuilder()
+ .loadTrustMaterial(null, new TrustAllStrategy())
+ .build();
+ HostnameVerifier trustAll = new HostnameVerifier() {
+ @Override
+ public boolean verify(String s, SSLSession sslSession) {
+ return true;
+ }
+ };
+ socketFactory = new SSLConnectionSocketFactory(sslContext, trustAll);
+
+ HttpClient httpClient = HttpClients.custom().setSSLSocketFactory(socketFactory).build();
+ HttpComponentsClientHttpRequestFactory httpClientFactory = new HttpComponentsClientHttpRequestFactory(httpClient);
+
+ RestTemplate template = new RestTemplate();
+ template.setRequestFactory(httpClientFactory);
+ return template;
+ } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/SwaggerConfig.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/SwaggerConfig.java
new file mode 100644
index 0000000..e449e50
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/SwaggerConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+import springfox.documentation.builders.ApiInfoBuilder;
+import springfox.documentation.builders.PathSelectors;
+import springfox.documentation.builders.RequestHandlerSelectors;
+import springfox.documentation.service.ApiInfo;
+import springfox.documentation.spi.DocumentationType;
+import springfox.documentation.spring.web.plugins.Docket;
+import springfox.documentation.swagger2.annotations.EnableSwagger2;
+
+@Configuration
+@EnableSwagger2
+public class SwaggerConfig implements WebMvcConfigurer {
+ @Bean
+ public Docket api() {
+ return new Docket(DocumentationType.SWAGGER_2)
+ .apiInfo(apiInfo())
+ .select()
+ .apis(RequestHandlerSelectors.basePackage("org.onap.rapp.datacollector"))
+ .paths(PathSelectors.any())
+ .build();
+ }
+
+ public ApiInfo apiInfo() {
+ final ApiInfoBuilder builder = new ApiInfoBuilder();
+ builder
+ .title("DataCollector's REST APIs (Policy Enforcement PoC)")
+ .description("DataCollector's REST interfaces for serving VES Data to other R-APPs")
+ .version("1.0.0")
+ .license("Copyright (C) 2021 Samsung Electronics");
+ return builder.build();
+ }
+} \ No newline at end of file
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/WebConfig.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/WebConfig.java
new file mode 100644
index 0000000..9196c0f
--- /dev/null
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/WebConfig.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.rapp.datacollector.service.configuration;
+
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.CorsRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+@Configuration
+public class WebConfig implements WebMvcConfigurer {
+
+ @Override
+ public void addCorsMappings(CorsRegistry registry) {
+ registry.addMapping("/**");
+ }
+}
diff --git a/datacollector/src/main/resources/META-INF/spring.factories b/datacollector/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..5b31f5a
--- /dev/null
+++ b/datacollector/src/main/resources/META-INF/spring.factories
@@ -0,0 +1 @@
+org.springframework.data.jdbc.repository.config.DialectResolver$JdbcDialectProvider=org.onap.rapp.datacollector.service.MariadbDialectResolver \ No newline at end of file
diff --git a/datacollector/src/main/resources/application.yml b/datacollector/src/main/resources/application.yml
new file mode 100644
index 0000000..0b8b661
--- /dev/null
+++ b/datacollector/src/main/resources/application.yml
@@ -0,0 +1,27 @@
+server:
+ port: 8087
+dmaap:
+ prtocol: "http"
+ host: "localhost"
+ port: 8181
+ measurements-topic: "measurements"
+database:
+ host: mariadb-host
+ port: 3306
+ name: "ves"
+ username: ves
+ driver-class-name: "org.mariadb.jdbc.Driver"
+logging:
+ level:
+ org:
+ springframework: DEBUG
+ logging.file.name: logs/rapp-datacollector.log
+ pattern:
+ console: "%d %-5level %logger : %msg%n"
+ file: "%d %-5level [%thread] %logger : %msg%n"
+spring:
+ autoconfigure:
+ exclude:
+ - org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
+ main:
+ allow-bean-definition-overriding: true