Kafka Streams: simulate a join within a single stream


I need suggestions on the best approach to solve this problem.

I am developing a Kafka Streams processing application. The source log stream contains two types of messages: EmployeeInfo and DepartmentInfo. I do not have control over the log stream, so I cannot change the schema or how it is written.

The schemas of the messages are as follows:

EmployeeInfo schema: { employeeId, employeeName, departmentId }

DepartmentInfo schema: { departmentId, departmentName }
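For concreteness, the two message types (plus the merged output type used below) map naturally onto Java records; the names here are hypothetical and simply mirror the schemas above:

```java
// Hypothetical Java records mirroring the message schemas above.
record EmployeeInfo(int employeeId, String employeeName, int departmentId) {}

record DepartmentInfo(int departmentId, String departmentName) {}

// The merged message type produced on the sink side.
record EmployeeDepartmentInfo(int employeeId, String employeeName,
                              int departmentId, String departmentName) {}
```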

I need to build a log stream that simulates a Kafka Streams join, merging these two types of messages as follows:

source ts 0 = EmployeeInfo { 1, first employee, 10 }

source ts 1 = EmployeeInfo { 2, second employee, 20 }

source ts 2 = EmployeeInfo { 3, third employee, 10 }

source ts 3 = DepartmentInfo { 10, marketing }

Here, once the DepartmentInfo message with departmentId=10 arrives, the Kafka processor has to merge it with the EmployeeInfo messages with departmentId=10 and forward the results to the sink processor.

sink stream: message 0 => EmployeeDepartmentInfo { 1, first employee, 10, marketing }

sink stream: message 1 => EmployeeDepartmentInfo { 3, third employee, 10, marketing }

Now a new DepartmentInfo message is observed in the source stream:

source ts 4 = DepartmentInfo { 20, engineering }

Here, the same as above: the merged messages for the EmployeeInfo messages with departmentId=20 have to be generated.

sink stream: message 2 => EmployeeDepartmentInfo { 2, second employee, 20, engineering }

source ts 5 = DepartmentInfo { 10, sales }

Here, the departmentName has been updated, so the merged messages for the EmployeeInfo messages with departmentId=10 have to be written again.

sink stream: message 3 => EmployeeDepartmentInfo { 1, first employee, 10, sales }

sink stream: message 4 => EmployeeDepartmentInfo { 3, third employee, 10, sales }

The approach I have taken is to write a custom processor extending AbstractProcessor that holds two state stores, and to invoke context().forward(...) whenever there is a match on either side.

I would be interested to hear if you can suggest better alternatives. Thanks.

