- insertMessages uses INSERT OR IGNORE on (mailboxId, uid) conflicts instead of crashing on the UNIQUE constraint when messages are re-fetched after a restart.
- IMAPResponseHandler.sendCommand resumes any leaked previous continuation before registering a new one, preventing DuplicateCommandTag errors.
- Add a channelInactive handler to resume pending continuations when the connection drops.
- Add the error type to the sync-failure log for better diagnostics.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
97 lines
2.8 KiB
Swift
97 lines
2.8 KiB
Swift
import NIO
|
|
import NIOIMAPCore
|
|
import NIOIMAP
|
|
|
|
/// Inbound channel handler that collects IMAP responses and hands them to
/// async callers through checked continuations.
///
/// Two kinds of waiters are supported:
/// - a single greeting waiter (`waitForGreeting()`), resolved by the first
///   untagged OK / PREAUTH / BYE status seen on the connection, and
/// - a single in-flight command (`sendCommand(tag:continuation:)`), resolved
///   when the tagged response carrying the registered tag arrives.
///
/// The type is `@unchecked Sendable` so it can be referenced from outside the
/// event loop; it performs no locking of its own.
/// NOTE(review): `waitForGreeting()` and `sendCommand(tag:continuation:)`
/// touch the same stored properties as the inbound callbacks — presumably
/// callers invoke them on the channel's event loop; confirm at the call sites.
final class IMAPResponseHandler: ChannelInboundHandler, RemovableChannelHandler, @unchecked Sendable {
    typealias InboundIn = Response

    /// Responses received since the in-flight command was registered.
    private var buffer: [Response] = []
    /// Tag whose tagged response completes the in-flight command.
    private var expectedTag: String?
    /// Continuation of the in-flight command, if any.
    private var continuation: CheckedContinuation<[Response], Error>?
    /// Continuation of a caller currently waiting for the greeting, if any.
    private var greetingContinuation: CheckedContinuation<Void, Error>?
    /// Outcome of the greeting phase once it is known. Recording it lets
    /// `waitForGreeting()` complete even when the greeting (or a connection
    /// failure) happened before anyone started waiting.
    private var greetingOutcome: Result<Void, Error>?

    // MARK: - ChannelInboundHandler

    /// Buffers the response for the in-flight command and resolves whichever
    /// waiter the response completes.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let response = unwrapInboundIn(data)

        // Buffered responses are only ever delivered to `continuation`, and
        // `sendCommand` resets the buffer, so collecting while no command is
        // pending would just grow memory without anyone reading it.
        if continuation != nil {
            buffer.append(response)
        }

        switch response {
        case .untagged(let payload):
            // Only the first untagged OK / PREAUTH / BYE is the greeting;
            // later status responses must not resolve it again.
            guard greetingOutcome == nil, case .conditionalState(let status) = payload else {
                break
            }
            switch status {
            case .ok, .preauth:
                resolveGreeting(.success(()))
            case .bye(let text):
                resolveGreeting(.failure(IMAPError.serverError("BYE: \(text)")))
            default:
                break
            }

        case .tagged(let tagged):
            // Tagged responses for any other tag are left in the buffer.
            guard tagged.tag == expectedTag else { break }
            let collected = buffer
            buffer = []
            expectedTag = nil
            continuation?.resume(returning: collected)
            continuation = nil

        case .fatal(let text):
            failPending(with: IMAPError.serverError("FATAL: \(text)"))

        case .fetch, .authenticationChallenge, .idleStarted:
            break
        }
    }

    /// Fails every pending waiter with `IMAPError.notConnected` and forwards
    /// the event down the pipeline.
    func channelInactive(context: ChannelHandlerContext) {
        failPending(with: IMAPError.notConnected)
        context.fireChannelInactive()
    }

    /// Fails every pending waiter with `error` and closes the channel.
    func errorCaught(context: ChannelHandlerContext, error: Error) {
        failPending(with: error)
        context.close(promise: nil)
    }

    // MARK: - Caller API

    /// Suspends until the server greeting has been received.
    ///
    /// Returns immediately when the greeting outcome is already known.
    ///
    /// - Throws: `IMAPError.serverError` when the server greets with BYE or
    ///   sends a fatal response, `IMAPError.notConnected` when the channel
    ///   goes inactive, or the error reported through `errorCaught`.
    func waitForGreeting() async throws {
        try await withCheckedThrowingContinuation { (cont: CheckedContinuation<Void, Error>) in
            if let outcome = greetingOutcome {
                cont.resume(with: outcome)
                return
            }
            // A checked continuation must be resumed exactly once; fail an
            // earlier waiter instead of silently overwriting (leaking) it.
            greetingContinuation?.resume(
                throwing: IMAPError.serverError("Previous greeting wait interrupted")
            )
            greetingContinuation = cont
        }
    }

    /// Registers `cont` to be resumed with every response collected up to and
    /// including the tagged response carrying `tag`.
    ///
    /// - Parameters:
    ///   - tag: Tag of the command about to be awaited.
    ///   - cont: Continuation resumed when the command completes or fails.
    func sendCommand(tag: String, continuation cont: CheckedContinuation<[Response], Error>) {
        // Resume any leaked continuation from a previous command to avoid
        // "SWIFT TASK CONTINUATION MISUSE: leaked its continuation".
        continuation?.resume(throwing: IMAPError.serverError("Previous command interrupted"))
        expectedTag = tag
        continuation = cont
        buffer = []
    }

    // MARK: - Helpers

    /// Records the greeting outcome and resumes the greeting waiter, if any.
    private func resolveGreeting(_ outcome: Result<Void, Error>) {
        greetingOutcome = outcome
        greetingContinuation?.resume(with: outcome)
        greetingContinuation = nil
    }

    /// Fails the in-flight command and the greeting phase with `error`, and
    /// drops the per-command state so nothing stale survives the failure.
    private func failPending(with error: Error) {
        continuation?.resume(throwing: error)
        continuation = nil
        expectedTag = nil
        buffer = []
        resolveGreeting(.failure(error))
    }
}
|
|
|
|
/// Errors reported by the IMAP client layer.
public enum IMAPError: Error, Sendable {
    /// No usable connection to the server.
    case notConnected

    /// A failure described by the associated message text.
    case serverError(String)

    /// Authentication did not succeed.
    case authenticationFailed

    /// A response that was not expected; the associated text describes it.
    case unexpectedResponse(String)
}
|