summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java130
1 files changed, 84 insertions, 46 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index ace33dcc..e1da4d4d 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -19,39 +19,61 @@
*/
package org.onap.datalake.feeder.domain;
+import java.util.Set;
import java.util.function.Predicate;
-import javax.validation.constraints.NotNull;
+import javax.persistence.CascadeType;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.JoinColumn;
+import javax.persistence.JoinTable;
+import javax.persistence.ManyToMany;
+import javax.persistence.ManyToOne;
+import javax.persistence.Table;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.onap.datalake.feeder.enumeration.DataFormat;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.annotation.Transient;
-import org.springframework.data.couchbase.core.mapping.Document;
+import com.fasterxml.jackson.annotation.JsonBackReference;
+
+import lombok.Getter;
import lombok.Setter;
/**
- * Domain class representing topic table in Couchbase
+ * Domain class representing topic
*
* @author Guobiao Mo
*
*/
-@Document
@Setter
+@Getter
+@Entity
+@Table(name = "topic")
public class Topic {
- @NotNull
@Id
- private String id;//topic name
+ private String name;//topic name
- @Transient
+ @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
+ @JoinColumn(name = "default_topic", nullable = true)
private Topic defaultTopic;
//for protected Kafka topics
private String login;
private String pass;
+ //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL)
+ @JsonBackReference
+ //@JsonManagedReference
+ @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER)
+ @JoinTable( name = "map_db_topic",
+ joinColumns = { @JoinColumn(name="topic_name") },
+ inverseJoinColumns = { @JoinColumn(name="db_name") }
+ )
+ protected Set<Db> dbs;
+
/**
* indicate if we should monitor this topic
*/
@@ -60,20 +82,14 @@ public class Topic {
/**
* save raw message text
*/
+ @Column(name = "save_raw")
private Boolean saveRaw;
/**
- * true: save it to Elasticsearch false: don't save null: use default
- */
- private Boolean supportElasticsearch;
- private Boolean supportCouchbase;
- private Boolean supportDruid;
-
- /**
- * need to explicitly tell feeder the data format of the message
+ * need to explicitly tell feeder the data format of the message.
* support JSON, XML, YAML, TEXT
*/
- private DataFormat dataFormat;
+ private String dataFormat;
/**
* TTL in day
@@ -81,26 +97,24 @@ public class Topic {
private Integer ttl;
//if this flag is true, need to correlate alarm cleared message to previous alarm
+ @Column(name = "correlate_cleared_message")
private Boolean correlateClearedMessage;
//the value in the JSON with this path will be used as DB id
+ @Column(name = "message_id_path")
private String messageIdPath;
public Topic() {
}
- public Topic(String id) {
- this.id = id;
+ public Topic(String name) {
+ this.name = name;
}
- public String getId() {
- return id;
+ public boolean isDefault() {
+ return "_DL_DEFAULT_".equals(name);
}
- public void setDefaultTopic(Topic defaultTopic) {
- this.defaultTopic = defaultTopic;
- }
-
public boolean isEnabled() {
return is(enabled, Topic::isEnabled);
}
@@ -121,7 +135,7 @@ public class Topic {
public DataFormat getDataFormat() {
if (dataFormat != null) {
- return dataFormat;
+ return DataFormat.fromString(dataFormat);
} else if (defaultTopic != null) {
return defaultTopic.getDataFormat();
} else {
@@ -148,24 +162,51 @@ public class Topic {
return is(saveRaw, Topic::isSaveRaw);
}
- public boolean isSupportElasticsearch() {
- return is(supportElasticsearch, Topic::isSupportElasticsearch);
+ public boolean supportElasticsearch() {
+ return containDb("Elasticsearch");//TODO string hard codes
+ }
+
+ public boolean supportCouchbase() {
+ return containDb("Couchbase");
}
- public boolean isSupportCouchbase() {
- return is(supportCouchbase, Topic::isSupportCouchbase);
+ public boolean supportDruid() {
+ return containDb("Druid");
}
- public boolean isSupportDruid() {
- return is(supportDruid, Topic::isSupportDruid);
+ public boolean supportMongoDB() {
+ return containDb("MongoDB");
}
- //extract DB id from a JSON attribute, TODO support multiple attributes
+ private boolean containDb(String dbName) {
+ Db db = new Db(dbName);
+
+ if(dbs!=null && dbs.contains(db)) {
+ return true;
+ }
+
+ if (defaultTopic != null) {
+ return defaultTopic.containDb(dbName);
+ } else {
+ return false;
+ }
+ }
+
+ //extract DB id from JSON attributes, support multiple attributes
public String getMessageId(JSONObject json) {
String id = null;
if(StringUtils.isNotBlank(messageIdPath)) {
- id = json.query(messageIdPath).toString();
+ String[] paths=messageIdPath.split(",");
+
+ StringBuilder sb= new StringBuilder();
+ for(int i=0; i<paths.length; i++) {
+ if(i>0) {
+ sb.append('^');
+ }
+ sb.append(json.query(paths[i]).toString());
+ }
+ id = sb.toString();
}
return id;
@@ -173,20 +214,17 @@ public class Topic {
@Override
public String toString() {
- return id;
+ return name;
}
- /**
- * @return the messageIdPath
- */
- public String getMessageIdPath() {
- return messageIdPath;
+ @Override
+ public boolean equals(Object obj) {
+ return name.equals(((Topic)obj).getName());
}
- /**
- * @param messageIdPath the messageIdPath to set
- */
- public void setMessageIdPath(String messageIdPath) {
- this.messageIdPath = messageIdPath;
+ @Override
+ public int hashCode() {
+ return name.hashCode();
}
+
}