add sync coordinator: imap → mailstore pipeline with delta sync
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1 +0,0 @@
|
||||
enum SyncEnginePlaceholder {}
|
||||
187
Packages/MagnumOpusCore/Sources/SyncEngine/SyncCoordinator.swift
Normal file
187
Packages/MagnumOpusCore/Sources/SyncEngine/SyncCoordinator.swift
Normal file
@@ -0,0 +1,187 @@
|
||||
import Foundation
|
||||
import Models
|
||||
import IMAPClient
|
||||
import MailStore
|
||||
|
||||
@Observable
|
||||
@MainActor
|
||||
public final class SyncCoordinator {
|
||||
private let accountConfig: AccountConfig
|
||||
private let imapClient: any IMAPClientProtocol
|
||||
private let store: MailStore
|
||||
private var syncTask: Task<Void, Never>?
|
||||
|
||||
public private(set) var syncState: SyncState = .idle
|
||||
private var eventHandlers: [(SyncEvent) -> Void] = []
|
||||
|
||||
public init(accountConfig: AccountConfig, imapClient: any IMAPClientProtocol, store: MailStore) {
|
||||
self.accountConfig = accountConfig
|
||||
self.imapClient = imapClient
|
||||
self.store = store
|
||||
}
|
||||
|
||||
public func onEvent(_ handler: @escaping (SyncEvent) -> Void) {
|
||||
eventHandlers.append(handler)
|
||||
}
|
||||
|
||||
private func emit(_ event: SyncEvent) {
|
||||
for handler in eventHandlers {
|
||||
handler(event)
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Sync
|
||||
|
||||
public func syncNow() async throws {
|
||||
syncState = .syncing(mailbox: nil)
|
||||
emit(.syncStarted)
|
||||
|
||||
do {
|
||||
try await performSync()
|
||||
syncState = .idle
|
||||
emit(.syncCompleted)
|
||||
} catch {
|
||||
syncState = .error(error.localizedDescription)
|
||||
emit(.syncFailed(error.localizedDescription))
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
private func performSync() async throws {
|
||||
// Ensure account exists in DB
|
||||
let existingAccounts = try store.accounts()
|
||||
if !existingAccounts.contains(where: { $0.id == accountConfig.id }) {
|
||||
try store.insertAccount(AccountRecord(
|
||||
id: accountConfig.id,
|
||||
name: accountConfig.name,
|
||||
email: accountConfig.email,
|
||||
imapHost: accountConfig.imapHost,
|
||||
imapPort: accountConfig.imapPort
|
||||
))
|
||||
}
|
||||
|
||||
try await imapClient.connect()
|
||||
do {
|
||||
try await syncAllMailboxes()
|
||||
} catch {
|
||||
try? await imapClient.disconnect()
|
||||
throw error
|
||||
}
|
||||
try? await imapClient.disconnect()
|
||||
}
|
||||
|
||||
private func syncAllMailboxes() async throws {
|
||||
let remoteMailboxes = try await imapClient.listMailboxes()
|
||||
for remoteMailbox in remoteMailboxes {
|
||||
syncState = .syncing(mailbox: remoteMailbox.name)
|
||||
try await syncMailbox(remoteMailbox)
|
||||
}
|
||||
}
|
||||
|
||||
private func syncMailbox(_ remoteMailbox: IMAPMailboxInfo) async throws {
|
||||
let status = try await imapClient.selectMailbox(remoteMailbox.name)
|
||||
|
||||
let localMailboxes = try store.mailboxes(accountId: accountConfig.id)
|
||||
let localMailbox = localMailboxes.first { $0.name == remoteMailbox.name }
|
||||
|
||||
let mailboxId: String
|
||||
let lastUid: Int
|
||||
|
||||
if let local = localMailbox {
|
||||
mailboxId = local.id
|
||||
if local.uidValidity != status.uidValidity {
|
||||
// UID validity changed — re-fetch everything
|
||||
lastUid = 0
|
||||
} else {
|
||||
lastUid = local.uidNext - 1
|
||||
}
|
||||
} else {
|
||||
mailboxId = UUID().uuidString
|
||||
try store.upsertMailbox(MailboxRecord(
|
||||
id: mailboxId,
|
||||
accountId: accountConfig.id,
|
||||
name: remoteMailbox.name,
|
||||
uidValidity: status.uidValidity,
|
||||
uidNext: status.uidNext
|
||||
))
|
||||
lastUid = 0
|
||||
}
|
||||
|
||||
let envelopes = try await imapClient.fetchEnvelopes(uidsGreaterThan: lastUid)
|
||||
|
||||
if !envelopes.isEmpty {
|
||||
let records = envelopes.map { envelope -> MessageRecord in
|
||||
envelopeToRecord(envelope, accountId: accountConfig.id, mailboxId: mailboxId)
|
||||
}
|
||||
try store.insertMessages(records)
|
||||
|
||||
let reconstructor = ThreadReconstructor(store: store)
|
||||
try reconstructor.processMessages(records)
|
||||
|
||||
emit(.newMessages(count: envelopes.count, mailbox: remoteMailbox.name))
|
||||
}
|
||||
|
||||
try store.updateMailboxSync(
|
||||
id: mailboxId,
|
||||
uidValidity: status.uidValidity,
|
||||
uidNext: status.uidNext
|
||||
)
|
||||
}
|
||||
|
||||
private func envelopeToRecord(
|
||||
_ envelope: FetchedEnvelope, accountId: String, mailboxId: String
|
||||
) -> MessageRecord {
|
||||
let toJson = encodeAddresses(envelope.to)
|
||||
let ccJson = encodeAddresses(envelope.cc)
|
||||
return MessageRecord(
|
||||
id: UUID().uuidString,
|
||||
accountId: accountId,
|
||||
mailboxId: mailboxId,
|
||||
uid: envelope.uid,
|
||||
messageId: envelope.messageId,
|
||||
inReplyTo: envelope.inReplyTo,
|
||||
refs: envelope.references,
|
||||
subject: envelope.subject,
|
||||
fromAddress: envelope.from?.address,
|
||||
fromName: envelope.from?.name,
|
||||
toAddresses: toJson,
|
||||
ccAddresses: ccJson,
|
||||
date: envelope.date,
|
||||
snippet: envelope.snippet,
|
||||
bodyText: envelope.bodyText,
|
||||
bodyHtml: envelope.bodyHtml,
|
||||
isRead: envelope.isRead,
|
||||
isFlagged: envelope.isFlagged,
|
||||
size: envelope.size
|
||||
)
|
||||
}
|
||||
|
||||
private func encodeAddresses(_ addresses: [EmailAddress]) -> String? {
|
||||
guard !addresses.isEmpty else { return nil }
|
||||
struct Addr: Codable { var name: String?; var address: String }
|
||||
let addrs = addresses.map { Addr(name: $0.name, address: $0.address) }
|
||||
guard let data = try? JSONEncoder().encode(addrs) else { return nil }
|
||||
return String(data: data, encoding: .utf8)
|
||||
}
|
||||
|
||||
// MARK: - Periodic Sync
|
||||
|
||||
public func startPeriodicSync(interval: Duration = .seconds(300)) {
|
||||
stopSync()
|
||||
syncTask = Task { [weak self] in
|
||||
while !Task.isCancelled {
|
||||
try? await self?.syncNow()
|
||||
do {
|
||||
try await Task.sleep(for: interval)
|
||||
} catch {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func stopSync() {
|
||||
syncTask?.cancel()
|
||||
syncTask = nil
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,8 @@ final class MockIMAPClient: IMAPClientProtocol, @unchecked Sendable {
|
||||
var mailboxes: [IMAPMailboxInfo] = []
|
||||
var mailboxStatuses: [String: IMAPMailboxStatus] = [:]
|
||||
var envelopes: [FetchedEnvelope] = []
|
||||
/// Per-mailbox envelopes — takes precedence over flat `envelopes` when set
|
||||
var mailboxEnvelopes: [String: [FetchedEnvelope]] = [:]
|
||||
var flagUpdates: [UIDFlagsPair] = []
|
||||
var bodies: [Int: (text: String?, html: String?)] = [:]
|
||||
|
||||
@@ -33,7 +35,13 @@ final class MockIMAPClient: IMAPClientProtocol, @unchecked Sendable {
|
||||
}
|
||||
|
||||
func fetchEnvelopes(uidsGreaterThan uid: Int) async throws -> [FetchedEnvelope] {
|
||||
envelopes.filter { $0.uid > uid }
|
||||
let source: [FetchedEnvelope]
|
||||
if let mailbox = selectedMailbox, let perMailbox = mailboxEnvelopes[mailbox] {
|
||||
source = perMailbox
|
||||
} else {
|
||||
source = envelopes
|
||||
}
|
||||
return source.filter { $0.uid > uid }
|
||||
}
|
||||
|
||||
func fetchFlags(uids: ClosedRange<Int>) async throws -> [UIDFlagsPair] {
|
||||
|
||||
@@ -0,0 +1,140 @@
|
||||
import Testing
|
||||
import GRDB
|
||||
@testable import SyncEngine
|
||||
@testable import IMAPClient
|
||||
@testable import MailStore
|
||||
@testable import Models
|
||||
|
||||
@Suite("SyncCoordinator")
|
||||
@MainActor
|
||||
struct SyncCoordinatorTests {
|
||||
func makeStore() throws -> MailStore {
|
||||
try MailStore(dbWriter: DatabaseSetup.openInMemoryDatabase())
|
||||
}
|
||||
|
||||
func makeMock() -> MockIMAPClient {
|
||||
let mock = MockIMAPClient()
|
||||
mock.mailboxes = [
|
||||
IMAPMailboxInfo(name: "INBOX"),
|
||||
IMAPMailboxInfo(name: "Sent"),
|
||||
]
|
||||
mock.mailboxStatuses = [
|
||||
"INBOX": IMAPMailboxStatus(name: "INBOX", uidValidity: 1, uidNext: 3, messageCount: 2, recentCount: 0),
|
||||
"Sent": IMAPMailboxStatus(name: "Sent", uidValidity: 1, uidNext: 1, messageCount: 0, recentCount: 0),
|
||||
]
|
||||
mock.mailboxEnvelopes = [
|
||||
"INBOX": [
|
||||
FetchedEnvelope(
|
||||
uid: 1, messageId: "msg001@example.com", inReplyTo: nil, references: nil,
|
||||
subject: "Hello", from: EmailAddress(name: "Alice", address: "alice@example.com"),
|
||||
to: [EmailAddress(address: "me@example.com")], cc: [],
|
||||
date: "2024-03-08T10:00:00Z", snippet: "Hi there",
|
||||
bodyText: nil, bodyHtml: nil, isRead: false, isFlagged: false, size: 1024
|
||||
),
|
||||
FetchedEnvelope(
|
||||
uid: 2, messageId: "msg002@example.com", inReplyTo: "msg001@example.com",
|
||||
references: "msg001@example.com",
|
||||
subject: "Re: Hello", from: EmailAddress(name: "Bob", address: "bob@example.com"),
|
||||
to: [EmailAddress(address: "alice@example.com")], cc: [],
|
||||
date: "2024-03-08T11:00:00Z", snippet: "Hey!",
|
||||
bodyText: nil, bodyHtml: nil, isRead: true, isFlagged: false, size: 512
|
||||
),
|
||||
],
|
||||
"Sent": [],
|
||||
]
|
||||
return mock
|
||||
}
|
||||
|
||||
@Test("full sync creates account, mailboxes, messages, and threads")
|
||||
func fullSync() async throws {
|
||||
let store = try makeStore()
|
||||
let mock = makeMock()
|
||||
let coordinator = SyncCoordinator(
|
||||
accountConfig: AccountConfig(
|
||||
id: "acc1", name: "Personal", email: "me@example.com",
|
||||
imapHost: "imap.example.com", imapPort: 993
|
||||
),
|
||||
imapClient: mock,
|
||||
store: store
|
||||
)
|
||||
|
||||
try await coordinator.syncNow()
|
||||
|
||||
// Account created
|
||||
let accounts = try store.accounts()
|
||||
#expect(accounts.count == 1)
|
||||
|
||||
// Mailboxes created
|
||||
let mailboxes = try store.mailboxes(accountId: "acc1")
|
||||
#expect(mailboxes.count == 2)
|
||||
|
||||
// Messages stored
|
||||
let inboxMb = mailboxes.first { $0.name == "INBOX" }!
|
||||
let messages = try store.messages(mailboxId: inboxMb.id)
|
||||
#expect(messages.count == 2)
|
||||
|
||||
// Threads created (msg002 replies to msg001, so 1 thread)
|
||||
let threads = try store.threads(accountId: "acc1")
|
||||
#expect(threads.count == 1)
|
||||
#expect(threads[0].messageCount == 2)
|
||||
|
||||
// uidNext updated
|
||||
let updatedMb = try store.mailbox(id: inboxMb.id)
|
||||
#expect(updatedMb?.uidNext == 3)
|
||||
|
||||
// IMAP client was connected and disconnected
|
||||
#expect(mock.connectCalled)
|
||||
#expect(mock.disconnectCalled)
|
||||
}
|
||||
|
||||
@Test("delta sync only fetches new messages")
|
||||
func deltaSync() async throws {
|
||||
let store = try makeStore()
|
||||
let mock = makeMock()
|
||||
let config = AccountConfig(
|
||||
id: "acc1", name: "Personal", email: "me@example.com",
|
||||
imapHost: "imap.example.com", imapPort: 993
|
||||
)
|
||||
let coordinator = SyncCoordinator(accountConfig: config, imapClient: mock, store: store)
|
||||
|
||||
// First sync
|
||||
try await coordinator.syncNow()
|
||||
|
||||
// Add a new message for delta sync
|
||||
mock.mailboxEnvelopes["INBOX"]!.append(FetchedEnvelope(
|
||||
uid: 3, messageId: "msg003@example.com", inReplyTo: nil, references: nil,
|
||||
subject: "New message", from: EmailAddress(name: "Charlie", address: "charlie@example.com"),
|
||||
to: [EmailAddress(address: "me@example.com")], cc: [],
|
||||
date: "2024-03-09T10:00:00Z", snippet: "Something new",
|
||||
bodyText: nil, bodyHtml: nil, isRead: false, isFlagged: false, size: 256
|
||||
))
|
||||
mock.mailboxStatuses["INBOX"] = IMAPMailboxStatus(
|
||||
name: "INBOX", uidValidity: 1, uidNext: 4, messageCount: 3, recentCount: 1
|
||||
)
|
||||
|
||||
// Second sync — should only fetch uid > 2
|
||||
try await coordinator.syncNow()
|
||||
|
||||
let inboxMb = try store.mailboxes(accountId: "acc1").first { $0.name == "INBOX" }!
|
||||
let messages = try store.messages(mailboxId: inboxMb.id)
|
||||
#expect(messages.count == 3)
|
||||
}
|
||||
|
||||
@Test("sync state transitions through syncing to idle")
|
||||
func syncStateTransitions() async throws {
|
||||
let store = try makeStore()
|
||||
let mock = makeMock()
|
||||
let coordinator = SyncCoordinator(
|
||||
accountConfig: AccountConfig(
|
||||
id: "acc1", name: "Personal", email: "me@example.com",
|
||||
imapHost: "imap.example.com", imapPort: 993
|
||||
),
|
||||
imapClient: mock,
|
||||
store: store
|
||||
)
|
||||
|
||||
#expect(coordinator.syncState == .idle)
|
||||
try await coordinator.syncNow()
|
||||
#expect(coordinator.syncState == .idle)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user