java - Stateful complex event processing with Apache Flink
I want to detect whether two events happen within a defined timeframe, based on the two events having the same identifier. For example, a DoorEvent looks like this:
<doorevent>
  <door>
    <id>1</id>
    <status>open</status>
  </door>
  <timestamp>12345679</timestamp>
</doorevent>
<doorevent>
  <door>
    <id>1</id>
    <status>close</status>
  </door>
  <timestamp>23456790</timestamp>
</doorevent>

My DoorEvent Java class in the example below has the same structure.
I want to detect that the door with id 1 closes within 5 minutes of opening. I am trying to use the Apache Flink CEP library for this purpose. The incoming stream contains open and close messages for, let's say, 20 doors.
    Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
        new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;

            public boolean filter(String doorEvent) {
                DoorEvent event = new DoorEvent().parseInstance(doorEvent, DataType.XML);
                if (event.getDoor().getStatus().equals("open")) {
                    // save state of door as open
                    return true;
                }
                return false;
            }
        }
    )
    .followedByAny("door_close").where(
        new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;

            public boolean filter(String doorEvent)
                    throws JsonParseException, JsonMappingException, IOException {
                DoorEvent event = new DoorEvent().parseInstance(doorEvent, DataType.XML);
                if (event.getDoor().getStatus().equals("close")) {
                    // check if this is the close of the opened door
                    return true;
                }
                return false;
            }
        }
    )
    .within(Time.minutes(5));

How do I save the state that door 1 is open in door_open, so that in the door_close step I know that door 1 is the one being closed and not some other door?
If you have Flink 1.3.0 or above, what you want to do is straightforward.
Your pattern would look like this:
    Pattern.<DoorEvent>begin("first")
        .where(new SimpleCondition<DoorEvent>() {
            private static final long serialVersionUID = 1390448281048961616L;

            @Override
            public boolean filter(DoorEvent event) throws Exception {
                return event.getDoor().getStatus().equals("open");
            }
        })
        .followedBy("second")
        .where(new IterativeCondition<DoorEvent>() {
            private static final long serialVersionUID = -9216505110246259082L;

            @Override
            public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception {
                // Only "close" events can complete the pattern.
                if (!secondEvent.getDoor().getStatus().equals("close")) {
                    return false;
                }
                // Accept the close only if an already matched "open" has the same id.
                for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) {
                    if (secondEvent.getDoor().getEventId().equals(firstEvent.getDoor().getEventId())) {
                        return true;
                    }
                }
                return false;
            }
        })
        .within(Time.minutes(5));

So you can use IterativeConditions: get the events already matched by the first pattern from the context, iterate over that list while comparing each one with the event you need, and proceed as you want.
IterativeConditions are expensive, because each candidate event is compared against the previously matched events, and should be handled accordingly.
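To make the matching rule itself concrete, here is a minimal plain-Java sketch that does not use Flink at all. It is an illustration under assumptions, not what the CEP library does internally: the Event class, the closedWithinWindow helper, and the choice of milliseconds for timestamps are all hypothetical, and unlike the pattern above it pairs a close only with the most recent open of the same door.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DoorCloseMatcher {

    // Hypothetical, simplified stand-in for the DoorEvent class in the question.
    static final class Event {
        final String doorId;
        final String status;     // "open" or "close"
        final long timestampMs;  // assumption: timestamps are in milliseconds

        Event(String doorId, String status, long timestampMs) {
            this.doorId = doorId;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    static final long WINDOW_MS = 5 * 60 * 1000L; // 5 minutes

    // Returns the ids of doors whose close arrived less than WINDOW_MS
    // after the most recent open of the same door.
    // Assumes the input list is already ordered by timestamp.
    static List<String> closedWithinWindow(List<Event> eventsInTimeOrder) {
        Map<String, Long> lastOpen = new HashMap<>(); // door id -> time of last open
        List<String> matches = new ArrayList<>();
        for (Event e : eventsInTimeOrder) {
            if ("open".equals(e.status)) {
                lastOpen.put(e.doorId, e.timestampMs);
            } else if ("close".equals(e.status)) {
                // Look up the open for this door only, so other doors are ignored.
                Long openedAt = lastOpen.remove(e.doorId);
                if (openedAt != null && e.timestampMs - openedAt < WINDOW_MS) {
                    matches.add(e.doorId);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("1", "open", 0L));
        events.add(new Event("2", "open", 1_000L));
        events.add(new Event("1", "close", 120_000L)); // 2 minutes after open
        events.add(new Event("2", "close", 400_000L)); // more than 5 minutes after open
        System.out.println(closedWithinWindow(events)); // prints [1]
    }
}
```

Keeping one entry per door id means each close is checked against a single open rather than against every previously matched event, which is the cost the iterative condition pays.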
For more information on conditions, see the Conditions section of the Flink CEP documentation.