scala - Kafka stuck on "Preparing to restabilize group "" with old generation 6 (kafka.coordinator.GroupCoordinator)"


I'm trying to run a simple script on a cluster (5 machines). It consumes data coming from 3 different Kafka topics.

This is the code:

    // Imports are not shown in the original post; these are the usual ones for
    // spark-streaming-kafka-0-10 and json4s (the Jackson backend is an assumption).
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    val sparkConf = new SparkConf()
      .setAppName("sparkscript")
      .set("spark.driver.allowMultipleContexts", "true")
      .set("spark.streaming.concurrentJobs", "3")
      .setMaster("spark://0.0.0.0:7077")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(4))

    // Shared pieces of the sensor JSON schema
    case class Thema(name: String, metadata: JObject)
    case class Tempo(unit: String, count: Int, metadata: JObject)
    case class Spatio(unit: String, metadata: JObject)
    case class STT(spatial: Spatio, temporal: Tempo, thematic: Thema)
    case class Location(latitude: Double, longitude: Double, name: String)

    // One Data/Sensor pair per topic
    case class Data(location: Location, timestamp: String, measurement: Int, unit: String, accuracy: Double)
    case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: STT)

    case class Datas(location: Location, timestamp: String, measurement: Int, unit: String, accuracy: Double)
    case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: STT)

    case class Datax(location: Location, timestamp: String, measurement: Int, unit: String, accuracy: Double)
    case class Sensor3(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datax, stt: STT)

    // The same consumer settings (including group.id) are used by all three streams
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "0.0.0.0:9092",
      "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
      "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
      "group.id" -> "test_group",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics1 = Array("topics1")
    val topics2 = Array("topics2")
    val topics3 = Array("topics3")

    // One direct stream per topic
    val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams))
    val stream2 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics2, kafkaParams))
    val stream3 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics3, kafkaParams))

    // Parse each JSON record into the matching case class
    val s1 = stream.map(record => {
      implicit val formats = DefaultFormats
      parse(record.value).extract[Sensor]
    })
    val s2 = stream2.map(record2 => {
      implicit val formats = DefaultFormats
      parse(record2.value).extract[Sensor2]
    })
    val s3 = stream3.map(record3 => {
      implicit val formats = DefaultFormats
      parse(record3.value).extract[Sensor3]
    })

    s1.print()
    s2.print()
    s3.print()

With 2 topics it runs with no problems. With 3 topics, Kafka keeps logging this message: INFO [GroupCoordinator 0]: Preparing to restabilize group test_group with old generation 6 (kafka.coordinator.GroupCoordinator)

I modified the Kafka server properties file, adding the IP of the machine where Kafka is running (listeners=PLAINTEXT://0.0.0.0:9092).

Any suggestions?

Thanks, LF
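
One thing that may be worth trying, assuming the repeated "Preparing to restabilize" message comes from all three direct streams joining the same consumer group (test_group): give each stream its own group.id, which is what the Spark Streaming + Kafka 0.10 integration guide suggests when createDirectStream is called more than once. The sketch below is plain Scala and only shows how the per-stream parameter maps could be derived. The object name GroupIds, the helper paramsFor, and the suffix scheme are hypothetical and not part of the original post; the deserializer entries are left out for brevity.

```scala
// Hypothetical helper (not from the original post): build a per-stream copy of
// the shared Kafka parameters with a distinct group.id.
object GroupIds {
  // Shared settings taken from the post; deserializer classes omitted for brevity.
  val baseParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "0.0.0.0:9092",
    "group.id" -> "test_group",
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Append the topic name to the base group.id so that each stream would join
  // its own consumer group instead of sharing test_group.
  def paramsFor(topic: String): Map[String, Object] = {
    val baseGroup = baseParams("group.id").toString
    baseParams + ("group.id" -> s"${baseGroup}_$topic")
  }
}
```

Each stream would then receive its own map, for example Subscribe[String, String](topics1, GroupIds.paramsFor("topics1")). Whether this stops the rebalancing in this particular setup is untested; it only removes the shared group as a possible cause.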

