add ActionQueue: two-phase enqueue, FIFO flush, retry with max 5 attempts
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
175
Packages/MagnumOpusCore/Sources/SyncEngine/ActionQueue.swift
Normal file
175
Packages/MagnumOpusCore/Sources/SyncEngine/ActionQueue.swift
Normal file
@@ -0,0 +1,175 @@
|
||||
import Foundation
|
||||
import Models
|
||||
import IMAPClient
|
||||
import SMTPClient
|
||||
import MailStore
|
||||
|
||||
public actor ActionQueue {
|
||||
private let store: MailStore
|
||||
private let accountId: String
|
||||
private let imapClientProvider: @Sendable () -> any IMAPClientProtocol
|
||||
private let smtpClientProvider: (@Sendable () -> SMTPClient)?
|
||||
|
||||
public var pendingCount: Int {
|
||||
(try? store.pendingActionCount(accountId: accountId)) ?? 0
|
||||
}
|
||||
|
||||
public init(
|
||||
store: MailStore,
|
||||
accountId: String,
|
||||
imapClientProvider: @escaping @Sendable () -> any IMAPClientProtocol,
|
||||
smtpClientProvider: (@Sendable () -> SMTPClient)? = nil
|
||||
) {
|
||||
self.store = store
|
||||
self.accountId = accountId
|
||||
self.imapClientProvider = imapClientProvider
|
||||
self.smtpClientProvider = smtpClientProvider
|
||||
}
|
||||
|
||||
// MARK: - Enqueue
|
||||
|
||||
/// Enqueue an action: persist to DB immediately, then fire-and-forget remote dispatch.
|
||||
public func enqueue(_ action: PendingAction) throws {
|
||||
let record = persistAction(action)
|
||||
try store.insertPendingAction(record)
|
||||
|
||||
// Fire-and-forget remote dispatch
|
||||
Task { [weak self] in
|
||||
await self?.dispatchSingle(action)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Flush
|
||||
|
||||
/// Dispatch all pending actions in creation order. Called before IMAP fetch.
|
||||
public func flush() async {
|
||||
guard let records = try? store.pendingActions(accountId: accountId) else { return }
|
||||
|
||||
for record in records {
|
||||
guard let action = decodeAction(record) else {
|
||||
// Corrupt record — remove it
|
||||
try? store.deletePendingAction(id: record.id)
|
||||
continue
|
||||
}
|
||||
|
||||
do {
|
||||
try await dispatch(action)
|
||||
try store.deletePendingAction(id: record.id)
|
||||
} catch {
|
||||
var updated = record
|
||||
updated.retryCount += 1
|
||||
updated.lastError = error.localizedDescription
|
||||
if updated.retryCount >= 5 {
|
||||
try? store.deletePendingAction(id: record.id)
|
||||
} else {
|
||||
try? store.updatePendingAction(updated)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Dispatch
|
||||
|
||||
private func dispatchSingle(_ action: PendingAction) async {
|
||||
do {
|
||||
try await dispatch(action)
|
||||
try store.deletePendingAction(id: action.id)
|
||||
} catch {
|
||||
// Update retry count on failure; flush will retry later
|
||||
guard var record = try? store.pendingActions(accountId: accountId)
|
||||
.first(where: { $0.id == action.id }) else { return }
|
||||
record.retryCount += 1
|
||||
record.lastError = error.localizedDescription
|
||||
if record.retryCount >= 5 {
|
||||
try? store.deletePendingAction(id: record.id)
|
||||
} else {
|
||||
try? store.updatePendingAction(record)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func dispatch(_ action: PendingAction) async throws {
|
||||
switch action.payload {
|
||||
case .setFlags(let uid, let mailbox, let add, let remove):
|
||||
let client = imapClientProvider()
|
||||
try await client.connect()
|
||||
defer { Task { try? await client.disconnect() } }
|
||||
try await client.storeFlags(uid: uid, mailbox: mailbox, add: add, remove: remove)
|
||||
|
||||
case .move(let uid, let from, let to):
|
||||
let client = imapClientProvider()
|
||||
try await client.connect()
|
||||
defer { Task { try? await client.disconnect() } }
|
||||
try await client.moveMessage(uid: uid, from: from, to: to)
|
||||
|
||||
case .delete(let uid, let mailbox, let trashMailbox):
|
||||
let client = imapClientProvider()
|
||||
try await client.connect()
|
||||
defer { Task { try? await client.disconnect() } }
|
||||
if mailbox == trashMailbox {
|
||||
// Already in trash — permanent delete
|
||||
try await client.storeFlags(uid: uid, mailbox: mailbox, add: ["\\Deleted"], remove: [])
|
||||
try await client.expunge(mailbox: mailbox)
|
||||
} else {
|
||||
try await client.moveMessage(uid: uid, from: mailbox, to: trashMailbox)
|
||||
}
|
||||
|
||||
case .send(let message):
|
||||
guard let provider = smtpClientProvider else {
|
||||
throw ActionQueueError.noSMTPClient
|
||||
}
|
||||
let client = provider()
|
||||
try await client.send(message: message)
|
||||
|
||||
case .append(let mailbox, let messageData, let flags):
|
||||
let client = imapClientProvider()
|
||||
try await client.connect()
|
||||
defer { Task { try? await client.disconnect() } }
|
||||
guard let data = messageData.data(using: .utf8) else {
|
||||
throw ActionQueueError.invalidMessageData
|
||||
}
|
||||
try await client.appendMessage(to: mailbox, message: data, flags: flags)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Serialization
|
||||
|
||||
func persistAction(_ action: PendingAction) -> PendingActionRecord {
|
||||
let encoder = JSONEncoder()
|
||||
let payloadJson = (try? encoder.encode(action.payload))
|
||||
.flatMap { String(data: $0, encoding: .utf8) } ?? "{}"
|
||||
let dateString = ISO8601DateFormatter().string(from: action.createdAt)
|
||||
|
||||
return PendingActionRecord(
|
||||
id: action.id,
|
||||
accountId: action.accountId,
|
||||
actionType: action.actionType.rawValue,
|
||||
payload: payloadJson,
|
||||
createdAt: dateString
|
||||
)
|
||||
}
|
||||
|
||||
func decodeAction(_ record: PendingActionRecord) -> PendingAction? {
|
||||
guard let data = record.payload.data(using: .utf8),
|
||||
let payload = try? JSONDecoder().decode(ActionPayload.self, from: data),
|
||||
let actionType = ActionType(rawValue: record.actionType) else {
|
||||
return nil
|
||||
}
|
||||
|
||||
let formatter = ISO8601DateFormatter()
|
||||
let date = formatter.date(from: record.createdAt) ?? Date()
|
||||
|
||||
return PendingAction(
|
||||
id: record.id,
|
||||
accountId: record.accountId,
|
||||
actionType: actionType,
|
||||
payload: payload,
|
||||
createdAt: date
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
enum ActionQueueError: Error {
|
||||
case noSMTPClient
|
||||
case invalidMessageData
|
||||
}
|
||||
50
Packages/MagnumOpusCore/Sources/SyncEngine/ActionTypes.swift
Normal file
50
Packages/MagnumOpusCore/Sources/SyncEngine/ActionTypes.swift
Normal file
@@ -0,0 +1,50 @@
|
||||
import Foundation
|
||||
import Models
|
||||
|
||||
public struct PendingAction: Sendable, Codable {
|
||||
public var id: String
|
||||
public var accountId: String
|
||||
public var actionType: ActionType
|
||||
public var payload: ActionPayload
|
||||
public var createdAt: Date
|
||||
|
||||
public init(
|
||||
id: String = UUID().uuidString,
|
||||
accountId: String,
|
||||
actionType: ActionType,
|
||||
payload: ActionPayload,
|
||||
createdAt: Date = Date()
|
||||
) {
|
||||
self.id = id
|
||||
self.accountId = accountId
|
||||
self.actionType = actionType
|
||||
self.payload = payload
|
||||
self.createdAt = createdAt
|
||||
}
|
||||
}
|
||||
|
||||
public enum ActionType: String, Sendable, Codable {
|
||||
case setFlags
|
||||
case move
|
||||
case delete
|
||||
case send
|
||||
case append
|
||||
}
|
||||
|
||||
public enum ActionPayload: Sendable, Codable {
|
||||
case setFlags(uid: Int, mailbox: String, add: [String], remove: [String])
|
||||
case move(uid: Int, from: String, to: String)
|
||||
case delete(uid: Int, mailbox: String, trashMailbox: String)
|
||||
case send(message: OutgoingMessage)
|
||||
case append(mailbox: String, messageData: String, flags: [String])
|
||||
|
||||
public var actionType: ActionType {
|
||||
switch self {
|
||||
case .setFlags: return .setFlags
|
||||
case .move: return .move
|
||||
case .delete: return .delete
|
||||
case .send: return .send
|
||||
case .append: return .append
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,15 +9,22 @@ public final class SyncCoordinator {
|
||||
private let accountConfig: AccountConfig
|
||||
private let imapClient: any IMAPClientProtocol
|
||||
private let store: MailStore
|
||||
private let actionQueue: ActionQueue?
|
||||
private var syncTask: Task<Void, Never>?
|
||||
|
||||
public private(set) var syncState: SyncState = .idle
|
||||
private var eventHandlers: [(SyncEvent) -> Void] = []
|
||||
|
||||
public init(accountConfig: AccountConfig, imapClient: any IMAPClientProtocol, store: MailStore) {
|
||||
public init(
|
||||
accountConfig: AccountConfig,
|
||||
imapClient: any IMAPClientProtocol,
|
||||
store: MailStore,
|
||||
actionQueue: ActionQueue? = nil
|
||||
) {
|
||||
self.accountConfig = accountConfig
|
||||
self.imapClient = imapClient
|
||||
self.store = store
|
||||
self.actionQueue = actionQueue
|
||||
}
|
||||
|
||||
public func onEvent(_ handler: @escaping (SyncEvent) -> Void) {
|
||||
@@ -60,6 +67,9 @@ public final class SyncCoordinator {
|
||||
))
|
||||
}
|
||||
|
||||
// Flush pending actions before fetching new state
|
||||
await actionQueue?.flush()
|
||||
|
||||
try await imapClient.connect()
|
||||
do {
|
||||
try await syncAllMailboxes()
|
||||
|
||||
@@ -0,0 +1,308 @@
|
||||
import Testing
|
||||
import Foundation
|
||||
import GRDB
|
||||
@testable import SyncEngine
|
||||
@testable import IMAPClient
|
||||
@testable import MailStore
|
||||
@testable import Models
|
||||
|
||||
@Suite("ActionQueue")
|
||||
struct ActionQueueTests {
|
||||
func makeStore() throws -> MailStore {
|
||||
try MailStore(dbWriter: DatabaseSetup.openInMemoryDatabase())
|
||||
}
|
||||
|
||||
func makeAccountAndStore() throws -> (MailStore, String) {
|
||||
let store = try makeStore()
|
||||
let accountId = "acc1"
|
||||
try store.insertAccount(AccountRecord(
|
||||
id: accountId,
|
||||
name: "Test",
|
||||
email: "test@example.com",
|
||||
imapHost: "imap.example.com",
|
||||
imapPort: 993
|
||||
))
|
||||
return (store, accountId)
|
||||
}
|
||||
|
||||
@Test("enqueue persists action to database")
|
||||
func enqueuePersists() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = MockIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
let action = PendingAction(
|
||||
accountId: accountId,
|
||||
actionType: .setFlags,
|
||||
payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: [])
|
||||
)
|
||||
|
||||
try await queue.enqueue(action)
|
||||
|
||||
let pending = try store.pendingActions(accountId: accountId)
|
||||
#expect(pending.count >= 1)
|
||||
let found = pending.first { $0.id == action.id }
|
||||
#expect(found != nil)
|
||||
#expect(found?.actionType == "setFlags")
|
||||
}
|
||||
|
||||
@Test("flush dispatches actions and removes them from queue")
|
||||
func flushDispatches() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = MockIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
// Persist action directly (bypass fire-and-forget dispatch)
|
||||
let action = PendingAction(
|
||||
accountId: accountId,
|
||||
actionType: .setFlags,
|
||||
payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: [])
|
||||
)
|
||||
let record = await queue.persistAction(action)
|
||||
try store.insertPendingAction(record)
|
||||
|
||||
#expect(try store.pendingActionCount(accountId: accountId) == 1)
|
||||
|
||||
await queue.flush()
|
||||
|
||||
#expect(try store.pendingActionCount(accountId: accountId) == 0)
|
||||
#expect(mock.storedFlags.count == 1)
|
||||
#expect(mock.storedFlags[0].uid == 1)
|
||||
#expect(mock.storedFlags[0].add == ["\\Seen"])
|
||||
}
|
||||
|
||||
@Test("flush dispatches actions in creation order (FIFO)")
|
||||
func flushFIFO() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = MockIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
let now = Date()
|
||||
let action1 = PendingAction(
|
||||
id: "first",
|
||||
accountId: accountId,
|
||||
actionType: .setFlags,
|
||||
payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: []),
|
||||
createdAt: now
|
||||
)
|
||||
let action2 = PendingAction(
|
||||
id: "second",
|
||||
accountId: accountId,
|
||||
actionType: .setFlags,
|
||||
payload: .setFlags(uid: 2, mailbox: "INBOX", add: ["\\Flagged"], remove: []),
|
||||
createdAt: now.addingTimeInterval(1)
|
||||
)
|
||||
|
||||
try store.insertPendingAction(await queue.persistAction(action1))
|
||||
try store.insertPendingAction(await queue.persistAction(action2))
|
||||
|
||||
await queue.flush()
|
||||
|
||||
#expect(mock.storedFlags.count == 2)
|
||||
#expect(mock.storedFlags[0].uid == 1)
|
||||
#expect(mock.storedFlags[1].uid == 2)
|
||||
}
|
||||
|
||||
@Test("pendingCount reflects queue state")
|
||||
func pendingCountReflectsState() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = MockIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
#expect(await queue.pendingCount == 0)
|
||||
|
||||
let action = PendingAction(
|
||||
accountId: accountId,
|
||||
actionType: .move,
|
||||
payload: .move(uid: 1, from: "INBOX", to: "Archive")
|
||||
)
|
||||
let record = await queue.persistAction(action)
|
||||
try store.insertPendingAction(record)
|
||||
|
||||
#expect(await queue.pendingCount == 1)
|
||||
|
||||
await queue.flush()
|
||||
|
||||
#expect(await queue.pendingCount == 0)
|
||||
}
|
||||
|
||||
@Test("failed dispatch increments retryCount")
|
||||
func failedDispatchRetry() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = FailingIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
let action = PendingAction(
|
||||
accountId: accountId,
|
||||
actionType: .setFlags,
|
||||
payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: [])
|
||||
)
|
||||
let record = await queue.persistAction(action)
|
||||
try store.insertPendingAction(record)
|
||||
|
||||
await queue.flush()
|
||||
|
||||
let pending = try store.pendingActions(accountId: accountId)
|
||||
#expect(pending.count == 1)
|
||||
#expect(pending[0].retryCount == 1)
|
||||
#expect(pending[0].lastError != nil)
|
||||
}
|
||||
|
||||
@Test("action removed after 5 failures")
|
||||
func removedAfterMaxRetries() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = FailingIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
let action = PendingAction(
|
||||
accountId: accountId,
|
||||
actionType: .setFlags,
|
||||
payload: .setFlags(uid: 1, mailbox: "INBOX", add: ["\\Seen"], remove: [])
|
||||
)
|
||||
var record = await queue.persistAction(action)
|
||||
record.retryCount = 4
|
||||
try store.insertPendingAction(record)
|
||||
|
||||
await queue.flush()
|
||||
|
||||
// retryCount was 4, incremented to 5 → removed
|
||||
#expect(try store.pendingActionCount(accountId: accountId) == 0)
|
||||
}
|
||||
|
||||
@Test("delete action in trash expunges instead of moving")
|
||||
func deleteInTrashExpunges() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = MockIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
let action = PendingAction(
|
||||
accountId: accountId,
|
||||
actionType: .delete,
|
||||
payload: .delete(uid: 5, mailbox: "Trash", trashMailbox: "Trash")
|
||||
)
|
||||
let record = await queue.persistAction(action)
|
||||
try store.insertPendingAction(record)
|
||||
|
||||
await queue.flush()
|
||||
|
||||
// Should have stored \Deleted flag and expunged
|
||||
#expect(mock.storedFlags.count == 1)
|
||||
#expect(mock.storedFlags[0].add == ["\\Deleted"])
|
||||
#expect(mock.expungedMailboxes == ["Trash"])
|
||||
#expect(mock.movedMessages.isEmpty)
|
||||
}
|
||||
|
||||
@Test("delete action not in trash moves to trash")
|
||||
func deleteNotInTrashMoves() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = MockIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
let action = PendingAction(
|
||||
accountId: accountId,
|
||||
actionType: .delete,
|
||||
payload: .delete(uid: 3, mailbox: "INBOX", trashMailbox: "Trash")
|
||||
)
|
||||
let record = await queue.persistAction(action)
|
||||
try store.insertPendingAction(record)
|
||||
|
||||
await queue.flush()
|
||||
|
||||
#expect(mock.movedMessages.count == 1)
|
||||
#expect(mock.movedMessages[0].from == "INBOX")
|
||||
#expect(mock.movedMessages[0].to == "Trash")
|
||||
#expect(mock.storedFlags.isEmpty)
|
||||
}
|
||||
|
||||
@Test("move action dispatches correctly")
|
||||
func moveDispatches() async throws {
|
||||
let (store, accountId) = try makeAccountAndStore()
|
||||
let mock = MockIMAPClient()
|
||||
let queue = ActionQueue(
|
||||
store: store,
|
||||
accountId: accountId,
|
||||
imapClientProvider: { mock }
|
||||
)
|
||||
|
||||
let action = PendingAction(
|
||||
accountId: accountId,
|
||||
actionType: .move,
|
||||
payload: .move(uid: 7, from: "INBOX", to: "Archive")
|
||||
)
|
||||
let record = await queue.persistAction(action)
|
||||
try store.insertPendingAction(record)
|
||||
|
||||
await queue.flush()
|
||||
|
||||
#expect(mock.movedMessages.count == 1)
|
||||
#expect(mock.movedMessages[0].uid == 7)
|
||||
#expect(mock.movedMessages[0].from == "INBOX")
|
||||
#expect(mock.movedMessages[0].to == "Archive")
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - FailingIMAPClient
|
||||
|
||||
private final class FailingIMAPClient: IMAPClientProtocol, @unchecked Sendable {
|
||||
func connect() async throws {}
|
||||
func disconnect() async throws {}
|
||||
func listMailboxes() async throws -> [IMAPMailboxInfo] { [] }
|
||||
func selectMailbox(_ name: String) async throws -> IMAPMailboxStatus {
|
||||
throw FailingIMAPError.alwaysFails
|
||||
}
|
||||
func fetchEnvelopes(uidsGreaterThan uid: Int) async throws -> [FetchedEnvelope] { [] }
|
||||
func fetchFlags(uids: ClosedRange<Int>) async throws -> [UIDFlagsPair] { [] }
|
||||
func fetchBody(uid: Int) async throws -> (text: String?, html: String?) { (nil, nil) }
|
||||
func storeFlags(uid: Int, mailbox: String, add: [String], remove: [String]) async throws {
|
||||
throw FailingIMAPError.alwaysFails
|
||||
}
|
||||
func moveMessage(uid: Int, from: String, to: String) async throws {
|
||||
throw FailingIMAPError.alwaysFails
|
||||
}
|
||||
func copyMessage(uid: Int, from: String, to: String) async throws {
|
||||
throw FailingIMAPError.alwaysFails
|
||||
}
|
||||
func expunge(mailbox: String) async throws {
|
||||
throw FailingIMAPError.alwaysFails
|
||||
}
|
||||
func appendMessage(to mailbox: String, message: Data, flags: [String]) async throws {
|
||||
throw FailingIMAPError.alwaysFails
|
||||
}
|
||||
func capabilities() async throws -> Set<String> { [] }
|
||||
}
|
||||
|
||||
private enum FailingIMAPError: Error {
|
||||
case alwaysFails
|
||||
}
|
||||
Reference in New Issue
Block a user