Skip to content

Commit b982d12

Browse files
authored
Merge pull request #31 from Recouse/concurrency-improvements
Concurrency improvements
2 parents 4231ad5 + 1423d27 commit b982d12

File tree

3 files changed

+216
-57
lines changed

3 files changed

+216
-57
lines changed

README.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ It also leverages Swift concurrency features to provide a more expressive and in
1616
- [x] Simple Swift API for SSE
1717
- [x] Supports data-only mode
1818
- [x] Data race safety with Swift 6
19-
- [ ] Broadcast event stream to multiple consumers (WIP)
2019

2120
## Installation
2221

@@ -57,9 +56,9 @@ Using EventSource is easy. Simply create a new data task from an instance of Eve
5756
import EventSource
5857

5958
let eventSource = EventSource()
60-
let dataTask = await eventSource.dataTask(for: urlRequest)
59+
let dataTask = eventSource.dataTask(for: urlRequest)
6160

62-
for await event in await dataTask.events() {
61+
for await event in dataTask.events() {
6362
switch event {
6463
case .open:
6564
print("Connection was opened.")
@@ -96,11 +95,11 @@ urlRequest.httpBody = """
9695
""".data(using: .utf8)!
9796

9897
let eventSource = EventSource(mode: .dataOnly)
99-
let dataTask = await eventSource.dataTask(for: urlRequest)
98+
let dataTask = eventSource.dataTask(for: urlRequest)
10099

101100
var response: String = ""
102101

103-
for await event in await dataTask.events() {
102+
for await event in dataTask.events() {
104103
switch event {
105104
case .event(let event):
106105
if let data = eventDevent.data?.data(using: .utf8) {
@@ -132,6 +131,10 @@ No dependencies.
132131

133132
Contributions to are always welcomed! For more details see [CONTRIBUTING.md](CONTRIBUTING.md).
134133

134+
## Credits
135+
136+
* Mutex backport from [swift-sharing](https://github.com/pointfreeco/swift-sharing)
137+
135138
## License
136139

137140
EventSource is released under the MIT License. See [LICENSE](LICENSE) for more information.

Sources/EventSource/EventSource.swift

Lines changed: 101 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,6 @@ import Foundation
1111
import FoundationNetworking
1212
#endif
1313

14-
/// The global actor used for isolating ``EventSource/EventSource/DataTask``.
15-
@globalActor public actor EventSourceActor: GlobalActor {
16-
public static let shared = EventSourceActor()
17-
}
18-
1914
///
2015
/// An `EventSource` instance opens a persistent connection to an HTTP server,
2116
/// which sends events in `text/event-stream` format.
@@ -63,7 +58,6 @@ public struct EventSource: Sendable {
6358
self.timeoutInterval = timeoutInterval
6459
}
6560

66-
@EventSourceActor
6761
public func dataTask(for urlRequest: URLRequest) -> DataTask {
6862
DataTask(
6963
urlRequest: urlRequest,
@@ -79,29 +73,70 @@ public extension EventSource {
7973
/// Creation of a task is exclusively handled by ``EventSource``. A new task can be created by calling
8074
/// ``EventSource/EventSource/dataTask(for:)`` method on the EventSource instance. After creating a task,
8175
/// it can be started by iterating event stream returned by ``DataTask/events()``.
82-
@EventSourceActor final class DataTask {
76+
final class DataTask: Sendable {
77+
private let _readyState: Mutex<ReadyState> = Mutex(.none)
78+
8379
/// A value representing the state of the connection.
84-
public private(set) var readyState: ReadyState = .none
80+
public var readyState: ReadyState {
81+
get {
82+
_readyState.withLock { $0 }
83+
}
84+
set {
85+
_readyState.withLock { $0 = newValue }
86+
}
87+
}
88+
89+
private let _lastMessageId: Mutex<String> = Mutex("")
8590

8691
/// Last event's ID string value.
8792
///
8893
/// Sent in a HTTP request header and used when a user is to reestablish the connection.
89-
public private(set) var lastMessageId: String = ""
94+
public var lastMessageId: String {
95+
get {
96+
_lastMessageId.withLock { $0 }
97+
}
98+
set {
99+
_lastMessageId.withLock { $0 = newValue }
100+
}
101+
}
90102

91103
/// A URLRequest of the events source.
92104
public let urlRequest: URLRequest
93105

94-
private var eventParser: EventParser
106+
private let _eventParser: Mutex<EventParser>
107+
108+
private var eventParser: EventParser {
109+
get {
110+
_eventParser.withLock { $0 }
111+
}
112+
set {
113+
_eventParser.withLock { $0 = newValue }
114+
}
115+
}
95116

96117
private let timeoutInterval: TimeInterval
97118

98-
private var continuation: AsyncStream<EventType>.Continuation?
119+
private let _httpResponseErrorStatusCode: Mutex<Int?> = Mutex(nil)
99120

100-
private var urlSession: URLSession?
121+
private var httpResponseErrorStatusCode: Int? {
122+
get {
123+
_httpResponseErrorStatusCode.withLock { $0 }
124+
}
125+
set {
126+
_httpResponseErrorStatusCode.withLock { $0 = newValue }
127+
}
128+
}
101129

102-
private var urlSessionDataTask: URLSessionDataTask?
130+
private let _consumed: Mutex<Bool> = Mutex(false)
103131

104-
private var httpResponseErrorStatusCode: Int?
132+
private var consumed: Bool {
133+
get {
134+
_consumed.withLock { $0 }
135+
}
136+
set {
137+
_consumed.withLock { $0 = newValue }
138+
}
139+
}
105140

106141
private var urlSessionConfiguration: URLSessionConfiguration {
107142
let configuration = URLSessionConfiguration.default
@@ -121,13 +156,13 @@ public extension EventSource {
121156
timeoutInterval: TimeInterval
122157
) {
123158
self.urlRequest = urlRequest
124-
self.eventParser = eventParser
159+
self._eventParser = Mutex(eventParser)
125160
self.timeoutInterval = timeoutInterval
126161
}
127162

128163
/// Creates and returns event stream.
129164
public func events() -> AsyncStream<EventType> {
130-
if urlSessionDataTask != nil {
165+
if consumed {
131166
return AsyncStream { continuation in
132167
continuation.yield(.error(EventSourceError.alreadyConsumed))
133168
continuation.finish()
@@ -136,67 +171,76 @@ public extension EventSource {
136171

137172
return AsyncStream { continuation in
138173
let sessionDelegate = SessionDelegate()
174+
let urlSession = URLSession(
175+
configuration: urlSessionConfiguration,
176+
delegate: sessionDelegate,
177+
delegateQueue: nil
178+
)
179+
let urlSessionDataTask = urlSession.dataTask(with: urlRequest)
180+
139181
let sessionDelegateTask = Task { [weak self] in
140182
for await event in sessionDelegate.eventStream {
141183
guard let self else { return }
142184

143185
switch event {
144186
case let .didCompleteWithError(error):
145-
handleSessionError(error)
187+
handleSessionError(error, stream: continuation, urlSession: urlSession)
146188
case let .didReceiveResponse(response, completionHandler):
147-
handleSessionResponse(response, completionHandler: completionHandler)
189+
handleSessionResponse(
190+
response,
191+
stream: continuation,
192+
urlSession: urlSession,
193+
completionHandler: completionHandler
194+
)
148195
case let .didReceiveData(data):
149-
parseMessages(from: data)
196+
parseMessages(from: data, stream: continuation, urlSession: urlSession)
150197
}
151198
}
152199
}
153200

154201
#if compiler(>=6.0)
155202
continuation.onTermination = { @Sendable [weak self] _ in
156203
sessionDelegateTask.cancel()
157-
Task { await self?.close() }
204+
Task { self?.close(stream: continuation, urlSession: urlSession) }
158205
}
159206
#else
160207
continuation.onTermination = { @Sendable _ in
161208
sessionDelegateTask.cancel()
162209
Task { [weak self] in
163-
await self?.close()
210+
await self?.close(stream: continuation, urlSession: urlSession)
164211
}
165212
}
166213
#endif
167214

168-
self.continuation = continuation
169-
170-
171-
urlSession = URLSession(
172-
configuration: urlSessionConfiguration,
173-
delegate: sessionDelegate,
174-
delegateQueue: nil
175-
)
176-
177-
urlSessionDataTask = urlSession!.dataTask(with: urlRequest)
178-
urlSessionDataTask!.resume()
215+
urlSessionDataTask.resume()
179216
readyState = .connecting
217+
consumed = true
180218
}
181219
}
182220

183-
private func handleSessionError(_ error: Error?) {
221+
private func handleSessionError(
222+
_ error: Error?,
223+
stream continuation: AsyncStream<EventType>.Continuation,
224+
urlSession: URLSession
225+
) {
184226
guard readyState != .closed else {
185-
close()
227+
close(stream: continuation, urlSession: urlSession)
186228
return
187229
}
188230

189231
// Send error event
190232
if let error {
191-
sendErrorEvent(with: error)
233+
sendErrorEvent(with: error, stream: continuation)
192234
}
193235

194236
// Close connection
195-
close()
237+
close(stream: continuation, urlSession: urlSession)
196238
}
197239

198240
private func handleSessionResponse(
199241
_ response: URLResponse,
242+
stream continuation: AsyncStream<EventType>.Continuation,
243+
urlSession: URLSession,
200244
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
201245
) {
202246
guard readyState != .closed else {
@@ -212,13 +256,13 @@ public extension EventSource {
212256
// Stop connection when 204 response code, otherwise keep open
213257
guard httpResponse.statusCode != 204 else {
214258
completionHandler(.cancel)
215-
close()
259+
close(stream: continuation, urlSession: urlSession)
216260
return
217261
}
218262

219263
if 200...299 ~= httpResponse.statusCode {
220264
if readyState != .open {
221-
setOpen()
265+
setOpen(stream: continuation)
222266
}
223267
} else {
224268
httpResponseErrorStatusCode = httpResponse.statusCode
@@ -230,20 +274,26 @@ public extension EventSource {
230274
/// Closes the connection, if one was made,
231275
/// and sets the `readyState` property to `.closed`.
232276
/// - Returns: State before closing.
233-
private func close() {
277+
private func close(stream continuation: AsyncStream<EventType>.Continuation, urlSession: URLSession) {
234278
let previousState = self.readyState
235279
if previousState != .closed {
236-
continuation?.yield(.closed)
237-
continuation?.finish()
280+
continuation.yield(.closed)
281+
continuation.finish()
238282
}
239-
cancel()
283+
cancel(urlSession: urlSession)
240284
}
241285

242-
private func parseMessages(from data: Data) {
286+
private func parseMessages(
287+
from data: Data,
288+
stream continuation: AsyncStream<EventType>.Continuation,
289+
urlSession: URLSession
290+
) {
243291
if let httpResponseErrorStatusCode {
244292
self.httpResponseErrorStatusCode = nil
245293
handleSessionError(
246-
EventSourceError.connectionError(statusCode: httpResponseErrorStatusCode, response: data)
294+
EventSourceError.connectionError(statusCode: httpResponseErrorStatusCode, response: data),
295+
stream: continuation,
296+
urlSession: urlSession
247297
)
248298
return
249299
}
@@ -256,17 +306,17 @@ public extension EventSource {
256306
}
257307

258308
events.forEach {
259-
continuation?.yield(.event($0))
309+
continuation.yield(.event($0))
260310
}
261311
}
262312

263-
private func setOpen() {
313+
private func setOpen(stream continuation: AsyncStream<EventType>.Continuation) {
264314
readyState = .open
265-
continuation?.yield(.open)
315+
continuation.yield(.open)
266316
}
267317

268-
private func sendErrorEvent(with error: Error) {
269-
continuation?.yield(.error(error))
318+
private func sendErrorEvent(with error: Error, stream continuation: AsyncStream<EventType>.Continuation) {
319+
continuation.yield(.error(error))
270320
}
271321

272322
/// Cancels the task.
@@ -275,11 +325,10 @@ public extension EventSource {
275325
/// The event stream supports cooperative task cancellation. However, it should be noted that
276326
/// canceling the parent Task only cancels the underlying `URLSessionDataTask` of
277327
/// ``EventSource/EventSource/DataTask``; this does not actually stop the ongoing request.
278-
public func cancel() {
328+
public func cancel(urlSession: URLSession) {
279329
readyState = .closed
280330
lastMessageId = ""
281-
urlSessionDataTask?.cancel()
282-
urlSession?.invalidateAndCancel()
331+
urlSession.invalidateAndCancel()
283332
}
284333
}
285334
}

0 commit comments

Comments
 (0)