Refactor PollHistoryService

This commit is contained in:
Alfonso Grillo
2023-01-23 12:04:24 +01:00
parent 9cc9a046fe
commit 9b16774d6a
5 changed files with 130 additions and 137 deletions
@@ -22,25 +22,22 @@ final class PollHistoryService: PollHistoryServiceProtocol {
private let room: MXRoom
private let timeline: MXEventTimeline
private let chunkSizeInDays: UInt
private let pollsSubject: PassthroughSubject<TimelinePollDetails, Never> = .init()
private let errorSubject: PassthroughSubject<PollHistoryError, Never> = .init()
private let isFetchingSubject: PassthroughSubject<Bool, Never> = .init()
private var timelineListener: Any?
private let updatesSubject: PassthroughSubject<TimelinePollDetails, Never> = .init()
private let updatesErrorsSubject: PassthroughSubject<Error, Never> = .init()
private var pollAggregators: [String: PollAggregator] = [:]
private var targetTimestamp: Date
private var oldestEventDate: Date = .distantFuture
private var currentBatchSubject: PassthroughSubject<TimelinePollDetails, Error>?
var pollHistory: AnyPublisher<TimelinePollDetails, Never> {
pollsSubject.eraseToAnyPublisher()
var updates: AnyPublisher<TimelinePollDetails, Never> {
updatesSubject.eraseToAnyPublisher()
}
var error: AnyPublisher<PollHistoryError, Never> {
errorSubject.eraseToAnyPublisher()
}
var isFetching: AnyPublisher<Bool, Never> {
isFetchingSubject.eraseToAnyPublisher()
var updatesErrors: AnyPublisher<Error, Never> {
updatesErrorsSubject.eraseToAnyPublisher()
}
init(room: MXRoom, chunkSizeInDays: UInt) {
@@ -51,8 +48,8 @@ final class PollHistoryService: PollHistoryServiceProtocol {
setup(timeline: timeline)
}
func next() {
startPagination()
func next() -> AnyPublisher<TimelinePollDetails, Error> {
currentBatchSubject?.eraseToAnyPublisher() ?? startPagination()
}
}
@@ -64,22 +61,31 @@ private extension PollHistoryService {
func setup(timeline: MXEventTimeline) {
timelineListener = timeline.listenToEvents { [weak self] event, _, _ in
guard let self = self else {
return
}
if event.eventType == .pollStart {
self.aggregatePoll(pollStartEvent: event)
self?.aggregatePoll(pollStartEvent: event)
}
self.oldestEventDate = min(event.originServerDate, self.oldestEventDate)
self?.updateTimestamp(event: event)
}
}
func startPagination() {
isFetchingSubject.send(true)
timeline.resetPagination()
paginate(timeline: timeline)
func updateTimestamp(event: MXEvent) {
oldestEventDate = min(event.originServerDate, oldestEventDate)
}
func startPagination() -> AnyPublisher<TimelinePollDetails, Error> {
let batchSubject = PassthroughSubject<TimelinePollDetails, Error>()
currentBatchSubject = batchSubject
DispatchQueue.main.async { [weak self] in
guard let self = self else {
return
}
self.timeline.resetPagination()
self.paginate(timeline: self.timeline)
}
return batchSubject.eraseToAnyPublisher()
}
func paginate(timeline: MXEventTimeline) {
@@ -93,15 +99,19 @@ private extension PollHistoryService {
if timeline.canPaginate(.backwards), self.timestampTargetReached == false {
self.paginate(timeline: timeline)
} else {
self.isFetchingSubject.send(false)
self.completeBatch(completion: .finished)
}
case .failure(let error):
self.errorSubject.send(.paginationFailed(error))
self.isFetchingSubject.send(false)
self.completeBatch(completion: .failure(error))
}
}
}
func completeBatch(completion: Subscribers.Completion<Error>) {
currentBatchSubject?.send(completion: completion)
currentBatchSubject = nil
}
func aggregatePoll(pollStartEvent: MXEvent) {
guard pollAggregators[pollStartEvent.eventId] == nil else {
return
@@ -131,14 +141,14 @@ extension PollHistoryService: PollAggregatorDelegate {
func pollAggregatorDidStartLoading(_ aggregator: PollAggregator) {}
func pollAggregatorDidEndLoading(_ aggregator: PollAggregator) {
pollsSubject.send(.init(poll: aggregator.poll, represent: .started))
currentBatchSubject?.send(.init(poll: aggregator.poll, represent: .started))
}
func pollAggregator(_ aggregator: PollAggregator, didFailWithError: Error) {
errorSubject.send(.pollAggregationFailed(didFailWithError))
updatesErrorsSubject.send(didFailWithError)
}
func pollAggregatorDidUpdateData(_ aggregator: PollAggregator) {
pollsSubject.send(.init(poll: aggregator.poll, represent: .started))
updatesSubject.send(.init(poll: aggregator.poll, represent: .started))
}
}