From cf2b463fd39bddf8e6b048e99b701d671f7dd8ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20F=C3=B6rtsch?= Date: Sat, 14 Mar 2026 05:29:56 +0100 Subject: [PATCH] add ActionQueue: two-phase enqueue, FIFO flush, retry with max 5 attempts Co-Authored-By: Claude Opus 4.6 (1M context) --- .../Sources/SyncEngine/ActionQueue.swift | 175 ++++++++++ .../Sources/SyncEngine/ActionTypes.swift | 50 +++ .../Sources/SyncEngine/SyncCoordinator.swift | 12 +- .../SyncEngineTests/ActionQueueTests.swift | 308 ++++++++++++++++++ 4 files changed, 544 insertions(+), 1 deletion(-) create mode 100644 Packages/MagnumOpusCore/Sources/SyncEngine/ActionQueue.swift create mode 100644 Packages/MagnumOpusCore/Sources/SyncEngine/ActionTypes.swift create mode 100644 Packages/MagnumOpusCore/Tests/SyncEngineTests/ActionQueueTests.swift diff --git a/Packages/MagnumOpusCore/Sources/SyncEngine/ActionQueue.swift b/Packages/MagnumOpusCore/Sources/SyncEngine/ActionQueue.swift new file mode 100644 index 0000000..370394e --- /dev/null +++ b/Packages/MagnumOpusCore/Sources/SyncEngine/ActionQueue.swift @@ -0,0 +1,175 @@ +import Foundation +import Models +import IMAPClient +import SMTPClient +import MailStore + +public actor ActionQueue { + private let store: MailStore + private let accountId: String + private let imapClientProvider: @Sendable () -> any IMAPClientProtocol + private let smtpClientProvider: (@Sendable () -> SMTPClient)? + + public var pendingCount: Int { + (try? store.pendingActionCount(accountId: accountId)) ?? 0 + } + + public init( + store: MailStore, + accountId: String, + imapClientProvider: @escaping @Sendable () -> any IMAPClientProtocol, + smtpClientProvider: (@Sendable () -> SMTPClient)? = nil + ) { + self.store = store + self.accountId = accountId + self.imapClientProvider = imapClientProvider + self.smtpClientProvider = smtpClientProvider + } + + // MARK: - Enqueue + + /// Enqueue an action: persist to DB immediately, then fire-and-forget remote dispatch. + public func enqueue(_ action: PendingAction) throws { + let record = persistAction(action) + try store.insertPendingAction(record) + + // Fire-and-forget remote dispatch + Task { [weak self] in + await self?.dispatchSingle(action) + } + } + + // MARK: - Flush + + /// Dispatch all pending actions in creation order. Called before IMAP fetch. + public func flush() async { + guard let records = try? store.pendingActions(accountId: accountId) else { return } + + for record in records { + guard let action = decodeAction(record) else { + // Corrupt record — remove it + try? store.deletePendingAction(id: record.id) + continue + } + + do { + try await dispatch(action) + try store.deletePendingAction(id: record.id) + } catch { + var updated = record + updated.retryCount += 1 + updated.lastError = error.localizedDescription + if updated.retryCount >= 5 { + try? store.deletePendingAction(id: record.id) + } else { + try? store.updatePendingAction(updated) + } + } + } + } + + // MARK: - Dispatch + + private func dispatchSingle(_ action: PendingAction) async { + do { + try await dispatch(action) + try store.deletePendingAction(id: action.id) + } catch { + // Update retry count on failure; flush will retry later + guard var record = try? store.pendingActions(accountId: accountId) + .first(where: { $0.id == action.id }) else { return } + record.retryCount += 1 + record.lastError = error.localizedDescription + if record.retryCount >= 5 { + try? store.deletePendingAction(id: record.id) + } else { + try? store.updatePendingAction(record) + } + } + } + + private func dispatch(_ action: PendingAction) async throws { + switch action.payload { + case .setFlags(let uid, let mailbox, let add, let remove): + let client = imapClientProvider() + try await client.connect() + defer { Task { try? await client.disconnect() } } + try await client.storeFlags(uid: uid, mailbox: mailbox, add: add, remove: remove) + + case .move(let uid, let from, let to): + let client = imapClientProvider() + try await client.connect() + defer { Task { try? await client.disconnect() } } + try await client.moveMessage(uid: uid, from: from, to: to) + + case .delete(let uid, let mailbox, let trashMailbox): + let client = imapClientProvider() + try await client.connect() + defer { Task { try? await client.disconnect() } } + if mailbox == trashMailbox { + // Already in trash — permanent delete + try await client.storeFlags(uid: uid, mailbox: mailbox, add: ["\\Deleted"], remove: []) + try await client.expunge(mailbox: mailbox) + } else { + try await client.moveMessage(uid: uid, from: mailbox, to: trashMailbox) + } + + case .send(let message): + guard let provider = smtpClientProvider else { + throw ActionQueueError.noSMTPClient + } + let client = provider() + try await client.send(message: message) + + case .append(let mailbox, let messageData, let flags): + let client = imapClientProvider() + try await client.connect() + defer { Task { try? await client.disconnect() } } + guard let data = messageData.data(using: .utf8) else { + throw ActionQueueError.invalidMessageData + } + try await client.appendMessage(to: mailbox, message: data, flags: flags) + } + } + + // MARK: - Serialization + + func persistAction(_ action: PendingAction) -> PendingActionRecord { + let encoder = JSONEncoder() + let payloadJson = (try? encoder.encode(action.payload)) + .flatMap { String(data: $0, encoding: .utf8) } ?? "{}" + let dateString = ISO8601DateFormatter().string(from: action.createdAt) + + return PendingActionRecord( + id: action.id, + accountId: action.accountId, + actionType: action.actionType.rawValue, + payload: payloadJson, + createdAt: dateString + ) + } + + func decodeAction(_ record: PendingActionRecord) -> PendingAction? { + guard let data = record.payload.data(using: .utf8), + let payload = try? JSONDecoder().decode(ActionPayload.self, from: data), + let actionType = ActionType(rawValue: record.actionType) else { + return nil + } + + let formatter = ISO8601DateFormatter() + let date = formatter.date(from: record.createdAt) ?? Date() + + return PendingAction( + id: record.id, + accountId: record.accountId, + actionType: actionType, + payload: payload, + createdAt: date + ) + } +} + +enum ActionQueueError: Error { + case noSMTPClient + case invalidMessageData +} diff --git a/Packages/MagnumOpusCore/Sources/SyncEngine/ActionTypes.swift b/Packages/MagnumOpusCore/Sources/SyncEngine/ActionTypes.swift new file mode 100644 index 0000000..3ace171 --- /dev/null +++ b/Packages/MagnumOpusCore/Sources/SyncEngine/ActionTypes.swift @@ -0,0 +1,50 @@ +import Foundation +import Models + +public struct PendingAction: Sendable, Codable { + public var id: String + public var accountId: String + public var actionType: ActionType + public var payload: ActionPayload + public var createdAt: Date + + public init( + id: String = UUID().uuidString, + accountId: String, + actionType: ActionType, + payload: ActionPayload, + createdAt: Date = Date() + ) { + self.id = id + self.accountId = accountId + self.actionType = actionType + self.payload = payload + self.createdAt = createdAt + } +} + +public enum ActionType: String, Sendable, Codable { + case setFlags + case move + case delete + case send + case append +} + +public enum ActionPayload: Sendable, Codable { + case setFlags(uid: Int, mailbox: String, add: [String], remove: [String]) + case move(uid: Int, from: String, to: String) + case delete(uid: Int, mailbox: String, trashMailbox: String) + case send(message: OutgoingMessage) + case append(mailbox: String, messageData: String, flags: [String]) + + public var actionType: ActionType { + switch self { + case .setFlags: return .setFlags + case .move: return .move + case .delete: return .delete + case .send: return .send + case .append: return .append + } + } +} diff --git a/Packages/MagnumOpusCore/Sources/SyncEngine/SyncCoordinator.swift b/Packages/MagnumOpusCore/Sources/SyncEngine/SyncCoordinator.swift index 1831fdd..2b25dad 100644 --- a/Packages/MagnumOpusCore/Sources/SyncEngine/SyncCoordinator.swift +++ b/Packages/MagnumOpusCore/Sources/SyncEngine/SyncCoordinator.swift @@ -9,15 +9,22 @@ public final class SyncCoordinator { private let accountConfig: AccountConfig private let imapClient: any IMAPClientProtocol private let store: MailStore + private let actionQueue: ActionQueue? private var syncTask: Task? public private(set) var syncState: SyncState = .idle private var eventHandlers: [(SyncEvent) -> Void] = [] - public init(accountConfig: AccountConfig, imapClient: any IMAPClientProtocol, store: MailStore) { + public init( + accountConfig: AccountConfig, + imapClient: any IMAPClientProtocol, + store: MailStore, + actionQueue: ActionQueue? = nil + ) { self.accountConfig = accountConfig self.imapClient = imapClient self.store = store + self.actionQueue = actionQueue } public func onEvent(_ handler: @escaping (SyncEvent) -> Void) { @@ -60,6 +67,9 @@ public final class SyncCoordinator { )) } + // Flush pending actions before fetching new state + await actionQueue?.flush() + try await imapClient.connect() do { try await syncAllMailboxes() diff --git a/Packages/MagnumOpusCore/Tests/SyncEngineTests/ActionQueueTests.swift b/Packages/MagnumOpusCore/Tests/SyncEngineTests/ActionQueueTests.swift new file mode 100644 index 0000000..e251763 --- /dev/null +++ b/Packages/MagnumOpusCore/Tests/SyncEngineTests/ActionQueueTests.swift @@ -0,0 +1,308 @@ +import Testing +import Foundation +import GRDB +@testable import SyncEngine +@testable import IMAPClient +@testable import MailStore +@testable import Models + +@Suite("ActionQueue") +struct ActionQueueTests { + func makeStore() throws -> MailStore { + try MailStore(dbWriter: DatabaseSetup.openInMemoryDatabase()) + } + + func makeAccountAndStore() throws -> (MailStore, String) { + let store = try makeStore() + let accountId = "acc1" + try store.insertAccount(AccountRecord( + id: accountId, + name: "Test", + email: "test@example.com", + imapHost: "imap.example.com", + imapPort: 993 + )) + return (store, accountId) + } + + @Test("enqueue persists action to database") + func enqueuePersists() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = MockIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + let action = PendingAction( + accountId: accountId, + actionType: .setFlags, + payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: []) + ) + + try await queue.enqueue(action) + + let pending = try store.pendingActions(accountId: accountId) + #expect(pending.count >= 1) + let found = pending.first { $0.id == action.id } + #expect(found != nil) + #expect(found?.actionType == "setFlags") + } + + @Test("flush dispatches actions and removes them from queue") + func flushDispatches() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = MockIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + // Persist action directly (bypass fire-and-forget dispatch) + let action = PendingAction( + accountId: accountId, + actionType: .setFlags, + payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: []) + ) + let record = await queue.persistAction(action) + try store.insertPendingAction(record) + + #expect(try store.pendingActionCount(accountId: accountId) == 1) + + await queue.flush() + + #expect(try store.pendingActionCount(accountId: accountId) == 0) + #expect(mock.storedFlags.count == 1) + #expect(mock.storedFlags[0].uid == 1) + #expect(mock.storedFlags[0].add == ["\\Seen"]) + } + + @Test("flush dispatches actions in creation order (FIFO)") + func flushFIFO() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = MockIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + let now = Date() + let action1 = PendingAction( + id: "first", + accountId: accountId, + actionType: .setFlags, + payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: []), + createdAt: now + ) + let action2 = PendingAction( + id: "second", + accountId: accountId, + actionType: .setFlags, + payload: .setFlags(uid: 2, mailbox: "INBOX", add: ["\\Flagged"], remove: []), + createdAt: now.addingTimeInterval(1) + ) + + try store.insertPendingAction(await queue.persistAction(action1)) + try store.insertPendingAction(await queue.persistAction(action2)) + + await queue.flush() + + #expect(mock.storedFlags.count == 2) + #expect(mock.storedFlags[0].uid == 1) + #expect(mock.storedFlags[1].uid == 2) + } + + @Test("pendingCount reflects queue state") + func pendingCountReflectsState() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = MockIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + #expect(await queue.pendingCount == 0) + + let action = PendingAction( + accountId: accountId, + actionType: .move, + payload: .move(uid: 1, from: "INBOX", to: "Archive") + ) + let record = await queue.persistAction(action) + try store.insertPendingAction(record) + + #expect(await queue.pendingCount == 1) + + await queue.flush() + + #expect(await queue.pendingCount == 0) + } + + @Test("failed dispatch increments retryCount") + func failedDispatchRetry() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = FailingIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + let action = PendingAction( + accountId: accountId, + actionType: .setFlags, + payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: []) + ) + let record = await queue.persistAction(action) + try store.insertPendingAction(record) + + await queue.flush() + + let pending = try store.pendingActions(accountId: accountId) + #expect(pending.count == 1) + #expect(pending[0].retryCount == 1) + #expect(pending[0].lastError != nil) + } + + @Test("action removed after 5 failures") + func removedAfterMaxRetries() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = FailingIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + let action = PendingAction( + accountId: accountId, + actionType: .setFlags, + payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: []) + ) + var record = await queue.persistAction(action) + record.retryCount = 4 + try store.insertPendingAction(record) + + await queue.flush() + + // retryCount was 4, incremented to 5 → removed + #expect(try store.pendingActionCount(accountId: accountId) == 0) + } + + @Test("delete action in trash expunges instead of moving") + func deleteInTrashExpunges() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = MockIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + let action = PendingAction( + accountId: accountId, + actionType: .delete, + payload: .delete(uid: 5, mailbox: "Trash", trashMailbox: "Trash") + ) + let record = await queue.persistAction(action) + try store.insertPendingAction(record) + + await queue.flush() + + // Should have stored \Deleted flag and expunged + #expect(mock.storedFlags.count == 1) + #expect(mock.storedFlags[0].add == ["\\Deleted"]) + #expect(mock.expungedMailboxes == ["Trash"]) + #expect(mock.movedMessages.isEmpty) + } + + @Test("delete action not in trash moves to trash") + func deleteNotInTrashMoves() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = MockIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + let action = PendingAction( + accountId: accountId, + actionType: .delete, + payload: .delete(uid: 3, mailbox: "INBOX", trashMailbox: "Trash") + ) + let record = await queue.persistAction(action) + try store.insertPendingAction(record) + + await queue.flush() + + #expect(mock.movedMessages.count == 1) + #expect(mock.movedMessages[0].from == "INBOX") + #expect(mock.movedMessages[0].to == "Trash") + #expect(mock.storedFlags.isEmpty) + } + + @Test("move action dispatches correctly") + func moveDispatches() async throws { + let (store, accountId) = try makeAccountAndStore() + let mock = MockIMAPClient() + let queue = ActionQueue( + store: store, + accountId: accountId, + imapClientProvider: { mock } + ) + + let action = PendingAction( + accountId: accountId, + actionType: .move, + payload: .move(uid: 7, from: "INBOX", to: "Archive") + ) + let record = await queue.persistAction(action) + try store.insertPendingAction(record) + + await queue.flush() + + #expect(mock.movedMessages.count == 1) + #expect(mock.movedMessages[0].uid == 7) + #expect(mock.movedMessages[0].from == "INBOX") + #expect(mock.movedMessages[0].to == "Archive") + } +} + +// MARK: - FailingIMAPClient + +private final class FailingIMAPClient: IMAPClientProtocol, @unchecked Sendable { + func connect() async throws {} + func disconnect() async throws {} + func listMailboxes() async throws -> [IMAPMailboxInfo] { [] } + func selectMailbox(_ name: String) async throws -> IMAPMailboxStatus { + throw FailingIMAPError.alwaysFails + } + func fetchEnvelopes(uidsGreaterThan uid: Int) async throws -> [FetchedEnvelope] { [] } + func fetchFlags(uids: ClosedRange) async throws -> [UIDFlagsPair] { [] } + func fetchBody(uid: Int) async throws -> (text: String?, html: String?) { (nil, nil) } + func storeFlags(uid: Int, mailbox: String, add: [String], remove: [String]) async throws { + throw FailingIMAPError.alwaysFails + } + func moveMessage(uid: Int, from: String, to: String) async throws { + throw FailingIMAPError.alwaysFails + } + func copyMessage(uid: Int, from: String, to: String) async throws { + throw FailingIMAPError.alwaysFails + } + func expunge(mailbox: String) async throws { + throw FailingIMAPError.alwaysFails + } + func appendMessage(to mailbox: String, message: Data, flags: [String]) async throws { + throw FailingIMAPError.alwaysFails + } + func capabilities() async throws -> Set { [] } +} + +private enum FailingIMAPError: Error { + case alwaysFails +}