swift - How to implement a queue of serial network calls then processing in RxSwift?
I'm working on an app where I want to achieve the following using RxSwift and RxCocoa:
- download a JSON file containing the URLs for X number of files
- download file 1, process file 1
- download file 2, process file 2
- download file 3, process file 3
... etc
The key here is that the processing of each file has to complete before downloading the next file. At the very least, the file processing must be performed in order. If I can start downloading file 2 while file 1 is processing, that would be awesome, but it's not necessary.
I've tried using a SerialDispatchQueueScheduler to make this work, but since the files are of different sizes, the download of each file finishes at a different time, and therefore the processing code fires in a different order than the one in which I started the downloads.
I could implement this without Rx by using NSOperations and the like, but I'd like to keep using Rx in this app, as it's what I use elsewhere in the app.
Below I've included a snippet of the code. Comments have been added for the sake of this question.
    .flatMap { [unowned self] (tasks: [DiffTask]) -> Observable<ApplyDiffStatus> in
        return Observable.from(tasks)
            .observeOn(self.backgroundScheduler) // StackOverflow: backgroundScheduler is a SerialDispatchQueueScheduler
            .flatMapWithIndex({ [unowned self] (task, index) in
                return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: downloads the file from its URL
            })
            .catchError({ (error) -> Observable<DictionaryUpdater.DiffTaskProgress> in
                observable.onError(error)
                throw error
            })
            .map({ (diffTask: DiffTaskProgress) -> DiffTaskProgress.Progress in
                // StackOverflow: I've wrapped all of the progress observable in Observable<UpdateProgress>
                switch diffTask.progress {
                case .started(currentTask: let currentTask, taskCount: let taskCount):
                    observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                case .finished(data: _, currentTask: let currentTask, taskCount: let taskCount):
                    observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                case .progress(completion: _, currentTask: let currentTask, taskCount: let taskCount):
                    observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                }
                return diffTask.progress
            })
            .flatMap({ [unowned self] (progress: DiffTaskProgress.Progress) -> Observable<ApplyDiffStatus> in
                switch progress {
                case .finished(data: let data, currentTask: let currentTask, taskCount: let taskCount):
                    return self.applyDiff(data, currentTask: currentTask, taskCount: taskCount) // StackOverflow: processes the file that was downloaded
                default:
                    return Observable.empty()
                }
            })
    }
I managed to solve it by using the concatMap operator instead of

    .flatMapWithIndex({ [unowned self] (task, index) in
        return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: downloads the file from its URL
    })

The concatMap operator makes sure the first observable has finished before it subscribes to the next one, so no further signals are emitted until then. I had to use some trickery, since concatMap does not come with a concatMapWithIndex variant, but it works :)
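The answer does not show the trickery it used, so the following is only a minimal sketch of one way to get an index into concatMap: pair each task with its position via `enumerated()` before handing the array to `Observable.from`. The functions `fetchDiff(taskIndex:taskCount:)`, `applyDiff(_:)` and `processInOrder(_:)` are hypothetical stand-ins rather than the code from the question, and the artificial delays exist only to make earlier downloads finish later.

```swift
import Foundation
import RxSwift

// Hypothetical stand-in for the question's download step.
// Earlier files take longer, so a merging flatMap would deliver them out of order.
func fetchDiff(taskIndex: Int, taskCount: Int) -> Observable<String> {
    return Observable.create { observer in
        let delay = Double(taskCount - taskIndex) * 0.05
        DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
            observer.onNext("file \(taskIndex + 1)")
            observer.onCompleted()
        }
        return Disposables.create()
    }
}

// Hypothetical stand-in for the question's processing step.
func applyDiff(_ file: String) -> Observable<String> {
    return Observable.just("processed \(file)")
}

// Downloads and processes one task at a time, in the original order.
func processInOrder(_ urls: [String]) -> Observable<String> {
    let taskCount = urls.count
    // enumerated() yields (offset, element) pairs, which supplies the index
    // that concatMap itself does not provide.
    return Observable.from(Array(urls.enumerated()))
        .concatMap { pair -> Observable<String> in
            // concatMap waits for this inner observable to complete
            // before subscribing to the one for the next task.
            return fetchDiff(taskIndex: pair.offset, taskCount: taskCount)
                .flatMap { applyDiff($0) }
        }
}
```

Because the processing step sits inside the closure passed to concatMap, the download of file 2 does not start until file 1 has been processed. This satisfies the strict ordering requirement but gives up the optional overlap between downloading and processing.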