The Publisher will terminate the stream in case of an error. However, if the Publisher switched by flatMap fails, the upstream before the switch does not terminate.
(This would be a big difference from RxSwift.)
Using flatMap sample
import Foundation
import Combine
let cancelable = (1...3).publisher
.handleEvents(receiveOutput: {
print("π₯· handle output: \($0)")
}, receiveCompletion: { _ in
print("π₯· handle completion:")
}, receiveCancel: {
print("π₯· handle cancel:")
})
.setFailureType(to: NSError.self)
.flatMap { value -> Future<String, NSError> in
print("π flatMap: \(value)")
return Future<String, NSError> {
guard value == 1 else {
$0(.failure(NSError(domain: "test", code: 1)))
return
}
$0(.success("\(value)"))
}
}
.sink(receiveCompletion: {
switch $0 {
case .finished:
print("π sink finished:")
case .failure(let error):
print("π sink failure:", error)
}
}, receiveValue: {
print("π sink received: \(String(describing: $0)) π")
})
This code has the following output:
π₯· handle output: 1
π flatMap: 1
π sink received: 1 π
π₯· handle output: 2
π flatMap: 2
π sink failure: Error Domain=test Code=1 "(null)"
π₯· handle output: 3
π₯· handle completion:
Notice the π₯· emoji; despite the sink failure, the output will see that the element is handled 3 and even sent to completion.
Using PasshtoughSubject & flatMap sample
Of course, using PassthroughSubject doesn't change the result.
import Foundation
import Combine
let subject = PassthroughSubject<Int, NSError>()
let cancelable = subject
.handleEvents(receiveOutput: {
print("π₯· handle output: \($0)")
}, receiveCompletion: { _ in
print("π₯· handle completion:")
}, receiveCancel: {
print("π₯· handle cancel:")
})
.flatMap { value -> Future<String, NSError> in
print("π flatMap: \(value)")
return .init {
guard value == 1 else {
$0(.failure(NSError(domain: "test", code: 1)))
return
}
$0(.success("\(value)"))
}
}
.sink(receiveCompletion: {
switch $0 {
case .finished:
print("π sink finished:")
case .failure(let error):
print("π sink failure:", error)
}
}, receiveValue: {
print("π sink received: \(String(describing: $0)) π")
})
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .finished)
This code has the following output:
π₯· handle output: 1
π flatMap: 1
π sink received: 1 π
π₯· handle output: 2
π flatMap: 2
π sink failure: Error Domain=test Code=1 "(null)"
π₯· handle output: 3
π₯· handle completion:
After all, the upstream doesn't care if the downstream switched stream fails. This may cause the stream execution process to continue unnecessarily.
My guess is that to account for this, we would have to control it with subscriptions.
Top comments (0)