Files
MagnumOpus/Packages/MagnumOpusCore/Sources/IMAPClient/IMAPResponseHandler.swift
Felix Förtsch ff91e397e8 fix sync: INSERT OR IGNORE for duplicate messages, fix leaked IMAP continuations
- insertMessages uses INSERT OR IGNORE on (mailboxId, uid) conflict instead of
  crashing on UNIQUE constraint when messages are re-fetched after restart
- IMAPResponseHandler.sendCommand resumes any leaked previous continuation before
  registering a new one, preventing DuplicateCommandTag errors
- add channelInactive handler to resume pending continuations on connection drop
- add error type to sync failure log for better diagnostics

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-15 11:58:29 +01:00

97 lines
2.8 KiB
Swift

import NIO
import NIOIMAPCore
import NIOIMAP
// NIO handlers are confined to their event loop; @unchecked Sendable is the
// standard pattern for crossing the actor/event-loop boundary.
/// Inbound channel handler that pairs IMAP responses with the single command
/// currently in flight and with the server greeting.
///
/// While a command is pending, every inbound `Response` is collected; when the
/// tagged completion carrying the expected tag arrives, the collected responses
/// (including that tagged completion) are delivered to the registered
/// continuation. The greeting outcome is remembered so that a caller that
/// starts waiting after the greeting was already read does not suspend forever.
///
/// NOTE(review): `waitForGreeting()` and `sendCommand(tag:continuation:)` touch
/// the same stored state as the channel callbacks. Presumably callers serialize
/// these calls with the channel's event loop — confirm against the call sites.
final class IMAPResponseHandler: ChannelInboundHandler, RemovableChannelHandler, @unchecked Sendable {
    typealias InboundIn = Response

    /// Responses collected for the command currently in flight.
    private var buffer: [Response] = []
    /// Tag of the command whose completion is awaited; `nil` when idle.
    private var expectedTag: String?
    /// Continuation to resume once the awaited command completes or fails.
    private var continuation: CheckedContinuation<[Response], Error>?
    /// Continuation of a caller currently suspended in `waitForGreeting()`.
    private var greetingContinuation: CheckedContinuation<Void, Error>?
    /// Greeting result (or terminal failure) seen so far; `nil` until one arrives.
    private var greetingOutcome: Result<Void, Error>?

    // MARK: - Channel callbacks

    /// Forgets any remembered greeting outcome when the handler is installed.
    func handlerAdded(context: ChannelHandlerContext) {
        // A handler instance that is installed again must not replay a greeting
        // (or failure) remembered from an earlier installation.
        greetingOutcome = nil
    }

    /// Processes one decoded response: collects it for the pending command,
    /// settles the greeting, and completes or fails the pending command.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let response = unwrapInboundIn(data)

        // Responses seen while no command is pending are never delivered
        // (`sendCommand` starts from an empty buffer), so collecting them would
        // only grow memory on an idle connection.
        if expectedTag != nil {
            buffer.append(response)
        }

        switch response {
        case .untagged(let payload):
            // Only the first OK / PREAUTH / BYE status is treated as the greeting;
            // later untagged status responses belong to ordinary commands.
            if greetingOutcome == nil, case .conditionalState(let status) = payload {
                switch status {
                case .ok, .preauth:
                    settleGreeting(.success(()))
                case .bye(let text):
                    settleGreeting(.failure(IMAPError.serverError("BYE: \(text)")))
                default:
                    break
                }
            }
        case .tagged(let tagged):
            if let tag = expectedTag, tagged.tag == tag {
                let collected = buffer
                buffer = []
                expectedTag = nil
                // Clear the stored reference before resuming so the handler never
                // holds an already-resumed continuation.
                let pending = continuation
                continuation = nil
                pending?.resume(returning: collected)
            }
        case .fatal(let text):
            failPending(with: IMAPError.serverError("FATAL: \(text)"))
        case .fetch, .authenticationChallenge, .idleStarted:
            break
        }
    }

    /// Fails all pending waiters with `IMAPError.notConnected` when the channel
    /// goes inactive, then forwards the event down the pipeline.
    func channelInactive(context: ChannelHandlerContext) {
        failPending(with: IMAPError.notConnected)
        context.fireChannelInactive()
    }

    /// Fails all pending waiters with the caught error and closes the channel.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
        failPending(with: error)
        context.close(promise: nil)
    }

    // MARK: - Caller-facing API

    /// Suspends until the server greeting has been received.
    ///
    /// Returns immediately when the greeting was already seen, and throws
    /// immediately when a failure was already recorded.
    ///
    /// - Throws: `IMAPError.serverError` for a BYE greeting or a fatal response,
    ///   `IMAPError.notConnected` when the channel went inactive, or whatever
    ///   error the pipeline reported.
    func waitForGreeting() async throws {
        try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Void, Error>) in
            // The greeting (or a failure) may have arrived before the caller got
            // here; replay it instead of waiting for an event that will not repeat.
            if let outcome = greetingOutcome {
                cont.resume(with: outcome)
                return
            }
            // Never overwrite a pending waiter without resuming it, otherwise its
            // continuation would leak.
            if let superseded = greetingContinuation {
                superseded.resume(throwing: IMAPError.serverError("Previous greeting wait interrupted"))
            }
            greetingContinuation = cont
        }
    }

    /// Registers the continuation to resume when the command tagged `tag` completes.
    ///
    /// - Parameters:
    ///   - tag: Tag of the command about to be sent.
    ///   - cont: Continuation that receives every response collected for the
    ///     command, ending with its tagged completion.
    func sendCommand(tag: String, continuation cont: CheckedContinuation<[Response], Error>) {
        // Resume any leaked continuation from a previous command to avoid
        // "SWIFT TASK CONTINUATION MISUSE: leaked its continuation"
        if let old = continuation {
            old.resume(throwing: IMAPError.serverError("Previous command interrupted"))
        }
        expectedTag = tag
        continuation = cont
        buffer = []
    }

    // MARK: - Helpers

    /// Records the greeting outcome and resumes the waiter, if one is suspended.
    private func settleGreeting(_ outcome: Result<Void, Error>) {
        greetingOutcome = outcome
        if let waiter = greetingContinuation {
            greetingContinuation = nil
            waiter.resume(with: outcome)
        }
    }

    /// Fails the pending command and greeting waiter with `error` and resets the
    /// per-command state so nothing stale survives the failure.
    private func failPending(with error: Error) {
        expectedTag = nil
        buffer = []
        if let pending = continuation {
            continuation = nil
            pending.resume(throwing: error)
        }
        // Also remembered, so a later `waitForGreeting()` throws instead of hanging.
        settleGreeting(.failure(error))
    }
}
/// Errors reported by the IMAP client layer.
public enum IMAPError: Error, Sendable {
    // MARK: Payload-free failures

    /// There is no usable connection to the server.
    case notConnected

    /// Presumably raised when the server rejects the supplied credentials;
    /// not produced by the code visible in this file — confirm at the call sites.
    case authenticationFailed

    // MARK: Failures carrying a description

    /// The server (or the response handler on its behalf) reported a failure;
    /// the associated string describes it.
    case serverError(String)

    /// Presumably a response that did not match what the client expected, with
    /// a description of it — confirm at the call sites.
    case unexpectedResponse(String)
}