summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwasala <przemyslaw.wasala@nokia.com>2018-06-22 20:11:20 +0200
committerwasala <przemyslaw.wasala@nokia.com>2018-08-06 08:51:56 +0200
commit2cfcc6756e59ed8cda571efa8b29764eab7837c8 (patch)
treee99375044d7ca9a9cac0962e459cafd5f580b094
parenta684d478f8b81bba83123d4f1fd1ec3c29df73ca (diff)
Added reactive tasks flow control
Change-Id: I9cb2bede66e9e446912f2e6a815c7b56b80813b9 Issue-ID: DCAEGEN2-557 Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java13
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java5
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java16
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java19
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java4
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java13
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java34
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java42
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java2
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java2
12 files changed, 41 insertions, 117 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java
index 784bc5af..df8330f4 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTask.java
@@ -25,9 +25,11 @@ import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIConsumerClient;
-public abstract class AAIConsumerTask<R, S, C> extends Task<R, S, C> {
+public abstract class AAIConsumerTask {
abstract Optional<String> consume(ConsumerDmaapModel message) throws AAINotFoundException;
abstract AAIConsumerClient resolveClient();
+
+ abstract protected String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java
index 4c35b2ee..c545a1be 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIConsumerTaskImpl.java
@@ -25,7 +25,6 @@ import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIConsumerClient;
import org.slf4j.Logger;
@@ -34,8 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
-public class AAIConsumerTaskImpl extends
- AAIConsumerTask<ConsumerDmaapModel, String, AAIClientConfiguration> {
+public class AAIConsumerTaskImpl extends AAIConsumerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -59,14 +57,6 @@ public class AAIConsumerTaskImpl extends
}
@Override
- protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
- String response = execute(body);
- if (taskProcess != null && response != null && !response.isEmpty()) {
- taskProcess.receiveRequest(response);
- }
- }
-
- @Override
public String execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
.orElseThrow(() -> new AAINotFoundException("Invoked null object to AAI task"));
@@ -75,7 +65,6 @@ public class AAIConsumerTaskImpl extends
return consume(consumerDmaapModel).orElseThrow(() -> new AAINotFoundException("Null response code"));
}
- @Override
protected AAIClientConfiguration resolveConfiguration() {
return prhAppConfig.getAAIClientConfiguration();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java
index c7bde032..abd04640 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTask.java
@@ -20,15 +20,18 @@
package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-public abstract class AAIProducerTask<R, S, C> extends Task<R, S, C> {
+public abstract class AAIProducerTask/*<R, S, C> extends Task<R, S, C> */ {
abstract ConsumerDmaapModel publish(ConsumerDmaapModel message) throws AAINotFoundException;
abstract AAIProducerClient resolveClient();
+
+ abstract protected ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java
index b637bb29..005d08d1 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AAIProducerTaskImpl.java
@@ -19,11 +19,12 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
+import java.net.URISyntaxException;
+import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AAINotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
import org.onap.dcaegen2.services.prh.service.HttpUtils;
@@ -32,15 +33,12 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.net.URISyntaxException;
-import java.util.Optional;
-
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@Component
public class AAIProducerTaskImpl extends
- AAIProducerTask<ConsumerDmaapModel, ConsumerDmaapModel, AAIClientConfiguration> {
+ AAIProducerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -66,14 +64,6 @@ public class AAIProducerTaskImpl extends
}
@Override
- protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
- ConsumerDmaapModel response = execute(body);
- if (taskProcess != null && response != null) {
- taskProcess.receiveRequest(response);
- }
- }
-
- @Override
public ConsumerDmaapModel execute(ConsumerDmaapModel consumerDmaapModel) throws AAINotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
.orElseThrow(() -> new AAINotFoundException("Invoked null object to AAI task"));
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index 3e36bcdd..56b678a3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -26,11 +26,13 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-abstract class DmaapConsumerTask<R, S, C> extends Task<R, S, C> {
+abstract class DmaapConsumerTask /*<R, S, C> extends Task<R, S, C>*/ {
abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
abstract ExtendedDmaapConsumerHttpClientImpl resolveClient();
abstract void initConfigs();
+
+ abstract protected ConsumerDmaapModel execute(String object) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index 43eb9eaa..e72939cf 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -23,7 +23,6 @@ import java.util.Optional;
import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -38,8 +37,7 @@ import org.springframework.stereotype.Component;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
@Component
-public class DmaapConsumerTaskImpl extends
- DmaapConsumerTask<String, ConsumerDmaapModel, DmaapConsumerConfiguration> {
+public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
@@ -67,20 +65,6 @@ public class DmaapConsumerTaskImpl extends
}
@Override
- protected void receiveRequest(String body) throws PrhTaskException {
- try {
- ConsumerDmaapModel response = execute(body);
- if (taskProcess != null && response != null) {
- taskProcess.receiveRequest(response);
- }
- } catch (DmaapEmptyResponseException e) {
- logger.warn("Nothing to consume from DmaaP {} topic.",
- resolveConfiguration().dmaapTopicName());
- }
-
- }
-
- @Override
public ConsumerDmaapModel execute(String object) throws PrhTaskException {
extendedDmaapConsumerHttpClient = resolveClient();
logger.trace("Method called with arg {}", object);
@@ -93,7 +77,6 @@ public class DmaapConsumerTaskImpl extends
prhAppConfig.initFileStreamReader();
}
- @Override
protected DmaapConsumerConfiguration resolveConfiguration() {
return prhAppConfig.getDmaapConsumerConfiguration();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index ba8e6e45..bd9a8744 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -26,9 +26,11 @@ import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttp
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
-abstract class DmaapPublisherTask<R, S, C> extends Task<R, S, C> {
+abstract class DmaapPublisherTask {
abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
abstract ExtendedDmaapProducerHttpClientImpl resolveClient();
+
+ abstract protected Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 1a522921..7cbeb3b3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
import org.onap.dcaegen2.services.prh.configuration.AppConfig;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
import org.slf4j.Logger;
@@ -37,8 +36,7 @@ import org.springframework.stereotype.Component;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@Component
-public class DmaapPublisherTaskImpl extends
- DmaapPublisherTask<ConsumerDmaapModel, Integer, DmaapPublisherConfiguration> {
+public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Config prhAppConfig;
@@ -59,14 +57,6 @@ public class DmaapPublisherTaskImpl extends
}
@Override
- protected void receiveRequest(ConsumerDmaapModel body) throws PrhTaskException {
- Integer response = execute(body);
- if (taskProcess != null && response != null) {
- taskProcess.receiveRequest(response);
- }
- }
-
- @Override
public Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
.orElseThrow(() -> new DmaapNotFoundException("Invoked null object to Dmaap task"));
@@ -75,7 +65,6 @@ public class DmaapPublisherTaskImpl extends
return publish(consumerDmaapModel);
}
- @Override
protected DmaapPublisherConfiguration resolveConfiguration() {
return prhAppConfig.getDmaapPublisherConfiguration();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index f7767101..addeaae2 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -19,11 +19,12 @@
*/
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -46,19 +47,24 @@ public class ScheduledTasks {
}
public void scheduleMainPrhEventTask() {
- logger.trace("Execution of task was registered");
- setTaskExecutionFlow();
- try {
- dmaapConsumerTask.initConfigs();
- dmaapConsumerTask.receiveRequest("");
- } catch (PrhTaskException e) {
- logger
- .warn("Chain of tasks have been aborted, because some errors occur in prh workflow ", e);
- }
- }
+ logger.trace("Execution of tasks was registered");
- private void setTaskExecutionFlow() {
- dmaapConsumerTask.setNext(aaiProducerTask);
- aaiProducerTask.setNext(dmaapProducerTask);
+ Mono.fromSupplier(() -> Mono.fromCallable(() ->
+ {
+ dmaapConsumerTask.initConfigs();
+ return dmaapConsumerTask.execute("");
+ }).subscribe(consumerDmaapModel -> Mono
+ .fromCallable(() -> aaiProducerTask.execute(consumerDmaapModel))
+ .subscribe(
+ aaiConsumerDmaapModel -> Mono.fromCallable(() -> dmaapProducerTask.execute(aaiConsumerDmaapModel))
+ .subscribe(resp -> logger.info("Message was published to DmaaP, response code: {}", resp),
+ error -> logger.warn("Error has been thrown in DmaapProduerTask: {}", error),
+ () -> logger.info("Completed DmaapPublisher task"))),
+ errorResponse -> logger
+ .warn("Error has been thrown in AAIProducerTask: {}", errorResponse)
+ , () -> logger.info("Completed AAIProducer task")))
+ .subscribe(Disposable::dispose, tasksError -> logger
+ .warn("Chain of tasks have been aborted, because some errors occur in PRH workflow ", tasksError)
+ , () -> logger.info("PRH tasks was consumed properly")).dispose();
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java
deleted file mode 100644
index e2b11fdc..00000000
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcaegen2.services.prh.tasks;
-
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- */
-
-
-public abstract class Task<R, S, C> {
-
- Task taskProcess;
-
- abstract protected void receiveRequest(R body) throws PrhTaskException;
-
- abstract protected S execute(R object) throws PrhTaskException;
-
- abstract protected C resolveConfiguration();
-
- void setNext(Task task) {
- this.taskProcess = task;
- }
-}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
index 60e1bd54..5736afeb 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
@@ -41,7 +41,7 @@ public class DmaapConsumerTaskSpy {
@Bean
@Primary
- public Task registerSimpleDmaapConsumerTask() {
+ public DmaapConsumerTask registerSimpleDmaapConsumerTask() {
AppConfig appConfig = spy(AppConfig.class);
doReturn(mock(DmaapConsumerConfiguration.class)).when(appConfig).getDmaapConsumerConfiguration();
DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig));
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java
index b2b97cf0..01056606 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java
@@ -39,7 +39,7 @@ public class DmaapProducerTaskSpy {
@Bean
@Primary
- public Task registerSimpleDmaapPublisherTask() {
+ public DmaapPublisherTask registerSimpleDmaapPublisherTask() {
AppConfig appConfig = spy(AppConfig.class);
doReturn(mock(DmaapPublisherConfiguration.class)).when(appConfig).getDmaapPublisherConfiguration();
DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));