Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
private Cipher cipher;
private final Long contentLength;
private boolean isLastPart;
private int tagLength;
private boolean onCompleteCalled = false;

private byte[] outputBuffer;

Expand All @@ -27,6 +29,16 @@ public class CipherSubscriber implements Subscriber<ByteBuffer> {
this.contentLength = contentLength;
cipher = materials.getCipher(iv);
this.isLastPart = isLastPart;

// Determine the tag length based on the cipher algorithm.
// This class uses the tag length to identify the end of the stream before the onComplete signal is sent.
if (cipher.getAlgorithm().contains("GCM")) {
tagLength = 16;
} else if (cipher.getAlgorithm().contains("CBC") || cipher.getAlgorithm().contains("CTR")) {
tagLength = 0;
} else {
throw new IllegalArgumentException("Unsupported cipher type: " + cipher.getAlgorithm());
}
}

CipherSubscriber(Subscriber<? super ByteBuffer> wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv) {
Expand All @@ -46,20 +58,48 @@ public void onNext(ByteBuffer byteBuffer) {
if (amountToReadFromByteBuffer > 0) {
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer);

if (outputBuffer == null || outputBuffer.length == 0) {
// The underlying data is too short to fill in the block cipher.
// Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
// in practice SunJCE and ACCP return an empty buffer instead, hence checks for
// null OR length == 0.
if (contentRead.get() == contentLength) {
if (contentRead.get() + tagLength >= contentLength) {
// All content has been read, so complete to get the final bytes
this.onComplete();
finalBytes();
return;
}
// Otherwise, wait for more bytes. To avoid blocking,
// send an empty buffer to the wrapped subscriber.
wrappedSubscriber.onNext(ByteBuffer.allocate(0));
} else {
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
// Check if stream has read all expected content.
// Once all content has been read, call onComplete.
//
// This class determines that all content has been read by checking if
// the amount of data read so far plus the tag length is at least the content length.
// Once this is true, downstream will never call `request` again
// (beyond the current request that is being responded to in this onNext invocation.)
// As a result, this class can only call `wrappedSubscriber.onNext` one more time.
// (Reactive streams require that downstream sends a `request(n)`
// to indicate it is ready for more data, and upstream responds to that request by calling `onNext`.
// The `n` in request is the maximum number of `onNext` calls that downstream
// will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.)
// Since this class can only call `wrappedSubscriber.onNext` once,
// it must send all remaining data in the next onNext call,
// including the result of cipher.doFinal(), if applicable.
// Calling `wrappedSubscriber.onNext` more than once for `request(1)`
// violates the Reactive Streams specification and can cause exceptions downstream.
if (contentRead.get() + tagLength >= contentLength) {
// All content has been read; complete the stream.
// (Signalling onComplete from here is Reactive Streams-spec compliant;
// this class is allowed to call onComplete, even if upstream has not yet signaled onComplete.)
finalBytes();
} else {
// Needs to read more data, so send the data downstream,
// expecting that downstream will continue to request more data.
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
}
}
} else {
// Do nothing
Expand Down Expand Up @@ -91,20 +131,61 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
wrappedSubscriber.onComplete();
}

public void finalBytes() {
// onComplete can be signalled to CipherSubscriber multiple times,
// but additional calls should be deduped to avoid calling onNext multiple times
// and raising exceptions.
if (onCompleteCalled) {
return;
}
onCompleteCalled = true;

// If this isn't the last part, skip doFinal and just send outputBuffer downstream.
// doFinal requires that all parts have been processed to compute the tag,
// so the tag will only be computed when the last part is processed.
if (!isLastPart) {
// If this isn't the last part, skip doFinal, we aren't done
// First, propagate the bytes that were in outputBuffer downstream.
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
// Then, propagate the onComplete signal downstream.
wrappedSubscriber.onComplete();
return;
}

// If this is the last part, compute doFinal and include its result in the value sent downstream.
// The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
byte[] finalBytes = null;
try {
outputBuffer = cipher.doFinal();
// Send the final bytes to the wrapped subscriber
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
finalBytes = cipher.doFinal();
} catch (final GeneralSecurityException exception) {
// Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
// Forward error, else the wrapped subscriber waits indefinitely
wrappedSubscriber.onError(exception);
// Even though doFinal failed, propagate the onComplete signal downstream
wrappedSubscriber.onComplete();
throw new S3EncryptionClientSecurityException(exception.getMessage(), exception);
}

// Combine the bytes from outputBuffer and finalBytes into one onNext call.
// Downstream has requested one item in its request method, so this class can only call onNext once.
// This single onNext call must contain both the bytes from outputBuffer and the tag.
byte[] combinedBytes;
if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) {
combinedBytes = new byte[outputBuffer.length + finalBytes.length];
System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length);
System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length);
} else if (outputBuffer != null && outputBuffer.length > 0) {
combinedBytes = outputBuffer;
} else if (finalBytes != null && finalBytes.length > 0) {
combinedBytes = finalBytes;
} else {
combinedBytes = new byte[0];
}

wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes));
wrappedSubscriber.onComplete();
}

Expand Down
Loading