Skip to content

Commit c0ad49e

Browse files
authored
Various fixes for HBByteBufferStreamer (#34)
Renamed dropped -> isFInished Set isFinished after feeding error or end Don't add new promise on until we are sure everything is good Both consumeAll functions now use whenComplete instead of map
1 parent a3f48ed commit c0ad49e

File tree

1 file changed

+25
-18
lines changed

1 file changed

+25
-18
lines changed

Sources/HummingbirdCore/Request/ByteBufferStreamer.swift

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {
6969
var currentSize: Int
7070
/// bytes fed to streamer so far
7171
var sizeFed: Int
72-
/// has request streamer data been dropped
73-
var dropped: Bool
72+
/// is request streamer finished
73+
var isFinished: Bool
7474

7575
public init(eventLoop: EventLoop, maxSize: Int, maxStreamingBufferSize: Int? = nil) {
7676
self.queue = .init()
@@ -82,7 +82,7 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {
8282
self.maxSize = maxSize
8383
self.maxStreamingBufferSize = maxStreamingBufferSize ?? maxSize
8484
self.onConsume = nil
85-
self.dropped = false
85+
self.isFinished = false
8686
}
8787

8888
/// Feed a ByteBuffer to the request, while applying back pressure
@@ -134,24 +134,27 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {
134134

135135
switch result {
136136
case .byteBuffer(let byteBuffer):
137-
// don't add more ByteBuffers to queue if we are dropped
138-
guard self.dropped == false else { return }
139-
140-
self.queue.append(self.eventLoop.makePromise())
137+
// don't add more ByteBuffers to queue if we are finished
138+
guard self.isFinished == false else { return }
141139

142140
self.sizeFed += byteBuffer.readableBytes
143141
self.currentSize += byteBuffer.readableBytes
144142
if self.currentSize > self.maxStreamingBufferSize {
145143
self.backPressurePromise = self.eventLoop.makePromise()
146144
}
147145
if self.sizeFed > self.maxSize {
146+
self.isFinished = true
148147
promise.fail(HBHTTPError(.payloadTooLarge))
149148
} else {
149+
self.queue.append(self.eventLoop.makePromise())
150150
promise.succeed(.byteBuffer(byteBuffer))
151151
}
152152
case .error(let error):
153+
self.isFinished = true
153154
promise.fail(error)
154155
case .end:
156+
guard self.isFinished == false else { return }
157+
self.isFinished = true
155158
promise.succeed(.end)
156159
}
157160
}
@@ -174,9 +177,9 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {
174177
public func consumeAll(on eventLoop: EventLoop, _ process: @escaping (ByteBuffer) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> {
175178
let promise = self.eventLoop.makePromise(of: Void.self)
176179
func _consumeAll() {
177-
self.consume().map { output in
178-
switch output {
179-
case .byteBuffer(let buffer):
180+
self.consume().whenComplete { result in
181+
switch result {
182+
case .success(.byteBuffer(let buffer)):
180183
process(buffer).whenComplete { result in
181184
switch result {
182185
case .failure(let error):
@@ -186,11 +189,13 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {
186189
}
187190
}
188191

189-
case .end:
192+
case .success(.end):
190193
promise.succeed(())
194+
195+
case .failure(let error):
196+
promise.fail(error)
191197
}
192198
}
193-
.cascadeFailure(to: promise)
194199
}
195200
self.eventLoop.execute {
196201
_consumeAll()
@@ -204,7 +209,7 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {
204209
/// - eventLoop: EventLoop to run on
205210
func drop() -> EventLoopFuture<Void> {
206211
self.eventLoop.assertInEventLoop()
207-
self.dropped = true
212+
self.isFinished = true
208213

209214
let promise = self.eventLoop.makePromise(of: Void.self)
210215
func _dropAll() {
@@ -258,21 +263,23 @@ public final class HBByteBufferStreamer: HBStreamerProtocol {
258263
let promise = self.eventLoop.makePromise(of: ByteBuffer?.self)
259264
var completeBuffer: ByteBuffer?
260265
func _consumeAll() {
261-
self.consume().map { output in
262-
switch output {
263-
case .byteBuffer(var buffer):
266+
self.consume().whenComplete { result in
267+
switch result {
268+
case .success(.byteBuffer(var buffer)):
264269
if completeBuffer != nil {
265270
completeBuffer!.writeBuffer(&buffer)
266271
} else {
267272
completeBuffer = buffer
268273
}
269274
_consumeAll()
270275

271-
case .end:
276+
case .success(.end):
272277
promise.succeed(completeBuffer)
278+
279+
case .failure(let error):
280+
promise.fail(error)
273281
}
274282
}
275-
.cascadeFailure(to: promise)
276283
}
277284
_consumeAll()
278285
return promise.futureResult

0 commit comments

Comments
 (0)