add nio connection layer: tls bootstrap, response handler, command runner
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,21 @@
|
||||
import NIOIMAPCore
|
||||
|
||||
struct IMAPCommandRunner {
|
||||
private let connection: IMAPConnection
|
||||
private var tagCounter: Int = 0
|
||||
|
||||
init(connection: IMAPConnection) {
|
||||
self.connection = connection
|
||||
}
|
||||
|
||||
mutating func nextTag() -> String {
|
||||
tagCounter += 1
|
||||
return "A\(tagCounter)"
|
||||
}
|
||||
|
||||
mutating func run(_ command: Command) async throws -> [Response] {
|
||||
let tag = nextTag()
|
||||
let tagged = TaggedCommand(tag: tag, command: command)
|
||||
return try await connection.sendCommand(tag, command: .tagged(tagged))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
import NIO
|
||||
import NIOIMAPCore
|
||||
@preconcurrency import NIOIMAP
|
||||
@preconcurrency import NIOSSL
|
||||
|
||||
actor IMAPConnection {
|
||||
private let host: String
|
||||
private let port: Int
|
||||
private let group: EventLoopGroup
|
||||
private var channel: Channel?
|
||||
private let responseHandler: IMAPResponseHandler
|
||||
|
||||
init(host: String, port: Int) {
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
self.responseHandler = IMAPResponseHandler()
|
||||
}
|
||||
|
||||
func connect() async throws {
|
||||
let sslContext = try NIOSSLContext(configuration: TLSConfiguration.makeClientConfiguration())
|
||||
let handler = responseHandler
|
||||
let hostname = host
|
||||
|
||||
let bootstrap = ClientBootstrap(group: group)
|
||||
.channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
|
||||
.channelInitializer { channel in
|
||||
let sslHandler = try! NIOSSLClientHandler(context: sslContext, serverHostname: hostname)
|
||||
return channel.pipeline.addHandlers([
|
||||
sslHandler,
|
||||
IMAPClientHandler(),
|
||||
handler,
|
||||
])
|
||||
}
|
||||
|
||||
channel = try await bootstrap.connect(host: host, port: port).get()
|
||||
try await handler.waitForGreeting()
|
||||
}
|
||||
|
||||
func sendCommand(_ tag: String, command: CommandStreamPart) async throws -> [Response] {
|
||||
guard let channel else { throw IMAPError.notConnected }
|
||||
let handler = responseHandler
|
||||
return try await withCheckedThrowingContinuation { continuation in
|
||||
handler.sendCommand(tag: tag, continuation: continuation)
|
||||
channel.writeAndFlush(IMAPClientHandler.Message.part(command), promise: nil)
|
||||
}
|
||||
}
|
||||
|
||||
func disconnect() async throws {
|
||||
try await channel?.close()
|
||||
channel = nil
|
||||
}
|
||||
|
||||
func shutdown() async throws {
|
||||
try await group.shutdownGracefully()
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,82 @@
|
||||
import NIO
|
||||
import NIOIMAPCore
|
||||
import NIOIMAP
|
||||
|
||||
// NIO handlers are confined to their event loop; @unchecked Sendable is the
|
||||
// standard pattern for crossing the actor/event-loop boundary.
|
||||
final class IMAPResponseHandler: ChannelInboundHandler, @unchecked Sendable {
|
||||
typealias InboundIn = Response
|
||||
|
||||
private var buffer: [Response] = []
|
||||
private var expectedTag: String?
|
||||
private var continuation: CheckedContinuation<[Response], Error>?
|
||||
private var greetingContinuation: CheckedContinuation<Void, Error>?
|
||||
|
||||
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let response = unwrapInboundIn(data)
|
||||
buffer.append(response)
|
||||
|
||||
switch response {
|
||||
case .untagged(let payload):
|
||||
if case .conditionalState(let status) = payload, greetingContinuation != nil {
|
||||
switch status {
|
||||
case .ok:
|
||||
greetingContinuation?.resume()
|
||||
greetingContinuation = nil
|
||||
case .preauth:
|
||||
greetingContinuation?.resume()
|
||||
greetingContinuation = nil
|
||||
case .bye(let text):
|
||||
let error = IMAPError.serverError("BYE: \(text)")
|
||||
greetingContinuation?.resume(throwing: error)
|
||||
greetingContinuation = nil
|
||||
default:
|
||||
break
|
||||
}
|
||||
}
|
||||
case .tagged(let tagged):
|
||||
if tagged.tag == expectedTag {
|
||||
let collected = buffer
|
||||
buffer = []
|
||||
expectedTag = nil
|
||||
continuation?.resume(returning: collected)
|
||||
continuation = nil
|
||||
}
|
||||
case .fatal(let text):
|
||||
let error = IMAPError.serverError("FATAL: \(text)")
|
||||
continuation?.resume(throwing: error)
|
||||
continuation = nil
|
||||
greetingContinuation?.resume(throwing: error)
|
||||
greetingContinuation = nil
|
||||
case .fetch, .authenticationChallenge, .idleStarted:
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
func errorCaught(context: ChannelHandlerContext, error: Error) {
|
||||
continuation?.resume(throwing: error)
|
||||
continuation = nil
|
||||
greetingContinuation?.resume(throwing: error)
|
||||
greetingContinuation = nil
|
||||
context.close(promise: nil)
|
||||
}
|
||||
|
||||
func waitForGreeting() async throws {
|
||||
try await withCheckedThrowingContinuation { cont in
|
||||
greetingContinuation = cont
|
||||
}
|
||||
}
|
||||
|
||||
func sendCommand(tag: String, continuation cont: CheckedContinuation<[Response], Error>) {
|
||||
expectedTag = tag
|
||||
continuation = cont
|
||||
buffer = []
|
||||
}
|
||||
}
|
||||
|
||||
public enum IMAPError: Error, Sendable {
|
||||
case notConnected
|
||||
case serverError(String)
|
||||
case authenticationFailed
|
||||
case unexpectedResponse(String)
|
||||
}
|
||||
Reference in New Issue
Block a user