swift - How to implement a queue of serial network calls then processing in RxSwift?


I'm working on an app and want to achieve the following using RxSwift and RxCocoa:

  1. download JSON containing the URLs for X number of files
  2. download file 1, process file 1
  3. download file 2, process file 2
  4. download file 3, process file 3

... etc

The key here is that the processing of each file has to complete before downloading the next file. At the very least, the file processing must be performed in order. If I can start downloading file 2 while file 1 is processing, that would be awesome, but it's not necessary.

I've tried using SerialDispatchQueueScheduler to make this work, but since the files are of different sizes, the download of each file finishes at a different time, and therefore the processing code fires in a different order than the one in which I started the downloads.

I could implement this without Rx by using NSOperations and the like, but I'd like to keep using Rx, since it's what I use elsewhere in the app.

Below I've included a snippet of the code. Comments have been added for the sake of this question.

    .flatMap { [unowned self] (tasks: [DiffTask]) -> Observable<ApplyDiffStatus> in
        return Observable.from(tasks)
            .observeOn(self.backgroundScheduler) // StackOverflow: backgroundScheduler is a SerialDispatchQueueScheduler
            .flatMapWithIndex({ [unowned self] (task, index) in
                return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: downloads a file from a URL
            })
            .catchError({ (error) -> Observable<DictionaryUpdater.DiffTaskProgress> in
                observable.onError(error)
                throw error
            })
            .map({ (diffTask: DiffTaskProgress) -> DiffTaskProgress.Progress in
                // StackOverflow: I've wrapped all of the progress observable in an Observable<UpdateProgress>
                switch diffTask.progress {
                case .started(currentTask: let currentTask, taskCount: let taskCount):
                    observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                case .finished(data: _, currentTask: let currentTask, taskCount: let taskCount):
                    observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                case .progress(completion: _, currentTask: let currentTask, taskCount: let taskCount):
                    observable.on(.next(.fetchingDiff(progress: diffTask, currentDiff: currentTask, diffCount: taskCount)))
                }

                return diffTask.progress
            })
            .flatMap({ [unowned self] (progress: DiffTaskProgress.Progress) -> Observable<ApplyDiffStatus> in
                switch progress {
                case .finished(data: let data, currentTask: let currentTask, taskCount: let taskCount):
                    return self.applyDiff(data, currentTask: currentTask, taskCount: taskCount) // StackOverflow: processes the downloaded file
                default:
                    return Observable.empty()
                }
            })
    }

I managed to solve it by using the concatMap operator instead of:

    .flatMapWithIndex({ [unowned self] (task, index) in
        return self.fetchDiff(for: task, taskIndex: index, taskCount: tasks.count) // StackOverflow: downloads a file from a URL
    })

The concatMap operator makes sure the first observable has finished before emitting more signals. I had to use some trickery, since concatMap does not come with a concatMapWithIndex variant, but it works :)
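The answer does not show the trickery it used, so the following is only a minimal sketch of one way it could look, assuming RxSwift 5 or later. The `download` and `process` functions are hypothetical stand-ins for `fetchDiff` and `applyDiff`, and the delays simulate files of different sizes. The index is attached by enumerating the array before it enters the chain.

```swift
import Foundation
import RxSwift

// Hypothetical stand-in for fetchDiff: emits one payload after a simulated delay.
func download(_ name: String, index: Int, delayMs: Int) -> Observable<String> {
    let scheduler = SerialDispatchQueueScheduler(qos: .background)
    return Observable.just("\(index):\(name)")
        .delay(.milliseconds(delayMs), scheduler: scheduler)
}

// Hypothetical stand-in for applyDiff: "processes" a downloaded payload.
func process(_ payload: String) -> Observable<String> {
    return Observable.just("processed \(payload)")
}

func runSerialPipeline() -> [String] {
    let files = ["a", "b", "c"]
    let delays = [300, 100, 200] // the first file is the slowest download
    var output: [String] = []
    let finished = DispatchSemaphore(value: 0)

    // Pair each file with its index up front, since concatMap itself
    // does not pass an index to its closure.
    let subscription = Observable.from(Array(files.enumerated()))
        .concatMap { pair -> Observable<String> in
            // The next download is not subscribed to until this inner
            // observable (download, then process) has completed.
            download(pair.element, index: pair.offset, delayMs: delays[pair.offset])
                .concatMap { process($0) }
        }
        .subscribe(
            onNext: { output.append($0) },
            onCompleted: { finished.signal() }
        )

    finished.wait() // block until the whole chain has completed
    subscription.dispose()
    return output
}

let results = runSerialPipeline()
print(results)
```

Even though the first simulated download is the slowest, the results arrive in the original order, because each inner observable must complete before the next one is subscribed to. Replacing the outer `concatMap` with `flatMap` would subscribe to all downloads at once, so the faster ones could finish first, which is the behaviour described in the question.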

