Skip to content

Commit 4550653

Browse files
committed
Add SO_TIMESTAMP support for datagram channels
Motivation: NIO supports ECN, packet info, and segmentation metadata but not timestamps. Userspace timestamps are imprecise due to scheduler latency between packet arrival and recvmsg. SO_TIMESTAMP captures the exact kernel receive time with microsecond precision via control messages, which is critical for accurate RTT measurements and latency monitoring. Modifications: - Add timestamp field to AddressedEnvelope.Metadata - Add TimestampOption channel option - Parse SCM_TIMESTAMP control messages in ControlMessageParser - Add init(ecnState:packetInfo:segmentSize:timestamp:) to Metadata - Add test coverage Result: Datagram channels can enable .receiveTimestamp to receive kernel timestamps in AddressedEnvelope.Metadata.timestamp as a Double (seconds since epoch with microsecond precision).
1 parent c329d1e commit 4550653

File tree

5 files changed

+110
-10
lines changed

5 files changed

+110
-10
lines changed

Sources/NIOCore/AddressedEnvelope.swift

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ public struct AddressedEnvelope<DataType> {
5050
/// (if attached).
5151
public var segmentSize: Int?
5252

53+
/// The timestamp this datagram was received by the kernel.
54+
///
55+
/// When `SO_TIMESTAMP` is enabled on a `DatagramChannel`, the kernel records the time each packet arrives.
56+
/// This value is expressed as the number of seconds since the Unix epoch (January 1, 1970), with
57+
/// sub-second precision.
58+
public var timestamp: Double?
59+
5360
public init(ecnState: NIOExplicitCongestionNotificationState) {
5461
self.ecnState = ecnState
5562
self.packetInfo = nil
@@ -71,6 +78,18 @@ public struct AddressedEnvelope<DataType> {
7178
self.packetInfo = packetInfo
7279
self.segmentSize = segmentSize
7380
}
81+
82+
public init(
83+
ecnState: NIOExplicitCongestionNotificationState,
84+
packetInfo: NIOPacketInfo?,
85+
segmentSize: Int?,
86+
timestamp: Double?
87+
) {
88+
self.ecnState = ecnState
89+
self.packetInfo = packetInfo
90+
self.segmentSize = segmentSize
91+
self.timestamp = timestamp
92+
}
7493
}
7594
}
7695

Sources/NIOCore/ChannelOption.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,12 @@ extension ChannelOptions {
236236
public init() {}
237237
}
238238

239+
/// When set to true timestamp information will be reported through `AddressedEnvelope.Metadata`
240+
public struct TimestampOption: ChannelOption, Sendable {
241+
public typealias Value = Bool
242+
public init() {}
243+
}
244+
239245
/// The watermark used to detect when `Channel.isWritable` returns `true` or `false`.
240246
public struct WriteBufferWaterMark: Sendable {
241247
/// The low mark setting for a `Channel`.
@@ -379,6 +385,9 @@ public struct ChannelOptions: Sendable {
379385
/// - seealso: `ExplicitCongestionNotificationsOption`
380386
public static let explicitCongestionNotification = Types.ExplicitCongestionNotificationsOption()
381387

388+
/// - seealso: `TimestampOption`
389+
public static let timestamp = Types.TimestampOption()
390+
382391
/// - seealso: `ReceivePacketInfo`
383392
public static let receivePacketInfo = Types.ReceivePacketInfo()
384393

@@ -477,6 +486,11 @@ extension ChannelOption where Self == ChannelOptions.Types.ExplicitCongestionNot
477486
public static var explicitCongestionNotification: Self { .init() }
478487
}
479488

489+
/// - seealso: `TimestampOption`.
490+
extension ChannelOption where Self == ChannelOptions.Types.TimestampOption {
491+
public static var timestamp: Self { .init() }
492+
}
493+
480494
/// - seealso: `ReceivePacketInfo`.
481495
extension ChannelOption where Self == ChannelOptions.Types.ReceivePacketInfo {
482496
public static var receivePacketInfo: Self { .init() }

Sources/NIOPosix/ControlMessage.swift

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ struct ControlMessageParser {
201201
var ecnValue: NIOExplicitCongestionNotificationState = .transportNotCapable // Default
202202
var packetInfo: NIOPacketInfo? = nil
203203
var segmentSize: Int? = nil
204+
var timestamp: Double? = nil
204205

205206
init(parsing controlMessagesReceived: UnsafeControlMessageCollection) {
206207
for controlMessage in controlMessagesReceived {
@@ -231,6 +232,8 @@ struct ControlMessageParser {
231232
self.receiveIPv6Message(controlMessage)
232233
} else if controlMessage.level == Posix.SOL_UDP {
233234
self.receiveUDPMessage(controlMessage)
235+
} else if controlMessage.level == SOL_SOCKET {
236+
self.receiveTimestampMessage(controlMessage)
234237
}
235238
}
236239

@@ -292,6 +295,15 @@ struct ControlMessageParser {
292295
}
293296
#endif
294297
}
298+
299+
private mutating func receiveTimestampMessage(_ controlMessage: UnsafeControlMessage) {
300+
if controlMessage.type == SCM_TIMESTAMP {
301+
if let data = controlMessage.data {
302+
let timestamp = data.loadUnaligned(as: timeval.self)
303+
self.timestamp = Double(timestamp.tv_sec) + Double(timestamp.tv_usec) / 1_000_000.0
304+
}
305+
}
306+
}
295307
}
296308

297309
extension NIOExplicitCongestionNotificationState {
@@ -437,7 +449,8 @@ extension AddressedEnvelope.Metadata {
437449
self.init(
438450
ecnState: controlMessageReceiver.ecnValue,
439451
packetInfo: controlMessageReceiver.packetInfo,
440-
segmentSize: controlMessageReceiver.segmentSize
452+
segmentSize: controlMessageReceiver.segmentSize,
453+
timestamp: controlMessageReceiver.timestamp
441454
)
442455
}
443456
}

Sources/NIOPosix/SocketChannel.swift

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
475475
private var reportExplicitCongestionNotifications = false
476476
private var receivePacketInfo = false
477477
private var receiveSegmentSize = false
478+
private var receiveTimestamp = false
478479

479480
// Guard against re-entrance of flushNow() method.
480481
private let pendingWrites: PendingDatagramWritesManager
@@ -647,6 +648,14 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
647648
}
648649
let enable = value as! ChannelOptions.Types.DatagramReceiveSegmentSize.Value
649650
self.receiveSegmentSize = enable
651+
case _ as ChannelOptions.Types.TimestampOption:
652+
self.receiveTimestamp = value as! Bool
653+
let valueAsInt = self.receiveTimestamp ? 1 : 0
654+
try self.socket.setOption(
655+
level: .socket,
656+
name: .so_timestamp,
657+
value: valueAsInt
658+
)
650659
default:
651660
try super.setOption0(option, value: value)
652661
}
@@ -727,6 +736,8 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
727736
return self.receiveSegmentSize as! Option.Value
728737
case _ as ChannelOptions.Types.BufferedWritableBytesOption:
729738
return Int(self.pendingWrites.bufferedBytes) as! Option.Value
739+
case _ as ChannelOptions.Types.TimestampOption:
740+
return self.receiveTimestamp as! Option.Value
730741
default:
731742
return try super.getOption0(option)
732743
}
@@ -771,7 +782,9 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
771782
override func readFromSocket() throws -> ReadResult {
772783
if self.vectorReadManager != nil {
773784
return try self.vectorReadFromSocket()
774-
} else if self.reportExplicitCongestionNotifications || self.receivePacketInfo || self.receiveSegmentSize {
785+
} else if self.reportExplicitCongestionNotifications || self.receivePacketInfo || self.receiveSegmentSize
786+
|| self.receiveTimestamp
787+
{
775788
let pooledMsgBuffer = self.selectableEventLoop.msgBufferPool.get()
776789
defer { self.selectableEventLoop.msgBufferPool.put(pooledMsgBuffer) }
777790
return try pooledMsgBuffer.withUnsafePointers { _, _, controlMessageStorage in
@@ -816,7 +829,8 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
816829
readPending = false
817830

818831
let metadata: AddressedEnvelope<ByteBuffer>.Metadata?
819-
if self.reportExplicitCongestionNotifications || self.receivePacketInfo || self.receiveSegmentSize,
832+
if self.reportExplicitCongestionNotifications || self.receivePacketInfo || self.receiveSegmentSize
833+
|| self.receiveTimestamp,
820834
let controlMessagesReceived = controlBytes.receivedControlMessages
821835
{
822836
metadata = .init(from: controlMessagesReceived)
@@ -861,7 +875,7 @@ final class DatagramChannel: BaseSocketChannel<Socket>, @unchecked Sendable {
861875
socket: self.socket,
862876
buffer: &buffer,
863877
parseControlMessages: self.reportExplicitCongestionNotifications || self.receivePacketInfo
864-
|| self.receiveSegmentSize
878+
|| self.receiveSegmentSize || self.receiveTimestamp
865879
)
866880
}
867881

Tests/NIOPosixTests/DatagramChannelTests.swift

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,46 @@ class DatagramChannelTests: XCTestCase {
10711071
testEcnAndPacketInfoReceive(address: "::1", vectorRead: true, vectorSend: true, receivePacketInfo: true)
10721072
}
10731073

1074+
func testReceiveTimestamp() throws {
1075+
let address = "127.0.0.1"
1076+
let receiveChannel = try DatagramBootstrap(group: group)
1077+
.channelOption(.timestamp, value: true)
1078+
.channelInitializer { channel in
1079+
channel.eventLoop.makeCompletedFuture {
1080+
try channel.pipeline.syncOperations.addHandler(
1081+
DatagramReadRecorder<ByteBuffer>(),
1082+
name: "ByteReadRecorder"
1083+
)
1084+
}
1085+
}
1086+
.bind(host: address, port: 0)
1087+
.wait()
1088+
defer {
1089+
XCTAssertNoThrow(try receiveChannel.close().wait())
1090+
}
1091+
let sendChannel = try DatagramBootstrap(group: group)
1092+
.bind(host: address, port: 0)
1093+
.wait()
1094+
defer {
1095+
XCTAssertNoThrow(try sendChannel.close().wait())
1096+
}
1097+
1098+
var buffer = sendChannel.allocator.buffer(capacity: 1)
1099+
buffer.writeRepeatingByte(0, count: 1)
1100+
1101+
let writeData = AddressedEnvelope(
1102+
remoteAddress: receiveChannel.localAddress!,
1103+
data: buffer,
1104+
metadata: .init(ecnState: .transportNotCapable)
1105+
)
1106+
try sendChannel.writeAndFlush(writeData).wait()
1107+
1108+
let expectedReads = 1
1109+
let reads = try receiveChannel.waitForDatagrams(count: 1)
1110+
XCTAssertEqual(reads.count, expectedReads)
1111+
XCTAssertNotNil(reads[0].metadata?.timestamp)
1112+
}
1113+
10741114
func testDoingICMPWithoutRoot() throws {
10751115
// This test validates we can send ICMP messages on a datagram socket without having root privilege.
10761116
//
@@ -1886,7 +1926,7 @@ class DatagramChannelTests: XCTestCase {
18861926
let writeData = AddressedEnvelope(
18871927
remoteAddress: self.secondChannel.localAddress!,
18881928
data: buffer,
1889-
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize)
1929+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize, timestamp: nil)
18901930
)
18911931
XCTAssertNoThrow(try self.firstChannel.writeAndFlush(writeData).wait())
18921932

@@ -1936,12 +1976,12 @@ class DatagramChannelTests: XCTestCase {
19361976
let writeData1 = AddressedEnvelope(
19371977
remoteAddress: self.secondChannel.localAddress!,
19381978
data: buffer1,
1939-
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize1)
1979+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize1, timestamp: nil)
19401980
)
19411981
let writeData2 = AddressedEnvelope(
19421982
remoteAddress: self.secondChannel.localAddress!,
19431983
data: buffer2,
1944-
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize2)
1984+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize2, timestamp: nil)
19451985
)
19461986
let write1 = self.firstChannel.write(writeData1)
19471987
let write2 = self.firstChannel.write(writeData2)
@@ -1969,7 +2009,7 @@ class DatagramChannelTests: XCTestCase {
19692009
let gsoEnvelope = AddressedEnvelope(
19702010
remoteAddress: self.secondChannel.localAddress!,
19712011
data: gsoBuffer,
1972-
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize)
2012+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize, timestamp: nil)
19732013
)
19742014

19752015
// Non-GSO message
@@ -2010,7 +2050,7 @@ class DatagramChannelTests: XCTestCase {
20102050
let envelope = AddressedEnvelope(
20112051
remoteAddress: self.secondChannel.localAddress!,
20122052
data: buffer,
2013-
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize)
2053+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: segmentSize, timestamp: nil)
20142054
)
20152055

20162056
XCTAssertNoThrow(try self.firstChannel.writeAndFlush(envelope).wait())
@@ -2033,7 +2073,7 @@ class DatagramChannelTests: XCTestCase {
20332073
let envelope = AddressedEnvelope(
20342074
remoteAddress: self.secondChannel.localAddress!,
20352075
data: buffer,
2036-
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: 500)
2076+
metadata: .init(ecnState: .transportNotCapable, packetInfo: nil, segmentSize: 500, timestamp: nil)
20372077
)
20382078

20392079
XCTAssertThrowsError(try self.firstChannel.writeAndFlush(envelope).wait()) { error in

0 commit comments

Comments
 (0)