scala - Kafka (Re-)Joining group stuck with more than 2 topics


I'm developing a system that uses Kafka as a messaging/pub-sub tool.

The data is generated by a Scala script:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val kafkaParams = new Properties()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaParams.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaParams.put("group.id", "test_luca")

// Kafka producer
val producer = new KafkaProducer[String, String](kafkaParams)

// Source list
val s1 = new java.util.Timer()
val tasks1 = new java.util.TimerTask {
  def run() = {
    val date = new java.util.Date
    val date2 = date.getTime()
    val send = "" + date2 + ", 45.1234, 12.5432, 4.5, 3.0"
    val data = new ProducerRecord[String, String]("topic_s1", send)
    producer.send(data)
  }
}
s1.schedule(tasks1, 1000L, 1000L)

val s2 = new java.util.Timer()
val tasks2 = new java.util.TimerTask {
  def run() = {
    val date = new java.util.Date
    val date2 = date.getTime()
    val send = "" + date2 + ", 1.111, 9.999, 10.4, 10.0"
    val data = new ProducerRecord[String, String]("topic_s2", send)
    producer.send(data)
  }
}
s2.schedule(tasks2, 2000L, 2000L)

I need to test Kafka's performance in some particular situations. In one case I have another script that consumes the data from topics "topic_s1" and "topic_s2", elaborates them, and produces new data to different topics (topic_s1b and topic_s2b). These elaborated data are then consumed by an Apache Spark Streaming script.
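For context, a minimal sketch of what such a "middleware" script could look like, assuming it uses the plain Kafka consumer/producer clients and a kafka-clients version that supports poll(Duration). The topic names come from the question; the group id "middleware" and the pass-through "elaboration" are placeholders, not the actual script:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Consumer for the raw topic
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("group.id", "middleware") // placeholder: should differ from the Spark consumer's group

// Producer for the elaborated topic
val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val consumer = new KafkaConsumer[String, String](consumerProps)
val producer = new KafkaProducer[String, String](producerProps)
consumer.subscribe(Collections.singletonList("topic_s1"))

while (true) {
  val records = consumer.poll(Duration.ofMillis(500))
  val it = records.iterator()
  while (it.hasNext) {
    val record = it.next()
    // placeholder: the real elaboration is not shown in the question
    val elaborated = record.value()
    producer.send(new ProducerRecord[String, String]("topic_s1b", elaborated))
  }
}

The same pattern would apply for topic_s2 -> topic_s2b.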

If I omit the consumer/producer script (so that I have just one Kafka producer with 2 topics and the Spark script), everything works fine.

If I use the full configuration (1 Kafka producer with 2 topics, the "middleware" script that consumes data from the Kafka producer, elaborates them, and produces new data to new topics, and 1 Spark script that consumes data from the new topics), the Spark Streaming script gets stuck on INFO AbstractCoordinator: (Re-)joining group test_luca

I'm running everything locally and I have not made any modifications to the Kafka and ZooKeeper configurations.

Any suggestions?

UPDATE: the Spark script:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").setMaster("local[2]")
val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(4))

case class Thema(name: String, metadata: JObject)
case class Tempo(unit: String, count: Int, metadata: JObject)
case class Spatio(unit: String, metadata: JObject)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)

case class Data(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)

case class Datas(location: Location, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "group.id" -> "test_luca",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")
val topics2 = Array("topics2")

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams))
val stream2 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics2, kafkaParams))

val s1 = stream.map(record => {
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensor]
})
val s2 = stream2.map(record => {
  implicit val formats = DefaultFormats
  parse(record.value).extract[Sensor2]
})

val f1 = s1.map { x => x.sensor_name }
f1.print()
val f2 = s2.map { x => x.sensor_name }
f2.print()

ssc.start()
ssc.awaitTermination()

Thanks, Luca

Maybe you should change the group.id for the Spark Streaming script. I guess your "middleware" script's consumer has the same group.id as the Spark Streaming script's consumer. That would be a terrible thing.

In Kafka, the consumer group is the real subscriber to a topic; the consumers within a group are just workers splitting the load. So in your case, you should use different group.ids for the middleware script's consumer and the Spark Streaming script's consumer.
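Concretely, in the Spark script that would just mean giving the Kafka params a group.id of their own. The name below is only an example, not something from the question:

// Spark Streaming consumer: use a group.id not shared with the middleware consumer
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "group.id" -> "test_luca_spark", // example name; any value distinct from the middleware's group.id works
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)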

Your first try does not have the middle script, and that is why it works.

