diff options
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java')
-rw-r--r-- | components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java | 130 |
1 files changed, 84 insertions, 46 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index ace33dcc..e1da4d4d 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -19,39 +19,61 @@ */ package org.onap.datalake.feeder.domain; +import java.util.Set; import java.util.function.Predicate; -import javax.validation.constraints.NotNull; +import javax.persistence.CascadeType; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.FetchType; +import javax.persistence.Id; +import javax.persistence.JoinColumn; +import javax.persistence.JoinTable; +import javax.persistence.ManyToMany; +import javax.persistence.ManyToOne; +import javax.persistence.Table; import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.onap.datalake.feeder.enumeration.DataFormat; -import org.springframework.data.annotation.Id; -import org.springframework.data.annotation.Transient; -import org.springframework.data.couchbase.core.mapping.Document; +import com.fasterxml.jackson.annotation.JsonBackReference; + +import lombok.Getter; import lombok.Setter; /** - * Domain class representing topic table in Couchbase + * Domain class representing topic * * @author Guobiao Mo * */ -@Document @Setter +@Getter +@Entity +@Table(name = "topic") public class Topic { - @NotNull @Id - private String id;//topic name + private String name;//topic name - @Transient + @ManyToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) + @JoinColumn(name = "default_topic", nullable = true) private Topic defaultTopic; //for protected Kafka topics private String login; private String pass; + //@ManyToMany(mappedBy = "topics", cascade=CascadeType.ALL) + @JsonBackReference + //@JsonManagedReference + @ManyToMany(cascade=CascadeType.ALL, fetch=FetchType.EAGER) + @JoinTable( name = "map_db_topic", + joinColumns = { @JoinColumn(name="topic_name") }, + inverseJoinColumns = { @JoinColumn(name="db_name") } + ) + protected Set<Db> dbs; + /** * indicate if we should monitor this topic */ @@ -60,20 +82,14 @@ public class Topic { /** * save raw message text */ + @Column(name = "save_raw") private Boolean saveRaw; /** - * true: save it to Elasticsearch false: don't save null: use default - */ - private Boolean supportElasticsearch; - private Boolean supportCouchbase; - private Boolean supportDruid; - - /** - * need to explicitly tell feeder the data format of the message + * need to explicitly tell feeder the data format of the message. * support JSON, XML, YAML, TEXT */ - private DataFormat dataFormat; + private String dataFormat; /** * TTL in day @@ -81,26 +97,24 @@ public class Topic { private Integer ttl; //if this flag is true, need to correlate alarm cleared message to previous alarm + @Column(name = "correlate_cleared_message") private Boolean correlateClearedMessage; //the value in the JSON with this path will be used as DB id + @Column(name = "message_id_path") private String messageIdPath; public Topic() { } - public Topic(String id) { - this.id = id; + public Topic(String name) { + this.name = name; } - public String getId() { - return id; + public boolean isDefault() { + return "_DL_DEFAULT_".equals(name); } - public void setDefaultTopic(Topic defaultTopic) { - this.defaultTopic = defaultTopic; - } - public boolean isEnabled() { return is(enabled, Topic::isEnabled); } @@ -121,7 +135,7 @@ public class Topic { public DataFormat getDataFormat() { if (dataFormat != null) { - return dataFormat; + return DataFormat.fromString(dataFormat); } else if (defaultTopic != null) { return defaultTopic.getDataFormat(); } else { @@ -148,24 +162,51 @@ public class Topic { return is(saveRaw, Topic::isSaveRaw); } - public boolean isSupportElasticsearch() { - return is(supportElasticsearch, Topic::isSupportElasticsearch); + public boolean supportElasticsearch() { + return containDb("Elasticsearch");//TODO string hard codes + } + + public boolean supportCouchbase() { + return containDb("Couchbase"); } - public boolean isSupportCouchbase() { - return is(supportCouchbase, Topic::isSupportCouchbase); + public boolean supportDruid() { + return containDb("Druid"); } - public boolean isSupportDruid() { - return is(supportDruid, Topic::isSupportDruid); + public boolean supportMongoDB() { + return containDb("MongoDB"); } - //extract DB id from a JSON attribute, TODO support multiple attributes + private boolean containDb(String dbName) { + Db db = new Db(dbName); + + if(dbs!=null && dbs.contains(db)) { + return true; + } + + if (defaultTopic != null) { + return defaultTopic.containDb(dbName); + } else { + return false; + } + } + + //extract DB id from JSON attributes, support multiple attributes public String getMessageId(JSONObject json) { String id = null; if(StringUtils.isNotBlank(messageIdPath)) { - id = json.query(messageIdPath).toString(); + String[] paths=messageIdPath.split(","); + + StringBuilder sb= new StringBuilder(); + for(int i=0; i<paths.length; i++) { + if(i>0) { + sb.append('^'); + } + sb.append(json.query(paths[i]).toString()); + } + id = sb.toString(); } return id; @@ -173,20 +214,17 @@ public class Topic { @Override public String toString() { - return id; + return name; } - /** - * @return the messageIdPath - */ - public String getMessageIdPath() { - return messageIdPath; + @Override + public boolean equals(Object obj) { + return name.equals(((Topic)obj).getName()); } - /** - * @param messageIdPath the messageIdPath to set - */ - public void setMessageIdPath(String messageIdPath) { - this.messageIdPath = messageIdPath; + @Override + public int hashCode() { + return name.hashCode(); } + } |