Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,31 @@ private Session TryFindSession(SessionName name)
throw new RpcException(new GrpcCore.Status(StatusCode.NotFound, $"Session not found: {name}"));
}

public override Task<PartitionResponse> PartitionQuery(PartitionQueryRequest request, ServerCallContext context)
{
_requests.Enqueue(request);
_contexts.Enqueue(context);
_headers.Enqueue(context.RequestHeaders);
_executionTimes.TryGetValue(nameof(PartitionQuery), out ExecutionTime? executionTime);
executionTime?.SimulateExecutionTime();

_ = TryFindSession(request.SessionAsSessionName);
var transaction = FindOrBeginTransaction(request.SessionAsSessionName, request.Transaction);
var numPartitions = request.PartitionOptions.MaxPartitions;
if (numPartitions == 0)
{
numPartitions = Random.Shared.NextInt64(1, Math.Max(4, 2 * Environment.ProcessorCount));
}
var response = new PartitionResponse();
for (var n = 0; n < numPartitions; n++)
{
var token = ByteString.CopyFromUtf8(request.Sql + ": " + n);
response.Partitions.Add(new Partition{PartitionToken = token});
}
response.Transaction = transaction;
return Task.FromResult(response);
}

public override Task<ExecuteBatchDmlResponse> ExecuteBatchDml(ExecuteBatchDmlRequest request, ServerCallContext context)
{
_requests.Enqueue(request);
Expand Down Expand Up @@ -761,7 +786,12 @@ private StatementResult PrepareExecuteSql(string method, ExecuteSqlRequest reque
{
returnTransaction = transaction;
}
if (_results.TryGetValue(request.Sql.Trim(), out StatementResult? result))
var resultKey = request.Sql.Trim();
if (!request.PartitionToken.IsEmpty)
{
resultKey = request.PartitionToken.ToStringUtf8().Trim();
}
if (_results.TryGetValue(resultKey, out StatementResult? result))
{
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,45 @@ public void TestRandomResults([Values] LibType libType, [Values(0, 1, 10)] int n
Assert.That(request.Transaction?.SingleUse?.ReadOnly?.HasStrong ?? false);
}

[Test]
public async Task TestPartitionedQuery([Values] LibType libType, [Values(0, 1, 10, 500)] int numRows, [Values(0, 1, 5, 9, 10, 11)] int prefetchRows)
{
const string query = "select * from random";
var numPartitions = Random.Shared.NextInt64(1, Math.Max(10, Environment.ProcessorCount));

var rowType = RandomResultSetGenerator.GenerateAllTypesRowType();
var remainingRows = numRows;
for (var n = 0; n < numPartitions; n++)
{
var key = $"{query}: {n}";
int numRowsInPartition;
if (remainingRows == 0 || n == numPartitions - 1)
{
numRowsInPartition = remainingRows;
}
else
{
numRowsInPartition = Random.Shared.Next(remainingRows);
}
remainingRows -= numRowsInPartition;
var results = RandomResultSetGenerator.Generate(rowType, numRowsInPartition);
Fixture.SpannerMock.AddOrUpdateStatementResult(key, StatementResult.CreateQuery(results));
}

await using var pool = Pool.Create(SpannerLibDictionary[libType], ConnectionString);
await using var connection = pool.CreateConnection();
await using var _ = await connection.ExecuteAsync(new ExecuteSqlRequest { Sql = $"set max_partitions={numPartitions}" });
await using var rows = await connection.ExecuteAsync(new ExecuteSqlRequest { Sql = $"run partitioned query {query}" }, prefetchRows);

var rowCount = 0;
while (await rows.NextAsync() is { } row)
{
rowCount++;
Assert.That(row.Values.Count, Is.EqualTo(rowType.Fields.Count));
}
Assert.That(rowCount, Is.EqualTo(numRows));
}

[Test]
public async Task TestCancellation([Values] LibType libType, [Values(0, 1, 10)] int numRows, [Values(0, 1, 5, 9, 10, 11)] int prefetchRows)
{
Expand Down
Loading