DEV Community

Yoshinori Imajo
Yoshinori Imajo

Posted on • Edited on

Errors in the stream switched using flatMap are not involved upstream in Combine framework.

The Publisher will terminate the stream in case of an error. However, if the Publisher switched by flatMap fails, the upstream before the switch does not terminate.

(This would be a big difference from RxSwift.)

Using flatMap sample

import Foundation
import Combine

let cancelable = (1...3).publisher
    .handleEvents(receiveOutput: {
        print("๐Ÿฅท handle output: \($0)")
    }, receiveCompletion: { _ in
        print("๐Ÿฅท handle completion:")
    }, receiveCancel: {
        print("๐Ÿฅท handle cancel:")
    })
    .setFailureType(to: NSError.self)
    .flatMap { value -> Future<String, NSError> in
        print("๐Ÿ flatMap: \(value)")
        return Future<String, NSError> {
            guard value == 1 else {
                $0(.failure(NSError(domain: "test", code: 1)))
                return
            }

            $0(.success("\(value)"))
        }
    }
    .sink(receiveCompletion: {
        switch $0 {
        case .finished:
            print("๐ŸŽ sink finished:")
        case .failure(let error):
            print("๐ŸŽ sink failure:", error)
        }
    }, receiveValue: {
        print("๐ŸŽ sink received: \(String(describing: $0)) ๐ŸŽ‰")
    })
Enter fullscreen mode Exit fullscreen mode

This code has the following output:

๐Ÿฅท handle output: 1
๐Ÿ flatMap: 1
๐ŸŽ sink received: 1 ๐ŸŽ‰
๐Ÿฅท handle output: 2
๐Ÿ flatMap: 2
๐ŸŽ sink failure: Error Domain=test Code=1 "(null)"
๐Ÿฅท handle output: 3
๐Ÿฅท handle completion:
Enter fullscreen mode Exit fullscreen mode

Notice the ๐Ÿฅท emoji; despite the sink failure, the output will see that the element is handled 3 and even sent to completion.

Using PasshtoughSubject & flatMap sample

Of course, using PassthroughSubject doesn't change the result.

import Foundation
import Combine

let subject = PassthroughSubject<Int, NSError>()

let cancelable = subject
    .handleEvents(receiveOutput: {
        print("๐Ÿฅท handle output: \($0)")
    }, receiveCompletion: { _ in
        print("๐Ÿฅท handle completion:")
    }, receiveCancel: {
        print("๐Ÿฅท handle cancel:")
    })
    .flatMap { value -> Future<String, NSError> in
        print("๐Ÿ flatMap: \(value)")
        return .init {
            guard value == 1 else {
                $0(.failure(NSError(domain: "test", code: 1)))
                return
            }

            $0(.success("\(value)"))
        }
    }
    .sink(receiveCompletion: {
        switch $0 {
        case .finished:
            print("๐ŸŽ sink finished:")
        case .failure(let error):
            print("๐ŸŽ sink failure:", error)
        }
    }, receiveValue: {
        print("๐ŸŽ sink received: \(String(describing: $0)) ๐ŸŽ‰")
    })

subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .finished)
Enter fullscreen mode Exit fullscreen mode

This code has the following output:

๐Ÿฅท handle output: 1
๐Ÿ flatMap: 1
๐ŸŽ sink received: 1 ๐ŸŽ‰
๐Ÿฅท handle output: 2
๐Ÿ flatMap: 2
๐ŸŽ sink failure: Error Domain=test Code=1 "(null)"
๐Ÿฅท handle output: 3
๐Ÿฅท handle completion:
Enter fullscreen mode Exit fullscreen mode

After all, the upstream doesn't care if the downstream switched stream fails. This may cause the stream execution process to continue unnecessarily.

My guess is that to account for this, we would have to control it with subscriptions.

Top comments (0)