Gestión de múltiples cargas con ReactiveX (en iOS con Swift y Alamofire)

Estoy intentando cargar varias fotos en un server usando ReactiveX (RxSwift), recostackndo las respuestas de cada request y luego haciendo una última request para completar la presentación.

Todo parece funcionar bastante bien hasta que bash networkinguce todas las respuestas. El último subscribeNext nunca se llama. (¿Quizás no flatMap qué tan flatMap es flatMap o networkinguce ?)

Específicamente, así es como bash realizar este procedimiento.

  • Prepare un observable para codificar cada foto ( self.imageMgr es una instancia de PHCachingImageManager() )

     func getPhotoDataObservable(asset: PHAsset) -> Observable<NSData> { return create { observer in self.imageMgr.requestImageForAsset(asset, targetSize: PHImageManagerMaximumSize, contentMode: .AspectFit, options: nil, resultHandler: { (myImage, myInfo) -> Void in let data = UIImageJPEGRepresentation(myImage!, 1.0)! NSLog("Encoded photo") observer.onNext(data) self.converts += 1 if self.converts == self.userReview.photos.count { NSLog("Completed encoding photos") observer.onCompleted() } }) return NopDisposable.instance } } 
  • Prepare un observable para cargar cada foto codificada una vez (con Alamofire y RxAlamofire)

     func getPostPhotoObservable(photoData: NSData) -> Observable<ReviewPhotoObject> { return create { observer in NSLog("Uploading Photo") upload(.POST, urlRequest.URLString, headers: nil, multipartFormData: { mfd in mfd.appendBodyPart(data: photoData, name: "image", fileName: "image", mimeType: "image/jpeg") }, encodingMemoryThreshold: Manager.MultipartFormDataEncodingMemoryThreshold, encodingCompletion: { encodingResult in switch encodingResult { case .Success(let upload, _, _): upload.responseJSON(completionHandler: { (myResponse) -> Void in if let photoResponse = myResponse.result.value { let photoObject = photoResponse.objectForKey("photo")! let photo = ReviewPhotoObject() photo.photoID = photoObject.objectForKey("id")! as! NSNumber NSLog("Uploaded Photo") observer.onNext(photo) } self.uploads += 1 if self.uploads == self.userReview.photos.count { NSLog("Completed uploading photos") observer.onCompleted() } }) case .Failure(let encodingError): observer.onError(encodingError) print(encodingError) } }) return NopDisposable.instance } } 
  • Finalmente, póngalo todo junto

     func postReview(review: MyReview) { self.userReview = review _ = review.photos.toObservable().flatMap { photos in return self.getPhotoDataObservable(photos) }.flatMap { photoData in return self.getPostPhotoObservable(photoData) }.networkinguce([], { var accumulator, photo: ReviewPhotoObject) -> [Int] in accumulator.append(Int(photo.photoID)) return accumulator }).subscribeNext({ (photoIds) -> Void in print(photoIds) // Never called }) } 

Cuando se ejecuta (con 2 fotos, por ejemplo), este es el resultado:

 Encoded photo Uploading photo Encoded photo Uploading photo Completed encoding photos Uploaded photo Uploaded photo Completed uploading photos 

Pero subscribeNext nunca se llama. Debido a que la documentation sobre RxSwift específicamente es todavía un poco delgada, esperaba que alguien por aquí pudiera darme una pista sobre lo que no entiendo.

La idea aquí es que una vez que se haga un observable enviando todos los elementos que va a enviar, debería completarse. Está creando un observable para cada PHAsset y ese observable solo envía un elemento para que luego se complete. De la forma en que tenías el código, solo el último se completaba, por lo que el operador de networkinguce estaba esperando esperando que el rest se completara antes de que pudiera terminar su trabajo.

Aquí es cómo habría escrito la primera function (En Swift 3 en lugar de 2.)

 extension PHImageManager { func requestMaximumSizeImage(for asset: PHAsset) -> Observable<UIImage> { return .create { observer in let request = self.requestImage(for: asset, targetSize: PHImageManagerMaximumSize, contentMode: .aspectFit, options: nil, resultHandler: { image, info in if let image = image { observer.onNext(image) observer.onCompleted() } else if let info = info, let error = info[PHImageErrorKey] as? Error { observer.onError(error) } }) return Disposables.create { self.cancelImageRequest(request) } } } } 

Verá que lo haría una extensión de PHImageManager en lugar de una function gratuita, pero eso es solo una diferencia de estilo. Las diferencias funcionales son que mi código emitirá un error si la request subyacente falla, y cancelará la request si todos los suscriptores se liberan antes de que se complete la request. Además, no hace la conversión de JPEG. Mantenga estas operaciones pequeñas y realice la conversión JPEG dentro de un map como este:

  let imagesData = review.photos.toObservable().flatMap { self.imageMgr.requestMaximumSizeImage(for: $0) }.map { UIImageJPEGRepresentation($0, 1.0) }.filter { $0 != nil }.map { $0! } 

El código anterior solicita las imágenes del administrador, luego las convierte en datos JPEG, filtrando las que fallaron en la conversión. imagesData es un tipo Observable<Data> .

Su getPostPhotoObservable está bien, excepto por el problema completo y el hecho de que no maneja la cancelación en el desechable. Además, podría hacer que su function de publicación devuelva un Observable en lugar de envolver el resultado en un ReviewPhotoObject.

Otras advertencias:

  1. La forma en que está poniendo todo junto no garantiza que ReviewPhotoObject s esté en el mismo order en que van las fotos (porque no puede garantizar el order en que se completarán las subidas). Para solucionarlo, si es necesario , deberías usar concat lugar de flatMap .

  2. Si falla alguno de los files cargados, la tubería completa desconectará y cancelará cualquier carga posterior. Probablemente debería configurar algo para detectar los errores y hacer algo apropiado. O catchErrorJustReturn o catchError según sus requisitos.