add IMAPIdleClient actor with IDLE loop, reconnect backoff, re-IDLE timer
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
214
Packages/MagnumOpusCore/Sources/IMAPClient/IMAPIdleClient.swift
Normal file
214
Packages/MagnumOpusCore/Sources/IMAPClient/IMAPIdleClient.swift
Normal file
@@ -0,0 +1,214 @@
|
||||
import Foundation
|
||||
import NIO
|
||||
import NIOIMAPCore
|
||||
@preconcurrency import NIOIMAP
|
||||
@preconcurrency import NIOSSL
|
||||
import Models
|
||||
|
||||
public actor IMAPIdleClient {
|
||||
private let host: String
|
||||
private let port: Int
|
||||
private let credentials: Credentials
|
||||
private var channel: Channel?
|
||||
private var group: EventLoopGroup?
|
||||
private var isMonitoring = false
|
||||
private var monitorTask: Task<Void, Never>?
|
||||
private let reIdleInterval: Duration = .seconds(29 * 60) // 29 minutes per RFC 2177
|
||||
|
||||
public init(host: String, port: Int, credentials: Credentials) {
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.credentials = credentials
|
||||
}
|
||||
|
||||
/// Start monitoring INBOX via IMAP IDLE. Calls onNewMail when server sends EXISTS.
|
||||
public func startMonitoring(onNewMail: @escaping @Sendable () -> Void) async throws {
|
||||
guard !isMonitoring else { return }
|
||||
isMonitoring = true
|
||||
|
||||
try await connectAndLogin()
|
||||
try await selectInbox()
|
||||
|
||||
monitorTask = Task { [weak self] in
|
||||
var backoffSeconds: UInt64 = 5
|
||||
while !Task.isCancelled {
|
||||
guard let self else { break }
|
||||
do {
|
||||
try await self.idleLoop(onNewMail: onNewMail)
|
||||
} catch {
|
||||
if Task.isCancelled { break }
|
||||
// Reconnect with exponential backoff
|
||||
try? await Task.sleep(for: .seconds(Int(backoffSeconds)))
|
||||
backoffSeconds = min(backoffSeconds * 2, 300) // cap at 5 min
|
||||
do {
|
||||
try await self.connectAndLogin()
|
||||
try await self.selectInbox()
|
||||
backoffSeconds = 5 // reset on success
|
||||
} catch {
|
||||
if Task.isCancelled { break }
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Stop monitoring and disconnect.
|
||||
public func stopMonitoring() async {
|
||||
isMonitoring = false
|
||||
monitorTask?.cancel()
|
||||
monitorTask = nil
|
||||
try? await channel?.close()
|
||||
channel = nil
|
||||
try? await group?.shutdownGracefully()
|
||||
group = nil
|
||||
}
|
||||
|
||||
// MARK: - Connection
|
||||
|
||||
private func connectAndLogin() async throws {
|
||||
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
let sslContext = try NIOSSLContext(configuration: TLSConfiguration.makeClientConfiguration())
|
||||
let hostname = host
|
||||
|
||||
let responseHandler = IMAPResponseHandler()
|
||||
|
||||
let bootstrap = ClientBootstrap(group: eventLoopGroup)
|
||||
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
.channelInitializer { channel in
|
||||
let sslHandler = try! NIOSSLClientHandler(context: sslContext, serverHostname: hostname)
|
||||
return channel.pipeline.addHandlers([
|
||||
sslHandler,
|
||||
IMAPClientHandler(),
|
||||
responseHandler,
|
||||
])
|
||||
}
|
||||
|
||||
let chan = try await bootstrap.connect(host: host, port: port).get()
|
||||
try await responseHandler.waitForGreeting()
|
||||
|
||||
// Login
|
||||
var tagCounter = 0
|
||||
func nextTag() -> String {
|
||||
tagCounter += 1
|
||||
return "IDLE\(tagCounter)"
|
||||
}
|
||||
|
||||
let loginTag = nextTag()
|
||||
let loginCommand = TaggedCommand(
|
||||
tag: loginTag,
|
||||
command: .login(username: credentials.username, password: credentials.password)
|
||||
)
|
||||
let loginResponses = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<[Response], Error>) in
|
||||
responseHandler.sendCommand(tag: loginTag, continuation: cont)
|
||||
chan.writeAndFlush(IMAPClientHandler.Message.part(.tagged(loginCommand)), promise: nil)
|
||||
}
|
||||
guard loginResponses.contains(where: { isOK($0) }) else {
|
||||
throw IMAPError.authenticationFailed
|
||||
}
|
||||
|
||||
self.channel = chan
|
||||
self.group = eventLoopGroup
|
||||
}
|
||||
|
||||
private func selectInbox() async throws {
|
||||
guard let channel else { throw IMAPError.notConnected }
|
||||
|
||||
let responseHandler = try await getResponseHandler()
|
||||
let tag = "IDLESEL1"
|
||||
let selectCommand = TaggedCommand(
|
||||
tag: tag,
|
||||
command: .select(MailboxName(ByteBuffer(string: "INBOX")))
|
||||
)
|
||||
let responses = try await withCheckedThrowingContinuation { (cont: CheckedContinuation<[Response], Error>) in
|
||||
responseHandler.sendCommand(tag: tag, continuation: cont)
|
||||
channel.writeAndFlush(IMAPClientHandler.Message.part(.tagged(selectCommand)), promise: nil)
|
||||
}
|
||||
guard responses.contains(where: { isOK($0) }) else {
|
||||
throw IMAPError.unexpectedResponse("SELECT INBOX failed")
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - IDLE Loop
|
||||
|
||||
private func idleLoop(onNewMail: @escaping @Sendable () -> Void) async throws {
|
||||
guard let channel else { throw IMAPError.notConnected }
|
||||
let pipeline = channel.pipeline
|
||||
|
||||
// Iterative IDLE loop — avoids unbounded stack growth from recursion
|
||||
while !Task.isCancelled {
|
||||
// Swap response handler for IDLE handler
|
||||
let (stream, streamContinuation) = AsyncStream<IMAPIdleEvent>.makeStream()
|
||||
let idleHandler = IMAPIdleHandler(continuation: streamContinuation)
|
||||
|
||||
let oldHandler = try await getResponseHandler()
|
||||
try await pipeline.removeHandler(oldHandler).get()
|
||||
try await pipeline.addHandler(idleHandler).get()
|
||||
|
||||
let idleTag = "IDLE1"
|
||||
idleHandler.setIdleTag(idleTag)
|
||||
|
||||
// Send IDLE command
|
||||
let idleCommand = TaggedCommand(tag: idleTag, command: .idleStart)
|
||||
channel.writeAndFlush(IMAPClientHandler.Message.part(.tagged(idleCommand)), promise: nil)
|
||||
|
||||
// Set up re-IDLE timer — after 29 min, send DONE to break IDLE (RFC 2177)
|
||||
let reIdleChannel = channel
|
||||
let reIdleTask = Task {
|
||||
try await Task.sleep(for: reIdleInterval)
|
||||
// Timer fired — break IDLE so the outer loop re-enters
|
||||
reIdleChannel.writeAndFlush(IMAPClientHandler.Message.part(.idleDone), promise: nil)
|
||||
}
|
||||
|
||||
var shouldReIdle = false
|
||||
|
||||
// Consume events
|
||||
for await event in stream {
|
||||
switch event {
|
||||
case .exists:
|
||||
// New mail — break IDLE, trigger sync
|
||||
channel.writeAndFlush(IMAPClientHandler.Message.part(.idleDone), promise: nil)
|
||||
for await innerEvent in stream {
|
||||
if case .idleTerminated = innerEvent { break }
|
||||
}
|
||||
reIdleTask.cancel()
|
||||
onNewMail()
|
||||
shouldReIdle = true
|
||||
|
||||
case .expunge:
|
||||
break
|
||||
|
||||
case .idleTerminated:
|
||||
reIdleTask.cancel()
|
||||
shouldReIdle = true
|
||||
}
|
||||
if shouldReIdle { break }
|
||||
}
|
||||
|
||||
// Restore response handler before re-entering IDLE
|
||||
try await pipeline.removeHandler(idleHandler).get()
|
||||
let newResponseHandler = IMAPResponseHandler()
|
||||
try await pipeline.addHandler(newResponseHandler).get()
|
||||
|
||||
if !shouldReIdle {
|
||||
// Stream ended — connection dropped
|
||||
throw IMAPError.serverError("IDLE connection dropped")
|
||||
}
|
||||
// Otherwise: loop back to re-enter IDLE
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
private func getResponseHandler() async throws -> IMAPResponseHandler {
|
||||
guard let channel else { throw IMAPError.notConnected }
|
||||
return try await channel.pipeline.handler(type: IMAPResponseHandler.self).get()
|
||||
}
|
||||
|
||||
private func isOK(_ response: Response) -> Bool {
|
||||
if case .tagged(let tagged) = response {
|
||||
if case .ok = tagged.state { return true }
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -12,7 +12,7 @@ public enum IMAPIdleEvent: Sendable {
|
||||
/// NIO ChannelInboundHandler that processes untagged responses during IMAP IDLE.
|
||||
/// Unlike the standard IMAPResponseHandler (which uses CheckedContinuation for tagged responses),
|
||||
/// this handler uses AsyncStream to deliver a continuous stream of events.
|
||||
final class IMAPIdleHandler: ChannelInboundHandler, @unchecked Sendable {
|
||||
final class IMAPIdleHandler: ChannelInboundHandler, RemovableChannelHandler, @unchecked Sendable {
|
||||
typealias InboundIn = Response
|
||||
|
||||
private let continuation: AsyncStream<IMAPIdleEvent>.Continuation
|
||||
|
||||
@@ -4,7 +4,7 @@ import NIOIMAP
|
||||
|
||||
// NIO handlers are confined to their event loop; @unchecked Sendable is the
|
||||
// standard pattern for crossing the actor/event-loop boundary.
|
||||
final class IMAPResponseHandler: ChannelInboundHandler, @unchecked Sendable {
|
||||
final class IMAPResponseHandler: ChannelInboundHandler, RemovableChannelHandler, @unchecked Sendable {
|
||||
typealias InboundIn = Response
|
||||
|
||||
private var buffer: [Response] = []
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
import Testing
|
||||
import Foundation
|
||||
@testable import IMAPClient
|
||||
import Models
|
||||
|
||||
@Suite("IMAPIdleClient")
|
||||
struct IMAPIdleClientTests {
|
||||
|
||||
@Test("IMAPIdleClient can be initialized")
|
||||
func initialization() {
|
||||
let client = IMAPIdleClient(
|
||||
host: "imap.example.com",
|
||||
port: 993,
|
||||
credentials: Credentials(username: "user", password: "pass")
|
||||
)
|
||||
// Just verify it compiles and initializes — actual IDLE testing
|
||||
// requires a real server or more sophisticated mocking
|
||||
#expect(true)
|
||||
}
|
||||
|
||||
@Test("IMAPIdleEvent cases exist")
|
||||
func eventCases() {
|
||||
let exists = IMAPIdleEvent.exists(42)
|
||||
let expunge = IMAPIdleEvent.expunge(1)
|
||||
let terminated = IMAPIdleEvent.idleTerminated
|
||||
|
||||
if case .exists(let count) = exists {
|
||||
#expect(count == 42)
|
||||
}
|
||||
if case .expunge(let num) = expunge {
|
||||
#expect(num == 1)
|
||||
}
|
||||
if case .idleTerminated = terminated {
|
||||
#expect(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user