File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -18,6 +18,14 @@ USERVER_NAMESPACE_BEGIN
1818
1919namespace kafka {
2020
21+ namespace {
22+
23+ std::string TaskProcessorName (std::string_view name, const components::ComponentConfig& config) {
24+ return config[name].As <std::string>(name);
25+ }
26+
27+ } // namespace
28+
2129ConsumerComponent::ConsumerComponent (
2230 const components::ComponentConfig& config,
2331 const components::ComponentContext& context
@@ -26,9 +34,9 @@ ConsumerComponent::ConsumerComponent(
2634 consumer_ (
2735 config.Name(),
2836 config["topics"].As<std::vector<std::string>>(),
29- context.GetTaskProcessor(" consumer-task-processor" ),
30- context.GetTaskProcessor(" consumer-blocking-task-processor" ),
31- context.GetTaskProcessor(" main-task-processor" ),
37+ context.GetTaskProcessor(TaskProcessorName( " consumer-task-processor" , config) ),
38+ context.GetTaskProcessor(TaskProcessorName( " consumer-blocking-task-processor" , config) ),
39+ context.GetTaskProcessor(config[ " task-processor " ].As<std::string>( " main-task-processor" ) ),
3240 config.As<impl::ConsumerConfiguration>(),
3341 context.FindComponent<components::Secdist>()
3442 .Get()
Original file line number Diff line number Diff line change @@ -2,6 +2,18 @@ type: object
22description : Kafka consumer component
33additionalProperties : false
44properties :
5+ task-processor :
6+ type : string
7+ description : the name of the TaskProcessor to invoke callbacks
8+ default : main-task-processor
9+ consumer-task-processor :
10+ type : string
11+ description : TaskProcessor for message batches polling. Feel free to use 'main-task-processor'.
12+ default : consumer-task-processor
13+ consumer-blocking-task-processor :
14+ type : string
15+ description : TaskProcessor for consumer blocking operations. Feel free to use 'fs-task-processor'.
16+ default : consumer-blocking-task-processor
517 group_id :
618 type : string
719 description : |
You can’t perform that action at this time.
0 commit comments