java - Stateful complex event processing with Apache Flink


I want to detect whether two events happen within a defined time frame, based on the two events having the same identifier. For example, a door event looks like this:

    <doorevent>
      <door>
        <id>1</id>
        <status>open</status>
      </door>
      <timestamp>12345679</timestamp>
    </doorevent>

    <doorevent>
      <door>
        <id>1</id>
        <status>close</status>
      </door>
      <timestamp>23456790</timestamp>
    </doorevent>

My DoorEvent Java class in the example below has the same structure.
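For readers following along, a class with that structure might look like the minimal sketch below. The real class is not shown here, so every name in it is an assumption modeled on the XML: the nested Door type, the accessors getDoor, getId, getStatus and getTimestamp, and the choice of String for the id. The parseInstance method used in the code further down is left out because its behaviour is unknown.

```java
// Hypothetical sketch of a DoorEvent mirroring the XML structure.
// All names and field types are assumptions; the real class is not shown.
class DoorEvent {

    // Mirrors the <door> element: an identifier plus an open/close status.
    static final class Door {
        private final String id;
        private final String status;

        Door(String id, String status) {
            this.id = id;
            this.status = status;
        }

        String getId() {
            return id;
        }

        String getStatus() {
            return status;
        }
    }

    private final Door door;
    private final long timestamp; // mirrors the <timestamp> element

    DoorEvent(Door door, long timestamp) {
        this.door = door;
        this.timestamp = timestamp;
    }

    Door getDoor() {
        return door;
    }

    long getTimestamp() {
        return timestamp;
    }
}
```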

I want to detect that the door with id 1 closes within 5 minutes of opening. I am trying to use the Apache Flink CEP library for this purpose. The incoming stream contains all open and close messages from, let's say, 20 doors.

    Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
        new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;

            public boolean filter(String doorevent) {
                DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
                if (event.getDoor().getStatus().equals("open")) {
                    // save state of door as open
                    return true;
                }
                return false;
            }
        }
    )
    .followedByAny("door_close").where(
        new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;

            public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
                DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
                if (event.getDoor().getStatus().equals("close")) {
                    // check if this close belongs to the previously opened door
                    return true;
                }
                return false;
            }
        }
    )
    .within(Time.minutes(5));

How do I save the state of door 1 being open in the door_open step, so that in the door_close step I know that door 1 is the one being closed and not some other door?

If you have Flink 1.3.0 or above, what you want to do is straightforward.

Your pattern would look something like this:

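    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.windowing.time.Time;

    Pattern.<DoorEvent>begin("first")
        .where(new SimpleCondition<DoorEvent>() {
            private static final long serialVersionUID = 1390448281048961616L;

            @Override
            public boolean filter(DoorEvent event) throws Exception {
                // The pattern starts on any "open" event.
                return event.getDoor().getStatus().equals("open");
            }
        })
        .followedBy("second")
        .where(new IterativeCondition<DoorEvent>() {
            private static final long serialVersionUID = -9216505110246259082L;

            @Override
            public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception {
                // Only "close" events can complete the pattern.
                if (!secondEvent.getDoor().getStatus().equals("close")) {
                    return false;
                }

                // Accept the close only if it has the same id as an event already matched by "first".
                for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) {
                    if (secondEvent.getDoor().getEventId().equals(firstEvent.getDoor().getEventId())) {
                        return true;
                    }
                }
                return false;
            }
        })
        .within(Time.minutes(5));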

So basically you can use an IterativeCondition to get, from the context, the events already matched by the first pattern, iterate over that list while comparing against the one you need, and proceed as you want.
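To make that matching logic concrete without a Flink runtime, here is a plain-Java sketch that mimics what the iterative condition does: it remembers earlier "open" events and, for each "close", scans them for the same door id inside a 5-minute window. It is not Flink code. DoorMatchSketch, Event and doorsClosedInTime are hypothetical names, events are assumed to arrive in timestamp order, and the strict `<` comparison at the window boundary is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration only; none of these names come from Flink.
class DoorMatchSketch {

    // Hypothetical flattened event: door id, status, timestamp in milliseconds.
    static final class Event {
        final String doorId;
        final String status;
        final long timestampMs;

        Event(String doorId, String status, long timestampMs) {
            this.doorId = doorId;
            this.status = status;
            this.timestampMs = timestampMs;
        }
    }

    static final long WINDOW_MS = 5 * 60 * 1000L;

    // Returns the ids of doors whose "close" arrived less than WINDOW_MS after an earlier "open".
    static List<String> doorsClosedInTime(List<Event> events) {
        List<Event> opens = new ArrayList<>(); // plays the role of the events matched by "first"
        List<String> matched = new ArrayList<>();
        for (Event e : events) {
            if ("open".equals(e.status)) {
                opens.add(e);
                continue;
            }
            if (!"close".equals(e.status)) {
                continue;
            }
            // Same idea as the iterative condition: scan earlier opens for the same id.
            Iterator<Event> it = opens.iterator();
            while (it.hasNext()) {
                Event open = it.next();
                boolean sameDoor = open.doorId.equals(e.doorId);
                boolean inWindow = e.timestampMs - open.timestampMs < WINDOW_MS;
                if (sameDoor && inWindow) {
                    matched.add(e.doorId);
                    it.remove(); // this open has been paired with its close
                    break;
                }
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("1", "open", 0L),
            new Event("2", "open", 1_000L),
            new Event("2", "close", 60_000L),   // 59 s after its open: matches
            new Event("1", "close", 400_000L)); // 400 s after its open: too late
        System.out.println(doorsClosedInTime(events)); // prints [2]
    }
}
```

Note that every close event walks the whole list of remembered opens, whichever door they belong to.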

IterativeConditions are expensive, because the condition runs against previously matched events each time it is evaluated, so they should be used with care.
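One way to limit that cost is to partition the events by door id first, so that a close event is only ever compared with the open event of its own door. In Flink the analogous step would be keying the stream by door id (keyBy) before applying the pattern. The sketch below shows only the idea in plain Java, not Flink code: KeyedDoorSketch and onEvent are hypothetical names, events are assumed to arrive in timestamp order, and only the most recent open per door is remembered.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of per-door state; not Flink code.
class KeyedDoorSketch {

    static final long WINDOW_MS = 5 * 60 * 1000L;

    // Most recent open timestamp per door id.
    private final Map<String, Long> openSince = new HashMap<>();

    // Returns true when the event is a "close" arriving less than
    // WINDOW_MS after the remembered "open" of the same door.
    boolean onEvent(String doorId, String status, long timestampMs) {
        if ("open".equals(status)) {
            openSince.put(doorId, timestampMs);
            return false;
        }
        if (!"close".equals(status)) {
            return false;
        }
        // A single lookup replaces the scan over all earlier opens.
        Long openedAt = openSince.remove(doorId);
        return openedAt != null && timestampMs - openedAt < WINDOW_MS;
    }

    public static void main(String[] args) {
        KeyedDoorSketch sketch = new KeyedDoorSketch();
        sketch.onEvent("1", "open", 0L);
        sketch.onEvent("2", "open", 1_000L);
        System.out.println(sketch.onEvent("2", "close", 60_000L));  // prints true
        System.out.println(sketch.onEvent("1", "close", 400_000L)); // prints false
    }
}
```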

For more information on conditions, see the Conditions section of the Flink CEP documentation.

