- detect and re-fetch bodies containing unparsed MIME content (boundary markers, Content-Transfer-Encoding headers) from pre-MIMEParser code path - fix MIMEParser section numbering: pass cumulative sectionPrefix in nested multiparts instead of resetting to empty string - generate snippet from parsed body text when envelope snippet is missing - add pendingAction(id:) direct lookup to MailStore, avoid re-fetching all actions - add updateSnippet method to MailStore - fix IMAPIdleClient.selectInbox: use incrementing tag counter instead of hardcoded tag - use static nonisolated(unsafe) ISO8601DateFormatter in ActionQueue (avoid repeated alloc) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
220 lines
6.6 KiB
Swift
220 lines
6.6 KiB
Swift
import Foundation
|
|
import NIO
|
|
import NIOIMAPCore
|
|
import NIOIMAP
|
|
import NIOSSL
|
|
import Models
|
|
|
|
/// Watches the INBOX of one IMAP account using the IDLE command and notifies
/// a callback when the server reports new messages.
///
/// The client owns a single TLS connection (one channel on a one-thread event
/// loop group). While monitoring, a background task keeps an IDLE session
/// open, re-issues IDLE every `reIdleInterval`, and reconnects with
/// exponential backoff when the session fails.
public actor IMAPIdleClient {
    private let host: String
    private let port: Int
    private let credentials: Credentials

    /// The currently connected channel, or `nil` when disconnected.
    private var channel: Channel?
    /// The event loop group that backs `channel`; torn down together with it.
    private var group: EventLoopGroup?
    private var isMonitoring = false
    private var monitorTask: Task<Void, Never>?
    /// Monotonic counter shared by every command tag this client generates.
    private var tagCounter = 0
    private let reIdleInterval: Duration = .seconds(29 * 60) // 29 minutes per RFC 2177

    /// Creates a client for the given server. No connection is opened until
    /// `startMonitoring(onNewMail:)` is called.
    public init(host: String, port: Int, credentials: Credentials) {
        self.host = host
        self.port = port
        self.credentials = credentials
    }

    /// Start monitoring INBOX via IMAP IDLE. Calls `onNewMail` when the server sends EXISTS.
    ///
    /// Does nothing if monitoring is already active.
    ///
    /// - Parameter onNewMail: Invoked after an EXISTS notification has been
    ///   received and the IDLE session has been ended.
    /// - Throws: Any error from the initial connect, login or SELECT. On
    ///   failure the client is returned to the stopped state so the call can
    ///   be retried.
    public func startMonitoring(onNewMail: @escaping @Sendable () -> Void) async throws {
        guard !isMonitoring else { return }
        isMonitoring = true

        do {
            try await connectAndLogin()
            try await selectInbox()
        } catch {
            // Roll back: otherwise `isMonitoring` stays true and every later
            // call would return early without ever monitoring.
            isMonitoring = false
            await disconnect()
            throw error
        }

        monitorTask = Task {
            var backoffSeconds: UInt64 = 5
            while !Task.isCancelled {
                do {
                    try await self.idleLoop(onNewMail: onNewMail)
                } catch {
                    if Task.isCancelled { break }

                    // Reconnect with exponential backoff. A cancelled sleep
                    // means monitoring was stopped: do not reconnect, or the
                    // new connection would outlive `stopMonitoring()`.
                    do {
                        try await Task.sleep(for: .seconds(Int(backoffSeconds)))
                    } catch {
                        break
                    }
                    backoffSeconds = min(backoffSeconds * 2, 300) // cap at 5 min

                    do {
                        try await self.connectAndLogin()
                        try await self.selectInbox()
                        backoffSeconds = 5 // reset on success
                    } catch {
                        if Task.isCancelled { break }
                        // Next iteration fails fast in `idleLoop` and backs off again.
                        continue
                    }
                }
            }
        }
    }

    /// Stop monitoring and disconnect.
    public func stopMonitoring() async {
        isMonitoring = false
        monitorTask?.cancel()
        monitorTask = nil
        await disconnect()
    }

    // MARK: - Connection

    /// Closes the current channel and shuts down its event loop group, ignoring errors.
    private func disconnect() async {
        try? await channel?.close()
        channel = nil
        try? await group?.shutdownGracefully()
        group = nil
    }

    /// Returns a tag made of `prefix` plus the next value of the shared counter,
    /// so no two commands issued by this client share a tag.
    private func nextTag(prefix: String) -> String {
        tagCounter += 1
        return "\(prefix)\(tagCounter)"
    }

    /// Opens a fresh TLS connection, waits for the server greeting and logs in.
    ///
    /// Any previous connection is torn down first. On success the new channel
    /// and event loop group are stored on the actor; on failure both are
    /// released before the error is rethrown.
    ///
    /// - Throws: `IMAPError.authenticationFailed` when LOGIN is not answered
    ///   with a tagged OK, or any connection/TLS error.
    private func connectAndLogin() async throws {
        // Clean up previous connection if reconnecting
        await disconnect()

        let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
        var openedChannel: Channel?

        do {
            let sslContext = try NIOSSLContext(configuration: TLSConfiguration.makeClientConfiguration())
            let responseHandler = IMAPResponseHandler()

            nonisolated(unsafe) let sslHandler = try NIOSSLClientHandler(context: sslContext, serverHostname: host)
            let bootstrap = ClientBootstrap(group: eventLoopGroup)
                .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
                .channelInitializer { channel in
                    channel.pipeline.addHandlers([
                        sslHandler,
                        IMAPClientHandler(),
                        responseHandler,
                    ])
                }

            let chan = try await bootstrap.connect(host: host, port: port).get()
            openedChannel = chan
            try await responseHandler.waitForGreeting()

            // Login
            let loginTag = nextTag(prefix: "IDLE")
            let loginCommand = TaggedCommand(
                tag: loginTag,
                command: .login(username: credentials.username, password: credentials.password)
            )
            let loginResponses = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<[Response], Error>) in
                responseHandler.sendCommand(tag: loginTag, continuation: cont)
                chan.writeAndFlush(IMAPClientHandler.Message.part(.tagged(loginCommand)), promise: nil)
            }
            guard loginResponses.contains(where: { isOK($0) }) else {
                throw IMAPError.authenticationFailed
            }

            self.channel = chan
            self.group = eventLoopGroup
        } catch {
            // Nothing has been stored on the actor yet, so release the
            // half-built connection here or it would leak.
            try? await openedChannel?.close()
            try? await eventLoopGroup.shutdownGracefully()
            throw error
        }
    }

    /// Sends `SELECT INBOX` on the current connection.
    ///
    /// - Throws: `IMAPError.notConnected` when there is no channel, or
    ///   `IMAPError.unexpectedResponse` when the command is not answered with
    ///   a tagged OK.
    private func selectInbox() async throws {
        guard let channel else { throw IMAPError.notConnected }

        let responseHandler = try await getResponseHandler()
        let tag = nextTag(prefix: "IDLESEL")
        let selectCommand = TaggedCommand(
            tag: tag,
            command: .select(MailboxName(ByteBuffer(string: "INBOX")))
        )
        let responses = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<[Response], Error>) in
            responseHandler.sendCommand(tag: tag, continuation: cont)
            channel.writeAndFlush(IMAPClientHandler.Message.part(.tagged(selectCommand)), promise: nil)
        }
        guard responses.contains(where: { isOK($0) }) else {
            throw IMAPError.unexpectedResponse("SELECT INBOX failed")
        }
    }

    // MARK: - IDLE Loop

    /// Repeatedly enters IDLE on the current connection until the task is
    /// cancelled or the session fails.
    ///
    /// Each pass swaps the regular response handler for an IDLE handler, sends
    /// IDLE, and consumes events until the session is terminated, either
    /// because new mail arrived or because the re-IDLE timer sent DONE. The
    /// response handler is then restored and the loop starts over.
    ///
    /// - Parameter onNewMail: Invoked once per EXISTS notification, after the
    ///   IDLE session has been ended.
    /// - Throws: `IMAPError.notConnected` when there is no channel,
    ///   `IMAPError.serverError` when the event stream ends without a
    ///   termination event, or any pipeline error.
    private func idleLoop(onNewMail: @escaping @Sendable () -> Void) async throws {
        guard let channel else { throw IMAPError.notConnected }
        let pipeline = channel.pipeline

        // Iterative IDLE loop — avoids unbounded stack growth from recursion
        while !Task.isCancelled {
            // Swap response handler for IDLE handler
            let idleTag = nextTag(prefix: "IDLE")
            let (stream, streamContinuation) = AsyncStream<IMAPIdleEvent>.makeStream()
            let idleHandler = IMAPIdleHandler(continuation: streamContinuation, idleTag: idleTag)

            let oldHandler = try await getResponseHandler()
            try await pipeline.removeHandler(oldHandler).get()
            try await pipeline.addHandler(idleHandler).get()

            // Send IDLE command
            let idleCommand = TaggedCommand(tag: idleTag, command: .idleStart)
            channel.writeAndFlush(IMAPClientHandler.Message.part(.tagged(idleCommand)), promise: nil)

            // Set up re-IDLE timer — after 29 min, send DONE to break IDLE (RFC 2177)
            let reIdleChannel = channel
            let reIdleTask = Task {
                try await Task.sleep(for: reIdleInterval)
                // Timer fired — break IDLE so the outer loop re-enters
                reIdleChannel.writeAndFlush(IMAPClientHandler.Message.part(.idleDone), promise: nil)
            }
            // Whatever way this pass ends (normal, dropped stream, thrown
            // error), the timer must not outlive it.
            defer { reIdleTask.cancel() }

            var shouldReIdle = false

            // Consume events
            for await event in stream {
                switch event {
                case .exists:
                    // New mail — break IDLE, trigger sync. Cancel the timer
                    // first so it cannot send a second DONE while we drain.
                    reIdleTask.cancel()
                    channel.writeAndFlush(IMAPClientHandler.Message.part(.idleDone), promise: nil)
                    for await innerEvent in stream {
                        if case .idleTerminated = innerEvent { break }
                    }
                    onNewMail()
                    shouldReIdle = true

                case .expunge:
                    break

                case .idleTerminated:
                    reIdleTask.cancel()
                    shouldReIdle = true
                }
                if shouldReIdle { break }
            }

            // Restore response handler before re-entering IDLE
            try await pipeline.removeHandler(idleHandler).get()
            let newResponseHandler = IMAPResponseHandler()
            try await pipeline.addHandler(newResponseHandler).get()

            if !shouldReIdle {
                // Stream ended — connection dropped
                throw IMAPError.serverError("IDLE connection dropped")
            }
            // Otherwise: loop back to re-enter IDLE
        }
    }

    // MARK: - Helpers

    /// Looks up the `IMAPResponseHandler` currently installed in the channel pipeline.
    ///
    /// - Throws: `IMAPError.notConnected` when there is no channel, or the
    ///   pipeline's error when no such handler is installed.
    private func getResponseHandler() async throws -> IMAPResponseHandler {
        guard let channel else { throw IMAPError.notConnected }
        return try await channel.pipeline.handler(type: IMAPResponseHandler.self).get()
    }

    /// Returns `true` when `response` is a tagged response whose state is OK.
    private func isOK(_ response: Response) -> Bool {
        if case .tagged(let tagged) = response {
            if case .ok = tagged.state { return true }
        }
        return false
    }
}
|