Scala - Kafka (Re-)Joining group stuck with more than 2 topics
I'm developing a system that uses Kafka as a messaging/pub-sub tool.
The data is generated by a Scala script:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val kafkaParams = new Properties()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaParams.put("group.id", "test_luca") // group.id is a consumer property; the producer ignores it

// Kafka producer
val producer = new KafkaProducer[String, String](kafkaParams)

// Source list
val s1 = new java.util.Timer()
val tasks1 = new java.util.TimerTask {
  def run() = {
    val date = new java.util.Date
    val date2 = date.getTime()
    val send = "" + date2 + ", 45.1234, 12.5432, 4.5, 3.0"
    val data = new ProducerRecord[String, String]("topic_s1", send)
    producer.send(data)
  }
}
s1.schedule(tasks1, 1000L, 1000L)

val s2 = new java.util.Timer()
val tasks2 = new java.util.TimerTask {
  def run() = {
    val date = new java.util.Date
    val date2 = date.getTime()
    val send = "" + date2 + ", 1.111, 9.999, 10.4, 10.0"
    val data = new ProducerRecord[String, String]("topic_s2", send)
    producer.send(data)
  }
}
s2.schedule(tasks2, 2000L, 2000L)
I need to test Kafka's performance in some particular situations. In one case, I have another script that consumes the data from topics "topic_s1" and "topic_s2", elaborates it, and produces new data to different topics (topic_s1b and topic_s2b). These elaborated data are subsequently consumed by an Apache Spark Streaming script; a sketch of this middleware step follows below.
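The middleware script itself is not shown in the question; a minimal sketch of what it could look like, assuming a plain consumer/producer poll loop (the group name "middleware_group" and the pass-through "elaboration" are placeholders, not the actual script):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("group.id", "middleware_group") // placeholder name

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val consumer = new KafkaConsumer[String, String](consumerProps)
val producer = new KafkaProducer[String, String](producerProps)
consumer.subscribe(java.util.Arrays.asList("topic_s1", "topic_s2"))

while (true) {
  // poll the source topics, "elaborate" each record, forward it to the *b topic
  val records = consumer.poll(500L)
  for (record <- records.asScala) {
    val outTopic = if (record.topic == "topic_s1") "topic_s1b" else "topic_s2b"
    val elaborated = record.value // the real script transforms the value here
    producer.send(new ProducerRecord[String, String](outTopic, elaborated))
  }
}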
If I omit the consumer/producer script (so I have one Kafka producer with two topics and the Spark script), everything works fine.
If I use the full configuration (one Kafka producer with two topics, the "middleware" script that consumes the data from the Kafka producer, elaborates it, and produces new data to the new topics, and one Spark script that consumes the data from the new topics), the Spark Streaming script gets stuck on:
INFO AbstractCoordinator: (Re-)joining group test_luca
I'm running everything locally and have not made any modifications to the Kafka or ZooKeeper configurations.
Any suggestions?
UPDATE: the Spark script:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(4))

case class Thema(name: String, metadata: JObject)
case class Tempo(unit: String, count: Int, metadata: JObject)
case class Spatio(unit: String, metadata: JObject)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)
case class Data(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)
case class Datas(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "group.id" -> "test_luca",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")
val topics2 = Array("topics2")

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams))
val stream2 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics2, kafkaParams))

val s1 = stream.map(record => {
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensor]
})
val s2 = stream2.map(record => {
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensor2]
})

val f1 = s1.map { x => x.sensor_name }
f1.print()
val f2 = s2.map { x => x.sensor_name }
f2.print()

// start the streaming context; without this the job never runs
ssc.start()
ssc.awaitTermination()
Thanks, Luca
Maybe you should change the group.id for the Spark Streaming script. I guess your "middleware" script's consumer has the same group.id as the Spark Streaming script's consumer. That is a terrible thing to happen.
In Kafka, the consumer group is the real subscriber of a topic; each consumer within a group is just a worker that splits the load. So in your case, you should use a different group.id for the middleware script's consumer and for the Spark Streaming script's consumer, as in the sketch below.
Your first try did not have the middle script, and it works because of this.
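Concretely, a minimal sketch of the two configurations; the group names "middleware_group" and "spark_streaming_group" are placeholders, not values from the question:

import java.util.Properties

// Middleware script: give its consumer its own group
val middlewareConsumerProps = new Properties()
middlewareConsumerProps.put("bootstrap.servers", "localhost:9092")
middlewareConsumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
middlewareConsumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
middlewareConsumerProps.put("group.id", "middleware_group")

// Spark Streaming script: a different group.id, so the two subscribers
// do not end up coordinating within the same consumer group
val sparkKafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "spark_streaming_group", // instead of reusing "test_luca"
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)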