import Foundation import Models import IMAPClient import SMTPClient import MailStore public actor ActionQueue { private let store: MailStore private let accountId: String private let imapClientProvider: @Sendable () -> any IMAPClientProtocol private let smtpClientProvider: (@Sendable () -> SMTPClient)? public var pendingCount: Int { (try? store.pendingActionCount(accountId: accountId)) ?? 0 } public init( store: MailStore, accountId: String, imapClientProvider: @escaping @Sendable () -> any IMAPClientProtocol, smtpClientProvider: (@Sendable () -> SMTPClient)? = nil ) { self.store = store self.accountId = accountId self.imapClientProvider = imapClientProvider self.smtpClientProvider = smtpClientProvider } // MARK: - Enqueue /// Enqueue an action: persist to DB immediately, then fire-and-forget remote dispatch. public func enqueue(_ action: PendingAction) throws { let record = persistAction(action) try store.insertPendingAction(record) // Fire-and-forget remote dispatch Task { [weak self] in await self?.dispatchSingle(action) } } // MARK: - Flush /// Dispatch all pending actions in creation order. Called before IMAP fetch. public func flush() async { guard let records = try? store.pendingActions(accountId: accountId) else { return } for record in records { guard let action = decodeAction(record) else { // Corrupt record — remove it try? store.deletePendingAction(id: record.id) continue } do { try await dispatch(action) try store.deletePendingAction(id: record.id) } catch { var updated = record updated.retryCount += 1 updated.lastError = error.localizedDescription if updated.retryCount >= 5 { try? store.deletePendingAction(id: record.id) } else { try? store.updatePendingAction(updated) } } } } // MARK: - Dispatch private func dispatchSingle(_ action: PendingAction) async { do { try await dispatch(action) try store.deletePendingAction(id: action.id) } catch { // Update retry count on failure; flush will retry later guard var record = try? store.pendingActions(accountId: accountId) .first(where: { $0.id == action.id }) else { return } record.retryCount += 1 record.lastError = error.localizedDescription if record.retryCount >= 5 { try? store.deletePendingAction(id: record.id) } else { try? store.updatePendingAction(record) } } } private func dispatch(_ action: PendingAction) async throws { switch action.payload { case .setFlags(let uid, let mailbox, let add, let remove): let client = imapClientProvider() try await client.connect() defer { Task { try? await client.disconnect() } } try await client.storeFlags(uid: uid, mailbox: mailbox, add: add, remove: remove) case .move(let uid, let from, let to): let client = imapClientProvider() try await client.connect() defer { Task { try? await client.disconnect() } } try await client.moveMessage(uid: uid, from: from, to: to) case .delete(let uid, let mailbox, let trashMailbox): let client = imapClientProvider() try await client.connect() defer { Task { try? await client.disconnect() } } if mailbox == trashMailbox { // Already in trash — permanent delete try await client.storeFlags(uid: uid, mailbox: mailbox, add: ["\\Deleted"], remove: []) try await client.expunge(mailbox: mailbox) } else { try await client.moveMessage(uid: uid, from: mailbox, to: trashMailbox) } case .send(let message): guard let provider = smtpClientProvider else { throw ActionQueueError.noSMTPClient } let client = provider() try await client.send(message: message) case .append(let mailbox, let messageData, let flags): let client = imapClientProvider() try await client.connect() defer { Task { try? await client.disconnect() } } guard let data = messageData.data(using: .utf8) else { throw ActionQueueError.invalidMessageData } try await client.appendMessage(to: mailbox, message: data, flags: flags) } } // MARK: - Serialization func persistAction(_ action: PendingAction) -> PendingActionRecord { let encoder = JSONEncoder() let payloadJson = (try? encoder.encode(action.payload)) .flatMap { String(data: $0, encoding: .utf8) } ?? "{}" let dateString = ISO8601DateFormatter().string(from: action.createdAt) return PendingActionRecord( id: action.id, accountId: action.accountId, actionType: action.actionType.rawValue, payload: payloadJson, createdAt: dateString ) } func decodeAction(_ record: PendingActionRecord) -> PendingAction? { guard let data = record.payload.data(using: .utf8), let payload = try? JSONDecoder().decode(ActionPayload.self, from: data), let actionType = ActionType(rawValue: record.actionType) else { return nil } let formatter = ISO8601DateFormatter() let date = formatter.date(from: record.createdAt) ?? Date() return PendingAction( id: record.id, accountId: record.accountId, actionType: actionType, payload: payload, createdAt: date ) } } enum ActionQueueError: Error { case noSMTPClient case invalidMessageData }