Fixes #4748 - Fixed cached callbacks race condition, serialized all async operations, properly cleaning up callbacks on failure.

This commit is contained in:
Stefan Ceriu
2021-08-25 12:56:03 +03:00
committed by Stefan Ceriu
parent 79c5fe7dab
commit 33396c146b
2 changed files with 48 additions and 20 deletions
@@ -23,6 +23,7 @@ enum VoiceMessageAttachmentCacheManagerError: Error {
case decryptionError(Error)
case preparationError(Error)
case conversionError(Error)
case durationError(Error?)
case invalidNumberOfSamples
case samplingError
}
@@ -52,6 +53,10 @@ struct VoiceMessageAttachmentCacheManagerLoadResult {
class VoiceMessageAttachmentCacheManager {
private struct Constants {
static let taskSemaphoreTimeout = 5.0
}
static let sharedManager = VoiceMessageAttachmentCacheManager()
private var completionCallbacks = [CompletionCallbackKey: [CompletionWrapper]]()
@@ -60,9 +65,12 @@ class VoiceMessageAttachmentCacheManager {
private var finalURLs = [String: URL]()
private let workQueue: DispatchQueue
private let operationQueue: OperationQueue
private init() {
workQueue = DispatchQueue(label: "io.element.VoiceMessageAttachmentCacheManager.queue", qos: .userInitiated)
operationQueue = OperationQueue()
operationQueue.maxConcurrentOperationCount = 1
}
func loadAttachment(_ attachment: MXKAttachment, numberOfSamples: Int, completion: @escaping (Result<VoiceMessageAttachmentCacheManagerLoadResult, Error>) -> Void) {
@@ -81,8 +89,9 @@ class VoiceMessageAttachmentCacheManager {
return
}
workQueue.async {
// Run this in the work queue to preserve order
operationQueue.addOperation {
MXLog.debug("[VoiceMessageAttachmentCacheManager] Started task")
if let finalURL = self.finalURLs[identifier], let duration = self.durations[identifier], let samples = self.samples[identifier]?[numberOfSamples] {
MXLog.debug("[VoiceMessageAttachmentCacheManager] Finished task - using cached results")
let result = VoiceMessageAttachmentCacheManagerLoadResult(eventIdentifier: identifier, url: finalURL, duration: duration, samples: samples)
@@ -97,8 +106,6 @@ class VoiceMessageAttachmentCacheManager {
}
private func enqueueLoadAttachment(_ attachment: MXKAttachment, identifier: String, numberOfSamples: Int, completion: @escaping (Result<VoiceMessageAttachmentCacheManagerLoadResult, Error>) -> Void) {
MXLog.debug("[VoiceMessageAttachmentCacheManager] Started task")
let callbackKey = CompletionCallbackKey(eventIdentifier: identifier, requiredNumberOfSamples: numberOfSamples)
if var callbacks = completionCallbacks[callbackKey] {
@@ -110,8 +117,14 @@ class VoiceMessageAttachmentCacheManager {
completionCallbacks[callbackKey] = [CompletionWrapper(completion)]
}
let semaphore = DispatchSemaphore(value: 0)
if let finalURL = finalURLs[identifier], let duration = durations[identifier] {
sampleFileAtURL(finalURL, duration: duration, numberOfSamples: numberOfSamples, identifier: identifier)
sampleFileAtURL(finalURL, duration: duration, numberOfSamples: numberOfSamples, identifier: identifier, semaphore: semaphore)
let result = semaphore.wait(timeout: .now() + Constants.taskSemaphoreTimeout)
if case DispatchTimeoutResult.timedOut = result {
MXLog.error("[VoiceMessageAttachmentCacheManager] Timed out waiting for tasks to finish.")
}
return
}
@@ -119,32 +132,39 @@ class VoiceMessageAttachmentCacheManager {
if attachment.isEncrypted {
attachment.decrypt(toTempFile: { filePath in
self.workQueue.async {
self.convertFileAtPath(filePath, numberOfSamples: numberOfSamples, identifier: identifier)
self.convertFileAtPath(filePath, numberOfSamples: numberOfSamples, identifier: identifier, semaphore: semaphore)
}
}, failure: { error in
// A nil error in this case is a cancellation on the MXMediaLoader
if let error = error {
MXLog.error("Failed decrypting attachment with error: \(String(describing: error))")
self.invokeFailureCallbacksForIdentifier(identifier, error: VoiceMessageAttachmentCacheManagerError.decryptionError(error))
MXLog.error("[VoiceMessageAttachmentCacheManager] Failed decrypting attachment with error: \(String(describing: error))")
self.invokeFailureCallbacksForIdentifier(identifier, requiredNumberOfSamples: numberOfSamples, error: VoiceMessageAttachmentCacheManagerError.decryptionError(error))
}
semaphore.signal()
})
} else {
attachment.prepare({
self.workQueue.async {
self.convertFileAtPath(attachment.cacheFilePath, numberOfSamples: numberOfSamples, identifier: identifier)
self.convertFileAtPath(attachment.cacheFilePath, numberOfSamples: numberOfSamples, identifier: identifier, semaphore: semaphore)
}
}, failure: { error in
// A nil error in this case is a cancellation on the MXMediaLoader
if let error = error {
MXLog.error("Failed preparing attachment with error: \(String(describing: error))")
self.invokeFailureCallbacksForIdentifier(identifier, error: VoiceMessageAttachmentCacheManagerError.preparationError(error))
MXLog.error("[VoiceMessageAttachmentCacheManager] Failed preparing attachment with error: \(String(describing: error))")
self.invokeFailureCallbacksForIdentifier(identifier, requiredNumberOfSamples: numberOfSamples, error: VoiceMessageAttachmentCacheManagerError.preparationError(error))
}
semaphore.signal()
})
}
}
let result = semaphore.wait(timeout: .now() + Constants.taskSemaphoreTimeout)
if case DispatchTimeoutResult.timedOut = result {
MXLog.error("[VoiceMessageAttachmentCacheManager] Timed out waiting for tasks to finish.")
}
}
private func convertFileAtPath(_ path: String?, numberOfSamples: Int, identifier: String) {
private func convertFileAtPath(_ path: String?, numberOfSamples: Int, identifier: String, semaphore: DispatchSemaphore) {
guard let filePath = path else {
return
}
@@ -153,10 +173,10 @@ class VoiceMessageAttachmentCacheManager {
let newURL = temporaryDirectoryURL.appendingPathComponent(ProcessInfo().globallyUniqueString).appendingPathExtension("m4a")
VoiceMessageAudioConverter.convertToMPEG4AAC(sourceURL: URL(fileURLWithPath: filePath), destinationURL: newURL) { result in
MXLog.debug("[VoiceMessageAttachmentCacheManager] Finished converting voice message")
self.workQueue.async {
switch result {
case .success:
MXLog.debug("[VoiceMessageAttachmentCacheManager] Finished converting voice message")
self.finalURLs[identifier] = newURL
VoiceMessageAudioConverter.mediaDurationAt(newURL) { result in
@@ -167,31 +187,37 @@ class VoiceMessageAttachmentCacheManager {
case .success:
if let duration = try? result.get() {
self.durations[identifier] = duration
self.sampleFileAtURL(newURL, duration: duration, numberOfSamples: numberOfSamples, identifier: identifier)
self.sampleFileAtURL(newURL, duration: duration, numberOfSamples: numberOfSamples, identifier: identifier, semaphore: semaphore)
} else {
MXLog.error("[VoiceMessageAttachmentCacheManager] Failed retrieving media duration")
self.invokeFailureCallbacksForIdentifier(identifier, requiredNumberOfSamples: numberOfSamples, error: VoiceMessageAttachmentCacheManagerError.durationError(nil))
semaphore.signal()
}
case .failure(let error):
MXLog.error("[VoiceMessageAttachmentCacheManager] Failed retrieving audio duration with error: \(error)")
self.invokeFailureCallbacksForIdentifier(identifier, requiredNumberOfSamples: numberOfSamples, error: VoiceMessageAttachmentCacheManagerError.durationError(error))
semaphore.signal()
}
}
}
case .failure(let error):
MXLog.error("[VoiceMessageAttachmentCacheManager] Failed decoding audio message with error: \(error)")
self.invokeFailureCallbacksForIdentifier(identifier, error: VoiceMessageAttachmentCacheManagerError.conversionError(error))
MXLog.error("[VoiceMessageAttachmentCacheManager] Failed converting voice message with error: \(error)")
self.invokeFailureCallbacksForIdentifier(identifier, requiredNumberOfSamples: numberOfSamples, error: VoiceMessageAttachmentCacheManagerError.conversionError(error))
semaphore.signal()
}
}
}
}
private func sampleFileAtURL(_ url: URL, duration: TimeInterval, numberOfSamples: Int, identifier: String) {
private func sampleFileAtURL(_ url: URL, duration: TimeInterval, numberOfSamples: Int, identifier: String, semaphore: DispatchSemaphore) {
let analyser = WaveformAnalyzer(audioAssetURL: url)
analyser?.samples(count: numberOfSamples, completionHandler: { samples in
self.workQueue.async {
guard let samples = samples else {
MXLog.debug("[VoiceMessageAttachmentCacheManager] Failed sampling voice message")
self.invokeFailureCallbacksForIdentifier(identifier, error: VoiceMessageAttachmentCacheManagerError.samplingError)
self.invokeFailureCallbacksForIdentifier(identifier, requiredNumberOfSamples: numberOfSamples, error: VoiceMessageAttachmentCacheManagerError.samplingError)
semaphore.signal()
return
}
@@ -205,6 +231,7 @@ class VoiceMessageAttachmentCacheManager {
}
self.invokeSuccessCallbacksForIdentifier(identifier, url: url, duration: duration, samples: samples)
semaphore.signal()
}
})
}
@@ -230,8 +257,8 @@ class VoiceMessageAttachmentCacheManager {
MXLog.debug("[VoiceMessageAttachmentCacheManager] Successfully finished task")
}
private func invokeFailureCallbacksForIdentifier(_ identifier: String, error: Error) {
let callbackKey = CompletionCallbackKey(eventIdentifier: identifier, requiredNumberOfSamples: samples.count)
private func invokeFailureCallbacksForIdentifier(_ identifier: String, requiredNumberOfSamples: Int, error: Error) {
let callbackKey = CompletionCallbackKey(eventIdentifier: identifier, requiredNumberOfSamples: requiredNumberOfSamples)
guard let callbacks = completionCallbacks[callbackKey] else {
return