scala - Kafka stuck on "Preparing to restabilize group "" with old generation 6 (kafka.coordinator.GroupCoordinator)" -
I'm trying to run a simple script on a cluster (5 machines). It consumes data coming from 3 different topics.
This is the code:
// Spark Streaming job: opens one direct Kafka stream per topic (three topics),
// parses every record value as JSON into a sensor case class and prints each batch.
// Identifier casing was restored from a lowercased extraction; string values whose
// original casing cannot be known (app name, group id, topic names) are left as found.

val sparkConf = new SparkConf()
  .setAppName("sparkscript")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.streaming.concurrentJobs", "3")
  .setMaster("spark://0.0.0.0:7077")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(4)) // 4-second batch interval

// JSON payload model; field names must match the keys of the incoming JSON.
case class Thema(name: String, metadata: JObject)
case class Tempo(unit: String, count: Int, metadata: JObject)
case class Spatio(unit: String, metadata: JObject)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)

// One data/sensor pair per topic; the three pairs are structurally identical.
case class Data(location: Location, timestamp: String, measurement: Int, unit: String, accuracy: Double)
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)
case class Datas(location: Location, timestamp: String, measurement: Int, unit: String, accuracy: Double)
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)
case class Datax(location: Location, timestamp: String, measurement: Int, unit: String, accuracy: Double)
case class Sensor3(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datax, stt: Stt)

// Consumer configuration shared by all three streams.
// NOTE(review): every createDirectStream call below reuses this map and therefore the
// same group.id ("test_group"). The Spark Streaming + Kafka 0-10 integration guide
// recommends a separate group.id for each call to createDirectStream; the shared group
// is a likely cause of the repeated "Preparing to restabilize group" rebalances
// reported below - TODO confirm by giving each stream its own group.id.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "0.0.0.0:9092",
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "group.id" -> "test_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")
val topics2 = Array("topics2")
val topics3 = Array("topics3")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams))
val stream2 = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics2, kafkaParams))
val stream3 = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics3, kafkaParams))

// Parse each record's value as JSON and extract it into the matching sensor type.
val s1 = stream.map { record =>
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensor]
}
val s2 = stream2.map { record2 =>
  implicit val formats = DefaultFormats
  parse(record2.value).extract[Sensor2]
}
val s3 = stream3.map { record3 =>
  implicit val formats = DefaultFormats
  parse(record3.value).extract[Sensor3]
}

s1.print()
s2.print()
s3.print()

// Observed behaviour (from the original report):
// With 2 topics this runs with no problems. With 3 topics Kafka keeps logging the message:
//   INFO [GroupCoordinator 0]: Preparing to restabilize group test_group with old
//   generation 6 (kafka.coordinator.GroupCoordinator)
I modified the Kafka server properties file, adding the IP of the machine Kafka is running on (listeners=PLAINTEXT://0.0.0.0:9092).
Any suggestions?
thanks lf
Comments
Post a Comment