The Publisher will terminate the stream in case of an error. However, if the Publisher switched by flatMap fails, the upstream before the switch does not terminate.
(This would be a big difference from RxSwift.)
Using flatMap sample
import Foundation
import Combine
let cancelable = (1...3).publisher
.handleEvents(receiveOutput: {
print("๐ฅท handle output: \($0)")
}, receiveCompletion: { _ in
print("๐ฅท handle completion:")
}, receiveCancel: {
print("๐ฅท handle cancel:")
})
.setFailureType(to: NSError.self)
.flatMap { value -> Future<String, NSError> in
print("๐ flatMap: \(value)")
return Future<String, NSError> {
guard value == 1 else {
$0(.failure(NSError(domain: "test", code: 1)))
return
}
$0(.success("\(value)"))
}
}
.sink(receiveCompletion: {
switch $0 {
case .finished:
print("๐ sink finished:")
case .failure(let error):
print("๐ sink failure:", error)
}
}, receiveValue: {
print("๐ sink received: \(String(describing: $0)) ๐")
})
This code has the following output:
๐ฅท handle output: 1
๐ flatMap: 1
๐ sink received: 1 ๐
๐ฅท handle output: 2
๐ flatMap: 2
๐ sink failure: Error Domain=test Code=1 "(null)"
๐ฅท handle output: 3
๐ฅท handle completion:
Notice the ๐ฅท emoji; despite the sink failure, the output will see that the element is handled 3 and even sent to completion.
Using PasshtoughSubject & flatMap sample
Of course, using PassthroughSubject doesn't change the result.
import Foundation
import Combine
let subject = PassthroughSubject<Int, NSError>()
let cancelable = subject
.handleEvents(receiveOutput: {
print("๐ฅท handle output: \($0)")
}, receiveCompletion: { _ in
print("๐ฅท handle completion:")
}, receiveCancel: {
print("๐ฅท handle cancel:")
})
.flatMap { value -> Future<String, NSError> in
print("๐ flatMap: \(value)")
return .init {
guard value == 1 else {
$0(.failure(NSError(domain: "test", code: 1)))
return
}
$0(.success("\(value)"))
}
}
.sink(receiveCompletion: {
switch $0 {
case .finished:
print("๐ sink finished:")
case .failure(let error):
print("๐ sink failure:", error)
}
}, receiveValue: {
print("๐ sink received: \(String(describing: $0)) ๐")
})
subject.send(1)
subject.send(2)
subject.send(3)
subject.send(completion: .finished)
This code has the following output:
๐ฅท handle output: 1
๐ flatMap: 1
๐ sink received: 1 ๐
๐ฅท handle output: 2
๐ flatMap: 2
๐ sink failure: Error Domain=test Code=1 "(null)"
๐ฅท handle output: 3
๐ฅท handle completion:
After all, the upstream doesn't care if the downstream switched stream fails. This may cause the stream execution process to continue unnecessarily.
My guess is that to account for this, we would have to control it with subscriptions.
Top comments (0)