- detect and re-fetch bodies containing unparsed MIME content (boundary markers, Content-Transfer-Encoding headers) from pre-MIMEParser code path - fix MIMEParser section numbering: pass cumulative sectionPrefix in nested multiparts instead of resetting to empty string - generate snippet from parsed body text when envelope snippet is missing - add pendingAction(id:) direct lookup to MailStore, avoid re-fetching all actions - add updateSnippet method to MailStore - fix IMAPIdleClient.selectInbox: use incrementing tag counter instead of hardcoded tag - use static nonisolated(unsafe) ISO8601DateFormatter in ActionQueue (avoid repeated alloc) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
194 lines
5.9 KiB
Swift
194 lines
5.9 KiB
Swift
import Foundation
|
|
import os
|
|
import Models
|
|
import IMAPClient
|
|
import SMTPClient
|
|
import MailStore
|
|
|
|
/// File-wide ISO 8601 formatter used to serialize and parse the `createdAt`
/// timestamps of pending actions, created once instead of per call.
///
/// `nonisolated(unsafe)` opts this global out of concurrency checking.
/// NOTE(review): this relies on the formatter being safe to use from any
/// isolation domain without further synchronization — confirm.
private nonisolated(unsafe) let iso8601Formatter = ISO8601DateFormatter()
|
|
|
|
/// Persists user-initiated mail actions and replays them against the remote servers.
///
/// Every action is written to the `MailStore` before any network work is attempted.
/// A record is removed once its dispatch succeeds, once it can no longer be decoded,
/// or once it has failed `maxRetryCount` times.
public actor ActionQueue {
    /// Number of failed attempts after which a pending action is dropped from the store.
    private static let maxRetryCount = 5

    private let store: MailStore
    private let accountId: String
    private let imapClientProvider: @Sendable () -> any IMAPClientProtocol
    private let smtpClientProvider: (@Sendable () -> SMTPClient)?

    /// Number of actions still persisted for this account; `0` when the store lookup throws.
    public var pendingCount: Int {
        (try? store.pendingActionCount(accountId: accountId)) ?? 0
    }

    /// Creates a queue bound to one account.
    /// - Parameters:
    ///   - store: Persistence layer holding the pending-action records.
    ///   - accountId: Account whose pending actions this queue counts and flushes.
    ///   - imapClientProvider: Factory returning an IMAP client; the queue connects it itself.
    ///   - smtpClientProvider: Factory returning an SMTP client. When `nil`, `.send`
    ///     actions fail with `ActionQueueError.noSMTPClient`.
    public init(
        store: MailStore,
        accountId: String,
        imapClientProvider: @escaping @Sendable () -> any IMAPClientProtocol,
        smtpClientProvider: (@Sendable () -> SMTPClient)? = nil
    ) {
        self.store = store
        self.accountId = accountId
        self.imapClientProvider = imapClientProvider
        self.smtpClientProvider = smtpClientProvider
    }

    // MARK: - Enqueue

    /// Enqueue an action: persist to DB immediately, then fire-and-forget remote dispatch.
    ///
    /// NOTE(review): the record stays in the store until the background dispatch
    /// finishes, so a `flush()` that interleaves with it can dispatch the same
    /// action a second time — confirm whether duplicate dispatch is acceptable.
    /// - Throws: Whatever `MailStore.insertPendingAction` throws; in that case no
    ///   remote dispatch is started.
    public func enqueue(_ action: PendingAction) throws {
        let record = persistAction(action)
        try store.insertPendingAction(record)

        // Fire-and-forget remote dispatch
        Task { [weak self] in
            await self?.dispatchSingle(action)
        }
    }

    // MARK: - Flush

    /// Dispatch all pending actions in the order the store returns them. Called before IMAP fetch.
    ///
    /// Reuses a single IMAP connection for all IMAP actions in the batch. The
    /// connection is opened lazily on the first action that requires IMAP and is
    /// closed before this method returns. Records that cannot be decoded are
    /// deleted. A failed connect counts as a failed attempt for the action that
    /// triggered it.
    public func flush() async {
        guard let records = try? store.pendingActions(accountId: accountId),
              !records.isEmpty else { return }

        // Create one shared IMAP client for all IMAP actions in this flush
        let sharedImapClient = imapClientProvider()
        var imapConnected = false

        for record in records {
            guard let action = decodeAction(record) else {
                try? store.deletePendingAction(id: record.id)
                continue
            }

            do {
                // Connect shared IMAP client on first IMAP action
                if !imapConnected, action.payload.requiresIMAP {
                    try await sharedImapClient.connect()
                    imapConnected = true
                }
                try await dispatch(action, sharedImapClient: sharedImapClient)
                try store.deletePendingAction(id: record.id)
            } catch {
                recordFailure(of: record, error: error)
            }
        }

        // Await the disconnect instead of detaching it, so the connection is
        // closed by the time the caller continues.
        if imapConnected {
            try? await sharedImapClient.disconnect()
        }
    }

    // MARK: - Dispatch

    /// Dispatches one freshly enqueued action on its own connection and updates
    /// its stored record according to the outcome.
    private func dispatchSingle(_ action: PendingAction) async {
        do {
            try await dispatch(action)
            try store.deletePendingAction(id: action.id)
        } catch {
            // The record may already be gone (e.g. removed by a flush); nothing to update then.
            guard let record = try? store.pendingAction(id: action.id) else { return }
            recordFailure(of: record, error: error)
        }
    }

    /// Registers a failed attempt for `record`: bumps the retry counter, stores
    /// the error text, and deletes the record once `maxRetryCount` is reached.
    /// Store errors are ignored (best effort).
    private func recordFailure(of record: PendingActionRecord, error: Error) {
        var updated = record
        updated.retryCount += 1
        updated.lastError = error.localizedDescription
        if updated.retryCount >= Self.maxRetryCount {
            try? store.deletePendingAction(id: updated.id)
        } else {
            try? store.updatePendingAction(updated)
        }
    }

    /// Runs `body` with an IMAP client.
    ///
    /// When `sharedClient` is given it is used as-is and its lifecycle is left to
    /// the caller. Otherwise a new client is created and connected, and it is
    /// disconnected again after `body` returns or throws.
    private func withIMAPClient(
        _ sharedClient: (any IMAPClientProtocol)?,
        _ body: (any IMAPClientProtocol) async throws -> Void
    ) async throws {
        if let sharedClient {
            try await body(sharedClient)
            return
        }

        let client = imapClientProvider()
        try await client.connect()
        do {
            try await body(client)
        } catch {
            try? await client.disconnect()
            throw error
        }
        try? await client.disconnect()
    }

    /// Performs the remote operation described by `action.payload`.
    /// - Parameters:
    ///   - action: The action to execute.
    ///   - sharedImapClient: An already connected IMAP client to reuse; when `nil`,
    ///     IMAP actions open and close their own connection.
    /// - Throws: `ActionQueueError.noSMTPClient` for `.send` without an SMTP provider,
    ///   `ActionQueueError.invalidMessageData` for an `.append` whose message cannot
    ///   be encoded as UTF-8, or any error thrown by the mail clients.
    private func dispatch(_ action: PendingAction, sharedImapClient: (any IMAPClientProtocol)? = nil) async throws {
        switch action.payload {
        case .setFlags(let uid, let mailbox, let add, let remove):
            try await withIMAPClient(sharedImapClient) { client in
                try await client.storeFlags(uid: uid, mailbox: mailbox, add: add, remove: remove)
            }

        case .move(let uid, let from, let to):
            try await withIMAPClient(sharedImapClient) { client in
                try await client.moveMessage(uid: uid, from: from, to: to)
            }

        case .delete(let uid, let mailbox, let trashMailbox):
            try await withIMAPClient(sharedImapClient) { client in
                if mailbox == trashMailbox {
                    // Already in the trash: flag as deleted and expunge the mailbox.
                    try await client.storeFlags(uid: uid, mailbox: mailbox, add: ["\\Deleted"], remove: [])
                    try await client.expunge(mailbox: mailbox)
                } else {
                    try await client.moveMessage(uid: uid, from: mailbox, to: trashMailbox)
                }
            }

        case .send(let message):
            guard let provider = smtpClientProvider else {
                throw ActionQueueError.noSMTPClient
            }
            let client = provider()
            try await client.send(message: message)

        case .append(let mailbox, let messageData, let flags):
            // Validate the payload before opening a connection for it.
            guard let data = messageData.data(using: .utf8) else {
                throw ActionQueueError.invalidMessageData
            }
            try await withIMAPClient(sharedImapClient) { client in
                try await client.appendMessage(to: mailbox, message: data, flags: flags)
            }
        }
    }

    // MARK: - Serialization

    /// Converts an action into its storable record.
    ///
    /// The payload is JSON-encoded; if encoding fails the payload is stored as
    /// `"{}"`. `createdAt` is written as an ISO 8601 string.
    func persistAction(_ action: PendingAction) -> PendingActionRecord {
        let encoder = JSONEncoder()
        let payloadJson = (try? encoder.encode(action.payload))
            .flatMap { String(data: $0, encoding: .utf8) } ?? "{}"
        let dateString = iso8601Formatter.string(from: action.createdAt)

        return PendingActionRecord(
            id: action.id,
            accountId: action.accountId,
            actionType: action.actionType.rawValue,
            payload: payloadJson,
            createdAt: dateString
        )
    }

    /// Rebuilds an action from its stored record.
    /// - Returns: The decoded action, or `nil` when the payload cannot be decoded
    ///   or the action type is unknown. An unparsable `createdAt` falls back to
    ///   the current date.
    func decodeAction(_ record: PendingActionRecord) -> PendingAction? {
        guard let data = record.payload.data(using: .utf8),
              let payload = try? JSONDecoder().decode(ActionPayload.self, from: data),
              let actionType = ActionType(rawValue: record.actionType) else {
            return nil
        }

        let date = iso8601Formatter.date(from: record.createdAt) ?? Date()

        return PendingAction(
            id: record.id,
            accountId: record.accountId,
            actionType: actionType,
            payload: payload,
            createdAt: date
        )
    }
}
|
|
|
|
/// Failures raised by `ActionQueue` itself, as opposed to errors thrown by the mail clients.
///
/// Conforms to `LocalizedError` because the queue persists
/// `error.localizedDescription` as a record's `lastError`; without it the
/// stored text would be a generic system message.
enum ActionQueueError: Error, LocalizedError {
    /// A `.send` action was dispatched but no SMTP client provider was configured.
    case noSMTPClient
    /// The message text of an `.append` action could not be encoded as UTF-8.
    case invalidMessageData

    /// Human-readable description surfaced through `localizedDescription`.
    var errorDescription: String? {
        switch self {
        case .noSMTPClient:
            return "No SMTP client is configured for sending mail."
        case .invalidMessageData:
            return "The message could not be encoded as UTF-8 data."
        }
    }
}
|