java - Consumer does not receive messages after Kafka producer/consumer restart


We have one producer, one consumer, and one partition. Both the consumer and the producer are Spring Boot applications. The consumer app runs on my local machine, while the producer runs on a remote machine along with Kafka and ZooKeeper.

During development, I redeployed the producer application with some changes. After that, the consumer stopped receiving messages. I tried restarting the consumer, but no luck. What can be the issue, and/or how can it be solved?

Consumer config:

    spring:
      cloud:
        stream:
          defaultBinder: kafka
          bindings:
            input:
              destination: sales
              content-type: application/json
          kafka:
            binder:
              brokers: ${service_registry_host:127.0.0.1}
              zkNodes: ${service_registry_host:127.0.0.1}
              defaultZkPort: 2181
              defaultBrokerPort: 9092
    server:
      port: 0

Producer config:

    spring:
      cloud:
        stream:
          defaultBinder: kafka
          bindings:
            output:
              destination: sales
              content-type: application/json
          kafka:
            binder:
              brokers: ${service_registry_host:127.0.0.1}
              zkNodes: ${service_registry_host:127.0.0.1}
              defaultZkPort: 2181
              defaultBrokerPort: 9092

Edit 2:

After 5 minutes the consumer app dies with the following exception:

    2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
    org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    2017-09-12 18:14:47,255  WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
    2017-09-12 18:14:47,256  INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
    2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
    2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
    2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
    2017-09-12 18:14:47,258  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel

See if the suggestion above about enabling debug logging reveals any further information. It looks like you are getting a TimeoutException from KafkaTopicProvisioner. I assume this occurs when you restart the consumer. It looks like the consumer has trouble communicating with the broker somehow, and you need to find out what is going on there.
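
One way to start narrowing this down is to check from the consumer machine whether the remote host accepts TCP connections on the broker and ZooKeeper ports (9092 and 2181 in the configs above). The following is only a minimal sketch under that assumption; the class name BrokerReachability and the helper isReachable are hypothetical names made up for illustration, and the code uses plain JDK sockets rather than any Kafka API.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: plain TCP probe for the broker and ZooKeeper ports. */
class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, connect timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the remote host as the first argument; 127.0.0.1 mirrors the
        // fallback value used in the configs.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println("broker port 9092 reachable: " + isReachable(host, 9092, 3000));
        System.out.println("zookeeper port 2181 reachable: " + isReachable(host, 2181, 3000));
    }
}
```

If either port is unreachable, the problem is likely at the network or firewall level rather than in the Spring configuration. If both are reachable, that only shows the ports are open; it does not prove that a Kafka client can complete a metadata request, so the debug logs are still the next place to look.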

