Skip to content

Commit 754a486

Browse files
committed
Add streaming database API support to database service
This introduces the streaming variants of Read/WriteMetadataBlob, along with implementations in the test service, and some tests verifying that the two of them work together. No sealed memory production code is changed in this commit, that will be follow-up work. Bug: b/476476987 Change-Id: I8bb0d9e4d3540551eb0b354a929fd10d6a6a6964
1 parent f3ed3d4 commit 754a486

File tree

5 files changed

+336
-13
lines changed

5 files changed

+336
-13
lines changed

oak_private_memory/BUILD

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -254,15 +254,14 @@ rust_test(
254254
)
255255

256256
rust_test(
257-
name = "search_test",
258-
srcs = ["test/search_test.rs"],
257+
name = "external_db_client_test",
258+
srcs = ["test/external_db_client_test.rs"],
259259
deps = [
260-
"//database",
260+
":external_db_client",
261+
":private_memory_test_database_server_lib",
261262
"//proto:sealed_memory_rust_proto",
262-
"@oak_crates_index//:anyhow",
263-
"@oak_crates_index//:googletest",
264-
"@oak_crates_index//:prost-types",
265-
"@oak_crates_index//:tempfile",
263+
"@oak_crates_index//:tokio",
264+
"@oak_crates_index//:tonic",
266265
],
267266
)
268267

oak_private_memory/proto/database.proto

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,41 @@ message WriteMetadataBlobRequest {
8484

8585
message WriteMetadataBlobResponse {}
8686

87+
message ReadMetadataBlobStreamRequest {
88+
string id = 1;
89+
}
90+
91+
message ReadMetadataBlobStreamResponse {
92+
oneof response {
93+
// The initial response containing the metadata. The blob field in the
94+
// data_blob can be empty or contain the first chunk.
95+
//
96+
// This variant should only be sent once, as the first request. Sending it
97+
// more than once will result in an error.
98+
MetadataBlob metadata_blob = 1;
99+
100+
// Subsequent chunks of the blob. These will be concatenated in the order
101+
// received to construct the final payload. Sending this variant before
102+
// sending the metadata_blob will result in an error.
103+
bytes chunk = 2;
104+
}
105+
}
106+
107+
message WriteMetadataBlobStreamRequest {
108+
oneof request {
109+
// The initial request containing the metadata. The blob field in the
110+
// data_blob can be empty or contain the first chunk.
111+
MetadataBlob metadata_blob = 1;
112+
113+
// Subsequent chunks of the blob. They can be concatenated in the order
114+
// received to construct the final payload. The server will indicate the end
115+
// of the file by sending a half-close.
116+
bytes chunk = 2;
117+
}
118+
}
119+
120+
message WriteMetadataBlobStreamResponse {}
121+
87122
message ResetDatabaseRequest {}
88123

89124
message ResetDatabaseResponse {}
@@ -121,6 +156,26 @@ service SealedMemoryDatabaseService {
121156
rpc ReadMetadataBlob(ReadMetadataBlobRequest)
122157
returns (ReadMetadataBlobResponse) {}
123158

159+
// Write a metadata blob to the database using a stream.
160+
// The first request message must contain the metadata_blob.
161+
// Subsequent request messages should contain chunks of the blob.
162+
//
163+
// There's no requirement on the chunk szies; they can very from
164+
// request-to-request, and be of any size, as llong as it fits in the message
165+
// size limit.
166+
rpc WriteMetadataBlobStream(stream WriteMetadataBlobStreamRequest)
167+
returns (WriteMetadataBlobStreamResponse) {}
168+
169+
// Read a metadata blob from the database using a stream.
170+
// The first response message will contain the metadata_blob.
171+
// Subsequent response messages will contain chunks of the blob.
172+
//
173+
// The returned chunks can be of any size, and the size can vary from
174+
// response-to-response. The client should concatenate them in the order
175+
// received to construct the final payload.
176+
rpc ReadMetadataBlobStream(ReadMetadataBlobStreamRequest)
177+
returns (stream ReadMetadataBlobStreamResponse) {}
178+
124179
// Writes an unencrypted data blob to the database.
125180
rpc WriteUnencryptedDataBlob(WriteUnencryptedDataBlobRequest)
126181
returns (WriteUnencryptedDataBlobResponse) {}

oak_private_memory/src/external_db_client.rs

Lines changed: 124 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,27 @@
1515

1616
use anyhow::Context;
1717
use async_trait::async_trait;
18+
use futures::StreamExt;
1819
use log::info;
1920
use prost::Message;
2021
use sealed_memory_grpc_proto::oak::private_memory::sealed_memory_database_service_client::SealedMemoryDatabaseServiceClient;
2122
use sealed_memory_rust_proto::oak::private_memory::{
2223
DataBlob, DeleteBlobsRequest, EncryptedDataBlob, EncryptedMetadataBlob, MetadataBlob,
2324
ReadDataBlobRequest, ReadMetadataBlobRequest, ReadMetadataBlobResponse,
24-
ReadUnencryptedDataBlobRequest, WriteDataBlobRequest, WriteMetadataBlobRequest,
25-
WriteUnencryptedDataBlobRequest,
25+
ReadMetadataBlobStreamRequest, ReadUnencryptedDataBlobRequest, WriteDataBlobRequest,
26+
WriteMetadataBlobRequest, WriteMetadataBlobStreamRequest, WriteUnencryptedDataBlobRequest,
27+
read_metadata_blob_stream_response, write_metadata_blob_stream_request,
2628
};
2729
use tonic::{Code, transport::Channel};
2830

2931
pub type ExternalDbClient = SealedMemoryDatabaseServiceClient<Channel>;
3032
// The unique id for a opaque blob stored in the disk.
3133
pub type BlobId = String;
3234

35+
// The size of the chunks to use when streaming data to/from the external
36+
// database. 1MB.
37+
const CHUNK_SIZE: usize = 1024 * 1024;
38+
3339
// A non-fatal result from an attempt to persist the metadata db.
3440
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
3541
pub enum MetadataPersistResult {
@@ -68,6 +74,15 @@ pub trait DataBlobHandler {
6874
&mut self,
6975
id: &BlobId,
7076
) -> anyhow::Result<Option<EncryptedMetadataBlob>>;
77+
async fn add_metadata_blob_stream(
78+
&mut self,
79+
id: &BlobId,
80+
metadata_blob: EncryptedMetadataBlob,
81+
) -> anyhow::Result<MetadataPersistResult>;
82+
async fn get_metadata_blob_stream(
83+
&mut self,
84+
id: &BlobId,
85+
) -> anyhow::Result<Option<EncryptedMetadataBlob>>;
7186
async fn get_blobs(
7287
&mut self,
7388
ids: &[BlobId],
@@ -144,6 +159,51 @@ impl DataBlobHandler for ExternalDbClient {
144159
Ok(MetadataPersistResult::Succeeded)
145160
}
146161

162+
async fn add_metadata_blob_stream(
163+
&mut self,
164+
id: &BlobId,
165+
metadata_blob: EncryptedMetadataBlob,
166+
) -> anyhow::Result<MetadataPersistResult> {
167+
let blob =
168+
metadata_blob.encrypted_data_blob.as_ref().context("no blob contents")?.encode_to_vec();
169+
let blob_size = blob.len() as u64;
170+
171+
let mut chunks = blob.chunks(CHUNK_SIZE);
172+
let first_chunk = chunks.next().unwrap_or_default();
173+
let mut messages = vec![WriteMetadataBlobStreamRequest {
174+
request: Some(write_metadata_blob_stream_request::Request::MetadataBlob(
175+
MetadataBlob {
176+
data_blob: Some(DataBlob { id: id.clone(), blob: first_chunk.to_vec() }),
177+
version: metadata_blob.version,
178+
},
179+
)),
180+
}];
181+
182+
for chunk in chunks {
183+
messages.push(WriteMetadataBlobStreamRequest {
184+
request: Some(write_metadata_blob_stream_request::Request::Chunk(chunk.to_vec())),
185+
});
186+
}
187+
188+
let start_time = tokio::time::Instant::now();
189+
let write_result = self.write_metadata_blob_stream(futures::stream::iter(messages)).await;
190+
191+
if let Err(ref status) = write_result {
192+
if status.code() == Code::FailedPrecondition {
193+
return Ok(MetadataPersistResult::RetryNeeded);
194+
}
195+
write_result?;
196+
}
197+
198+
let mut elapsed_time = start_time.elapsed().as_millis() as u64;
199+
if elapsed_time == 0 {
200+
elapsed_time = 1;
201+
}
202+
let speed = blob_size / 1024 / elapsed_time;
203+
metrics::get_global_metrics().record_db_save_speed(speed);
204+
Ok(MetadataPersistResult::Succeeded)
205+
}
206+
147207
async fn add_blobs(
148208
&mut self,
149209
data_blobs: Vec<EncryptedDataBlob>,
@@ -238,6 +298,68 @@ impl DataBlobHandler for ExternalDbClient {
238298
}
239299
}
240300

301+
async fn get_metadata_blob_stream(
302+
&mut self,
303+
id: &BlobId,
304+
) -> anyhow::Result<Option<EncryptedMetadataBlob>> {
305+
let start_time = tokio::time::Instant::now();
306+
match self.read_metadata_blob_stream(ReadMetadataBlobStreamRequest { id: id.clone() }).await
307+
{
308+
Ok(response) => {
309+
let mut response_stream = response.into_inner();
310+
let mut full_blob = Vec::new();
311+
312+
let first_response = match response_stream.next().await {
313+
Some(res) => res?,
314+
None => return Ok(None),
315+
};
316+
317+
let version = match first_response.response {
318+
Some(read_metadata_blob_stream_response::Response::MetadataBlob(metadata)) => {
319+
if let Some(data_blob) = metadata.data_blob {
320+
full_blob.extend_from_slice(&data_blob.blob);
321+
}
322+
metadata.version
323+
}
324+
_ => anyhow::bail!("Expected MetadataBlob as the first message in the stream"),
325+
};
326+
327+
while let Some(response) = response_stream.next().await {
328+
let response = response?;
329+
match response.response {
330+
Some(read_metadata_blob_stream_response::Response::Chunk(chunk)) => {
331+
full_blob.extend_from_slice(&chunk);
332+
}
333+
_ => anyhow::bail!("Expected Chunk message in the stream"),
334+
}
335+
}
336+
337+
let metadata_blob = EncryptedMetadataBlob {
338+
encrypted_data_blob: Some(
339+
EncryptedDataBlob::decode(&*full_blob)
340+
.context("Failed to decode EncryptedMetadataBlob")?,
341+
),
342+
version,
343+
};
344+
345+
let mut elapsed_time = start_time.elapsed().as_millis() as u64;
346+
if elapsed_time == 0 {
347+
elapsed_time = 1;
348+
}
349+
let speed = full_blob.len() as u64 / 1024 / elapsed_time;
350+
metrics::get_global_metrics().record_db_load_speed(speed);
351+
Ok(Some(metadata_blob))
352+
}
353+
Err(status) => {
354+
if status.code() == Code::NotFound {
355+
Ok(None)
356+
} else {
357+
Err(status.into())
358+
}
359+
}
360+
}
361+
}
362+
241363
async fn get_blobs(
242364
&mut self,
243365
ids: &[BlobId],

oak_private_memory/test/database/service.rs

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@ use sealed_memory_grpc_proto::oak::private_memory::sealed_memory_database_servic
2323
use sealed_memory_rust_proto::oak::private_memory::{
2424
DataBlob, DeleteBlobsRequest, DeleteBlobsResponse, MetadataBlob, ReadDataBlobRequest,
2525
ReadDataBlobResponse, ReadMetadataBlobRequest, ReadMetadataBlobResponse,
26-
ReadUnencryptedDataBlobRequest, ReadUnencryptedDataBlobResponse, ResetDatabaseRequest,
27-
ResetDatabaseResponse, WriteDataBlobRequest, WriteDataBlobResponse, WriteMetadataBlobRequest,
28-
WriteMetadataBlobResponse, WriteUnencryptedDataBlobRequest, WriteUnencryptedDataBlobResponse,
26+
ReadMetadataBlobStreamRequest, ReadMetadataBlobStreamResponse, ReadUnencryptedDataBlobRequest,
27+
ReadUnencryptedDataBlobResponse, ResetDatabaseRequest, ResetDatabaseResponse,
28+
WriteDataBlobRequest, WriteDataBlobResponse, WriteMetadataBlobRequest,
29+
WriteMetadataBlobResponse, WriteMetadataBlobStreamRequest, WriteMetadataBlobStreamResponse,
30+
WriteUnencryptedDataBlobRequest, WriteUnencryptedDataBlobResponse,
31+
read_metadata_blob_stream_response, write_metadata_blob_stream_request,
2932
};
3033
use tokio::{net::TcpListener, sync::Mutex};
31-
use tokio_stream::wrappers::TcpListenerStream;
34+
use tokio_stream::{StreamExt, wrappers::TcpListenerStream};
3235

3336
pub struct SealedMemoryDatabaseServiceTestImpl {
3437
pub database: Mutex<HashMap<String, DataBlob>>,
@@ -140,6 +143,70 @@ impl SealedMemoryDatabaseService for SealedMemoryDatabaseServiceTestImpl {
140143
}
141144
}
142145

146+
type ReadMetadataBlobStreamStream = std::pin::Pin<
147+
Box<
148+
dyn tokio_stream::Stream<Item = Result<ReadMetadataBlobStreamResponse, tonic::Status>>
149+
+ Send,
150+
>,
151+
>;
152+
153+
async fn write_metadata_blob_stream(
154+
&self,
155+
request: tonic::Request<tonic::Streaming<WriteMetadataBlobStreamRequest>>,
156+
) -> Result<tonic::Response<WriteMetadataBlobStreamResponse>, tonic::Status> {
157+
let mut stream = request.into_inner();
158+
let mut full_blob = Vec::new();
159+
let mut version = String::new();
160+
let mut id = String::new();
161+
162+
while let Some(request) = stream.next().await {
163+
let request = request?;
164+
match request.request {
165+
Some(write_metadata_blob_stream_request::Request::MetadataBlob(metadata)) => {
166+
version = metadata.version;
167+
if let Some(data_blob) = metadata.data_blob {
168+
id = data_blob.id;
169+
full_blob.extend_from_slice(&data_blob.blob);
170+
}
171+
}
172+
Some(write_metadata_blob_stream_request::Request::Chunk(chunk)) => {
173+
full_blob.extend_from_slice(&chunk);
174+
}
175+
None => {}
176+
}
177+
}
178+
179+
let added = self
180+
.add_metadata_blob_inner(
181+
id.clone(),
182+
MetadataBlob { data_blob: Some(DataBlob { id, blob: full_blob }), version },
183+
)
184+
.await;
185+
if !added {
186+
Err(tonic::Status::failed_precondition("metadata blob version mismatch"))
187+
} else {
188+
Ok(tonic::Response::new(WriteMetadataBlobStreamResponse {}))
189+
}
190+
}
191+
192+
async fn read_metadata_blob_stream(
193+
&self,
194+
request: tonic::Request<ReadMetadataBlobStreamRequest>,
195+
) -> Result<tonic::Response<Self::ReadMetadataBlobStreamStream>, tonic::Status> {
196+
let request = request.into_inner();
197+
let blob = self.get_metdata_blob_inner(&request.id).await;
198+
199+
if let Some(blob) = blob {
200+
let response = ReadMetadataBlobStreamResponse {
201+
response: Some(read_metadata_blob_stream_response::Response::MetadataBlob(blob)),
202+
};
203+
let stream = tokio_stream::iter(vec![Ok(response)]);
204+
Ok(tonic::Response::new(Box::pin(stream)))
205+
} else {
206+
Err(tonic::Status::not_found("Blob not found"))
207+
}
208+
}
209+
143210
async fn write_unencrypted_data_blob(
144211
&self,
145212
request: tonic::Request<WriteUnencryptedDataBlobRequest>,

0 commit comments

Comments
 (0)