Handle live polls

This commit is contained in:
Alfonso Grillo
2023-01-25 17:52:39 +01:00
parent cddf935fa8
commit 2e3aa18f6c
4 changed files with 67 additions and 5 deletions
@@ -22,15 +22,26 @@ final class PollHistoryService: PollHistoryServiceProtocol {
private let room: MXRoom
private let timeline: MXEventTimeline
private let chunkSizeInDays: UInt
private var timelineListener: Any?
private var timelineListener: Any?
private var roomListener: Any?
// polls aggregation
private var pollAggregators: [String: PollAggregator] = [:]
private var livePollsIDs: Set<String> = .init()
private var publishedPollsIDs: Set<String> = .init()
// polls
private var currentBatchSubject: PassthroughSubject<TimelinePollDetails, Error>?
private var livePollsSubject: PassthroughSubject<TimelinePollDetails, Never> = .init()
// polls updates
private let updatesSubject: PassthroughSubject<TimelinePollDetails, Never> = .init()
private let pollErrorsSubject: PassthroughSubject<Error, Never> = .init()
private var pollAggregators: [String: PollAggregator] = [:]
// timestamps
private var targetTimestamp: Date?
private var oldestEventDateSubject: CurrentValueSubject<Date, Never> = .init(Date.distantFuture)
private var currentBatchSubject: PassthroughSubject<TimelinePollDetails, Error>?
var updates: AnyPublisher<TimelinePollDetails, Never> {
updatesSubject.eraseToAnyPublisher()
@@ -45,6 +56,7 @@ final class PollHistoryService: PollHistoryServiceProtocol {
self.chunkSizeInDays = chunkSizeInDays
timeline = MXRoomEventTimeline(room: room, andInitialEventId: nil)
setupTimeline()
setupLiveUpdates()
}
func nextBatch() -> AnyPublisher<TimelinePollDetails, Error> {
@@ -58,6 +70,17 @@ final class PollHistoryService: PollHistoryServiceProtocol {
var fetchedUpTo: AnyPublisher<Date, Never> {
oldestEventDateSubject.eraseToAnyPublisher()
}
var livePolls: AnyPublisher<TimelinePollDetails, Never> {
livePollsSubject.eraseToAnyPublisher()
}
deinit {
guard let roomListener = roomListener else {
return
}
room.removeListener(roomListener)
}
}
private extension PollHistoryService {
@@ -77,6 +100,15 @@ private extension PollHistoryService {
}
}
func setupLiveUpdates() {
roomListener = room.listen(toEventsOfTypes: [kMXEventTypeStringPollStart, kMXEventTypeStringPollStartMSC3381]) { [weak self] event, _, _ in
if event.eventType == .pollStart {
self?.livePollsIDs.insert(event.eventId)
self?.aggregatePoll(pollStartEvent: event)
}
}
}
func updateTimestamp(event: MXEvent) {
oldestEventDate = min(event.originServerDate, oldestEventDate)
}
@@ -169,7 +201,21 @@ extension PollHistoryService: PollAggregatorDelegate {
func pollAggregatorDidStartLoading(_ aggregator: PollAggregator) {}
func pollAggregatorDidEndLoading(_ aggregator: PollAggregator) {
currentBatchSubject?.send(.init(poll: aggregator.poll, represent: .started))
let pollID = aggregator.poll.id
guard publishedPollsIDs.contains(pollID) == false else {
return
}
publishedPollsIDs.insert(pollID)
let newPoll: TimelinePollDetails = .init(poll: aggregator.poll, represent: .started)
if livePollsIDs.contains(newPoll.id) {
livePollsSubject.send(newPoll)
} else {
currentBatchSubject?.send(newPoll)
}
}
func pollAggregator(_ aggregator: PollAggregator, didFailWithError: Error) {