Skip to content

Commit e357421

Browse files
committed
Added device update event for hello
1 parent a2969ec commit e357421

File tree

6 files changed

+59
-19
lines changed

6 files changed

+59
-19
lines changed

TASKS.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,16 @@
2424
- [x] BUG why invalid wired format
2525
- [x] Add logger to mir sdk
2626
- [x] Deploy
27-
- [ ] Perf Improvements
27+
- [x] Perf Improvements
28+
- [x] protocfg/cmd schema protocache
29+
- [x] need an event on device hello for schema
30+
1. new schema update event maybe? no just use update
31+
2. on hello received or on the buffer flush? buffer flush
32+
- [x] change protocache to accept new schema even if not request before so its ready
2833
- [x] Test hearthbeat with hello
29-
- [ ] prototlm/protocfg, if device dont exist, dont ask schema
30-
- [ ] Bulk create devices
31-
- [ ] Use for swarm
34+
- [x] prototlm/protocfg, if device dont exist, dont ask schema
35+
- [x] Bulk create devices
36+
- [x] Use for swarm
3237
- [x] Stack Events and flush them every x seconds
3338
- [x] With many create at the same time
3439
- [x] Stack Hearbeaths and flush them every x seconds

internal/servers/core_srv/server.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,6 @@ func (s *CoreServer) hearthbeatOnlinePulsor() {
398398
s.hearthbeatsWriteBufferMu.Unlock()
399399

400400
degradedMode := false
401-
402401
devsResp, err := retry.RetryOnErrorContainsWithResult(func() ([]mir_v1.Device, error) {
403402
res, err := s.store.UpdateDeviceHello(tempBuffer)
404403
return res, err
@@ -418,7 +417,7 @@ func (s *CoreServer) hearthbeatOnlinePulsor() {
418417
}).WithStatus(mir_v1.DeviceStatus{
419418
Online: &degradedMode,
420419
LastHearthbeat: &surrealdbModels.CustomDateTime{Time: t.Hearthbeat},
421-
})
420+
}).WithSchema(t.Schema, t.Hearthbeat)
422421
i += 1
423422
}
424423
}
@@ -471,6 +470,12 @@ func (s *CoreServer) hearthbeatOnlinePulsor() {
471470
newOnline = append(newOnline, dev.Spec.DeviceId)
472471
}
473472
s.hearthbeats[mir_v1.DeviceId(dev.Spec.DeviceId)] = dev.Status.LastHearthbeat.Time
473+
474+
if hello, ok := tempBuffer[mir_v1.DeviceId(dev.Spec.DeviceId)]; ok && hello.Schema != nil {
475+
if err := publishDeviceUpdateEvent(s.m, nil, dev); err != nil {
476+
l.Warn().Err(err).Str("device_id", dev.Spec.DeviceId).Msg("error occure while publishing device update event")
477+
}
478+
}
474479
}
475480
s.hearthbeatsMutex.Unlock()
476481

internal/servers/core_srv/server_integration_test.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2513,6 +2513,17 @@ func TestDeviceAutoProvisionWithSchema(t *testing.T) {
25132513
msg.Ack()
25142514
})
25152515

2516+
// Subscribe to device update event
2517+
updEvent := make(chan bool, 1)
2518+
updateEventCount := 0
2519+
u, err := mSdk.Bus.Subscribe(
2520+
core_client.DeviceUpdatedEvent.WithId(deviceIds[0]),
2521+
func(msg *nats.Msg) {
2522+
updateEventCount += 1
2523+
updEvent <- true
2524+
msg.Ack()
2525+
})
2526+
25162527
// Act
25172528
fSet := &descriptorpb.FileDescriptorSet{
25182529
File: []*descriptorpb.FileDescriptorProto{
@@ -2526,11 +2537,19 @@ func TestDeviceAutoProvisionWithSchema(t *testing.T) {
25262537
t.Error(err)
25272538
}
25282539

2529-
select {
2530-
case <-msgReceived:
2531-
// Device created event received
2532-
case <-time.After(20 * time.Second):
2533-
t.Error("Timeout waiting for device auto-provision")
2540+
count := 0
2541+
EvtLoop:
2542+
for count < 2 {
2543+
select {
2544+
case <-msgReceived:
2545+
// Device created event received
2546+
count++
2547+
case <-updEvent:
2548+
count++
2549+
case <-time.After(20 * time.Second):
2550+
t.Error("Timeout waiting for device auto-provision")
2551+
break EvtLoop
2552+
}
25342553
}
25352554

25362555
respList, err := core_client.PublishDeviceListRequest(mSdk.Bus, reqList)
@@ -2551,7 +2570,9 @@ func TestDeviceAutoProvisionWithSchema(t *testing.T) {
25512570
}
25522571
assert.Equal(t, 1, onlineEventCount)
25532572
assert.Equal(t, 1, createEventCount)
2573+
assert.Equal(t, 1, updateEventCount)
25542574
s.Unsubscribe()
2575+
u.Unsubscribe()
25552576
c.Unsubscribe()
25562577
}
25572578

internal/services/schema_cache/proto_cache.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,13 @@ func (c *MirSchemaCache) deviceUpdateSub(msg *mir.Msg, deviceId string, device m
241241

242242
// We dont update the cache with new elements.
243243
// It has to be requested first
244-
c.cacheLock.RLock()
245-
if _, ok := c.cache[deviceId]; !ok {
246-
msg.Ack()
247-
c.cacheLock.RUnlock()
248-
return
249-
}
250-
c.cacheLock.RUnlock()
244+
// c.cacheLock.RLock()
245+
// if _, ok := c.cache[deviceId]; !ok {
246+
// msg.Ack()
247+
// c.cacheLock.RUnlock()
248+
// return
249+
// }
250+
// c.cacheLock.RUnlock()
251251

252252
if err != nil {
253253
l.Error().Str("device_id", deviceId).Err(err).Msg("error deserializing event")

internal/services/schema_cache/proto_cache_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func TestPublishDeviceUpdateCache(t *testing.T) {
143143
// Assert
144144
assert.Equal(t, true, mir_proto.AreSchemaEqual(ogSch, sch))
145145
assert.Equal(t, devPostUpd.Meta.Labels["test"], str)
146-
assert.Equal(t, 1, count)
146+
assert.Equal(t, 2, count)
147147
cancel()
148148
wg.Wait()
149149
}

pkgs/mir_v1/device.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,15 @@ func (d Device) WithStatus(s DeviceStatus) Device {
7474
return d
7575
}
7676

77+
func (d Device) WithSchema(s *mir_proto.MirProtoSchema, t time.Time) Device {
78+
if s == nil {
79+
return d
80+
}
81+
d.Status.Schema, _ = NewSchemaFromProtoSchema(s)
82+
d.Status.Schema.LastSchemaFetch = &surrealdbModels.CustomDateTime{Time: t}
83+
return d
84+
}
85+
7786
func (d Device) GetNameNamespace() string {
7887
return d.Meta.Name + "/" + d.Meta.Namespace
7988
}

0 commit comments

Comments
 (0)