Skip to content

Commit b3f21b8

Browse files
committed
feat: Implement telemetry processing with schema validation and dead-letter queue support
- Added TelemetryQueueOptions for queue configuration including dead-letter settings. - Integrated SchemaRegistry for validating telemetry messages against defined schemas. - Enhanced TelemetryProcessor to handle schema validation and requeue logic. - Introduced SchemaRegistryService to manage schema registration and validation. - Created SchemaRegistrySeeder to seed initial schema definitions on startup. - Added new models and migrations for database schema related to telemetry and schema registry. - Updated appsettings.json to include new configuration options for telemetry and schema registry.
1 parent 253fb4b commit b3f21b8

19 files changed

+1217
-55
lines changed

.config/dotnet-tools.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"version": 1,
3+
"isRoot": true,
4+
"tools": {
5+
"dotnet-ef": {
6+
"version": "8.0.23",
7+
"commands": [
8+
"dotnet-ef"
9+
],
10+
"rollForward": false
11+
}
12+
}
13+
}

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ CMakeCache.txt
2323
cmake_install.cmake
2424
Makefile
2525
build/
26+
build_nogrpc/
2627
src/services/agent-cpp/build*/
2728
src/services/agent-cpp/build_*/
2829
*.o
@@ -68,6 +69,11 @@ coverage/
6869

6970
# Local runtime artifacts
7071
Data/
72+
!src/services/core-dotnet/AetherGuard.Core/Data/
73+
!src/services/core-dotnet/AetherGuard.Core/Data/Migrations/
74+
!src/services/core-dotnet/AetherGuard.Core/Data/Migrations/**
75+
src/services/core-dotnet/AetherGuard.Core/Data/Snapshots/
76+
src/services/core-dotnet/AetherGuard.Core/Data/market_signal.json
7177
dummy_snapshot.tar.gz
7278
.compose.override.temp.yml
7379
.act-logs/

README.md

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ v2.2 reference architecture with a concrete implementation guide.
1818

1919
## Project Status
2020

21-
- Stage: v2.2 baseline delivered (Phase 0-4). Remaining productization gaps tracked below.
21+
- Stage: v2.2 baseline delivered (Phase 0-4). Remaining productization gaps tracked below (currently none).
2222
- License: MIT
2323
- Authors: Qi Junyi, Xiao Erdong (2026)
2424

@@ -56,15 +56,13 @@ This project targets a product-grade release, not a demo. The following standard
5656
- Dashboard (Next.js): telemetry and command visibility with NextAuth credentials.
5757
- Storage: snapshots stored on local filesystem by default; optional S3/MinIO backend with retention sweeper and S3 lifecycle support.
5858
- Security: API key for command endpoints; SPIRE mTLS for agent/core; OpenTelemetry baseline across core/AI/dashboard.
59-
- Messaging: telemetry queue payloads are wrapped in a schema-versioned envelope (v1).
59+
- Messaging: telemetry queue payloads are wrapped in a schema-versioned envelope (v1) with DLQ routing and prefetch backpressure.
60+
- Schema registry: telemetry envelope and payload schemas are registered in Postgres and validated at ingest.
6061
- Supply chain: SBOM generation, cosign signing, and SLSA provenance in CI.
6162

6263
### Productization Gaps (v1.x)
6364

64-
- No schema registry or formal compatibility policy for MQ events (only a lightweight schema-version envelope is in place).
65-
- No EF Core migrations or formal upgrade path (production requires schema versioning + migrations).
66-
67-
Code scan note: no TODO/FIXME markers found in the repo; the remaining gaps are architectural items listed above.
65+
- None for the v2.2 baseline in this repo. Remaining enhancements are tracked via issues/roadmap.
6866

6967
## v2.2 Reference Architecture
7068

@@ -297,6 +295,20 @@ Telemetry schema policy (core-service):
297295
- TelemetrySchema__MaxSupportedVersion=1
298296
- TelemetrySchema__OnUnsupported=drop
299297

298+
Telemetry queue options (core-service):
299+
300+
- TelemetryQueue__QueueName=telemetry_data
301+
- TelemetryQueue__EnableDeadLettering=true
302+
- TelemetryQueue__DeadLetterExchange=telemetry_dlx
303+
- TelemetryQueue__DeadLetterQueueName=telemetry_data.dlq
304+
- TelemetryQueue__DeadLetterRoutingKey=telemetry_data.dlq
305+
- TelemetryQueue__PrefetchCount=50
306+
307+
Schema registry (core-service):
308+
309+
- SchemaRegistry__Enabled=true
310+
- SchemaRegistry__Compatibility=BACKWARD
311+
300312
Agent OpenTelemetry (agent-service):
301313

302314
- AG_OTEL_ENABLED=true
@@ -412,10 +424,24 @@ TelemetryRecord persisted to PostgreSQL:
412424
- PredictedCpu
413425
- Timestamp (UTC)
414426

415-
For production, add EF Core migrations and a formal upgrade process.
427+
EF Core migrations are included; use them as the basis for upgrade and rollback workflows.
416428

417429
## Development
418430

431+
### Database Migrations
432+
433+
Generate or update migrations:
434+
435+
```bash
436+
dotnet tool run dotnet-ef migrations add InitialCreate \
437+
--project src/services/core-dotnet/AetherGuard.Core/AetherGuard.Core.csproj \
438+
--startup-project src/services/core-dotnet/AetherGuard.Core/AetherGuard.Core.csproj \
439+
--context ApplicationDbContext \
440+
--output-dir Data/Migrations
441+
```
442+
443+
Use `AG_DB_CONNECTION` to override the connection string during migration generation.
444+
419445
Dashboard:
420446

421447
```bash

src/services/core-dotnet/AetherGuard.Core/AetherGuard.Core.csproj

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
<PackageReference Include="Google.Api.CommonProtos" Version="2.17.0" />
1212
<PackageReference Include="Google.Protobuf" Version="3.31.1" />
1313
<PackageReference Include="Grpc.AspNetCore" Version="2.62.0" />
14+
<PackageReference Include="JsonSchema.Net" Version="8.0.5" />
1415
<PackageReference Include="Microsoft.AspNetCore.Grpc.JsonTranscoding" Version="8.0.23" />
1516
<PackageReference Include="Grpc.Tools" Version="2.62.0" PrivateAssets="All" />
1617
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.23" />
@@ -32,15 +33,9 @@
3233
</ItemGroup>
3334

3435
<ItemGroup>
35-
<Protobuf Include="..\..\..\shared\protos\common.proto"
36-
GrpcServices="None"
37-
AdditionalImportDirs="..\..\..\shared\protos" />
38-
<Protobuf Include="..\..\..\shared\protos\agent_service.proto"
39-
GrpcServices="Server"
40-
AdditionalImportDirs="..\..\..\shared\protos" />
41-
<Protobuf Include="..\..\..\shared\protos\control_plane.proto"
42-
GrpcServices="Server"
43-
AdditionalImportDirs="..\..\..\shared\protos" />
36+
<Protobuf Include="..\..\..\shared\protos\common.proto" GrpcServices="None" AdditionalImportDirs="..\..\..\shared\protos" />
37+
<Protobuf Include="..\..\..\shared\protos\agent_service.proto" GrpcServices="Server" AdditionalImportDirs="..\..\..\shared\protos" />
38+
<Protobuf Include="..\..\..\shared\protos\control_plane.proto" GrpcServices="Server" AdditionalImportDirs="..\..\..\shared\protos" />
4439
</ItemGroup>
4540

4641
</Project>

src/services/core-dotnet/AetherGuard.Core/Data/ApplicationDbContext.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
1414
public DbSet<AgentCommand> AgentCommands => Set<AgentCommand>();
1515
public DbSet<CommandAudit> CommandAudits => Set<CommandAudit>();
1616
public DbSet<TelemetryRecord> TelemetryRecords => Set<TelemetryRecord>();
17+
public DbSet<SchemaRegistryEntry> SchemaRegistryEntries => Set<SchemaRegistryEntry>();
1718

1819
protected override void OnModelCreating(ModelBuilder modelBuilder)
1920
{
@@ -74,5 +75,17 @@ protected override void OnModelCreating(ModelBuilder modelBuilder)
7475
entity.Property(e => e.Error).HasColumnName("error");
7576
entity.Property(e => e.CreatedAt).HasColumnName("created_at");
7677
});
78+
79+
modelBuilder.Entity<SchemaRegistryEntry>(entity =>
80+
{
81+
entity.ToTable("schema_registry");
82+
entity.HasKey(e => e.Id);
83+
entity.HasIndex(e => new { e.Subject, e.Version }).IsUnique();
84+
entity.Property(e => e.Id).HasColumnName("id").ValueGeneratedOnAdd();
85+
entity.Property(e => e.Subject).HasColumnName("subject");
86+
entity.Property(e => e.Version).HasColumnName("version");
87+
entity.Property(e => e.Schema).HasColumnName("schema");
88+
entity.Property(e => e.CreatedAt).HasColumnName("created_at");
89+
});
7790
}
7891
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using Microsoft.EntityFrameworkCore;
2+
using Microsoft.EntityFrameworkCore.Design;
3+
4+
namespace AetherGuard.Core.Data;
5+
6+
public sealed class DesignTimeDbContextFactory : IDesignTimeDbContextFactory<ApplicationDbContext>
7+
{
8+
public ApplicationDbContext CreateDbContext(string[] args)
9+
{
10+
var connectionString = Environment.GetEnvironmentVariable("AG_DB_CONNECTION")
11+
?? "Host=localhost;Database=AetherGuardDb;Username=postgres;Password=password";
12+
13+
var optionsBuilder = new DbContextOptionsBuilder<ApplicationDbContext>();
14+
optionsBuilder.UseNpgsql(connectionString);
15+
return new ApplicationDbContext(optionsBuilder.Options);
16+
}
17+
}

0 commit comments

Comments
 (0)