Skip to content

Commit da8432d

Browse files
committed
Update DelegateMQ library
1 parent dd46973 commit da8432d

File tree

8 files changed

+496
-9
lines changed

8 files changed

+496
-9
lines changed

DelegateMQ/DelegateMQ.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@
114114
#error "Transport implementation not found."
115115
#endif
116116

117+
#if defined(DMQ_TRANSPORT_ZEROMQ) || defined(DMQ_TRANSPORT_WIN32_UDP) || defined(DMQ_TRANSPORT_LINUX_UDP)
118+
#include "predef/util/NetworkEngine.h"
119+
#endif
120+
117121
#include "predef/util/Fault.h"
118122
#include "predef/util/Timer.h"
119123
#include "predef/util/TransportMonitor.h"

DelegateMQ/predef/os/stdlib/Thread.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,13 @@ bool Thread::CreateThread(std::optional<dmq::Duration> watchdogTimeout)
5757
// Timer to ensure the Thread instance runs periodically. ThreadCheck invoked
5858
// on this thread instance.
5959
m_threadTimer = std::make_unique<Timer>();
60-
m_threadTimerConn = m_threadTimer->Expired->Connect(MakeDelegate(this, &Thread::ThreadCheck, *this));
60+
m_threadTimerConn = m_threadTimer->OnExpired->Connect(MakeDelegate(this, &Thread::ThreadCheck, *this));
6161
m_threadTimer->Start(m_watchdogTimeout.load() / 4);
6262

6363
// Timer to check that this Thread instance runs. WatchdogCheck invoked
6464
// on Timer::ProcessTimers() thread.
6565
m_watchdogTimer = std::make_unique<Timer>();
66-
m_watchdogTimerConn = m_watchdogTimer->Expired->Connect(MakeDelegate(this, &Thread::WatchdogCheck));
66+
m_watchdogTimerConn = m_watchdogTimer->OnExpired->Connect(MakeDelegate(this, &Thread::WatchdogCheck));
6767
m_watchdogTimer->Start(m_watchdogTimeout.load() / 2);
6868
}
6969

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
#include "NetworkEngine.h"
2+
3+
// Only compile implementation if a compatible transport is selected
4+
#if defined(DMQ_TRANSPORT_ZEROMQ) || defined(DMQ_TRANSPORT_WIN32_UDP) || defined(DMQ_TRANSPORT_LINUX_UDP)
5+
6+
using namespace dmq;
7+
using namespace std;
8+
9+
const std::chrono::milliseconds NetworkEngine::SEND_TIMEOUT(100);
10+
const std::chrono::milliseconds NetworkEngine::RECV_TIMEOUT(2000);
11+
12+
NetworkEngine::NetworkEngine()
13+
: m_thread("NetworkEngine"),
14+
m_transportMonitor(RECV_TIMEOUT)
15+
{
16+
m_thread.CreateThread(std::chrono::milliseconds(5000));
17+
}
18+
19+
NetworkEngine::~NetworkEngine()
20+
{
21+
Stop();
22+
m_thread.ExitThread();
23+
delete m_recvThread;
24+
}
25+
26+
// SWITCH: Initialize Implementation
27+
28+
#if defined(DMQ_TRANSPORT_ZEROMQ)
29+
30+
// --------------------------------------------------------
31+
// ZeroMQ Implementation
32+
// --------------------------------------------------------
33+
int NetworkEngine::Initialize(const std::string& sendAddr, const std::string& recvAddr, bool isServer)
34+
{
35+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
36+
return MakeDelegate(this, &NetworkEngine::Initialize, m_thread, WAIT_INFINITE)(sendAddr, recvAddr, isServer);
37+
38+
int err = 0;
39+
auto type = isServer ? ZeroMqTransport::Type::PAIR_SERVER : ZeroMqTransport::Type::PAIR_CLIENT;
40+
41+
err += m_sendTransport.Create(type, sendAddr.c_str());
42+
err += m_recvTransport.Create(type, recvAddr.c_str());
43+
44+
m_transportMonitor.SendStatusCb += dmq::MakeDelegate(this, &NetworkEngine::InternalStatusHandler);
45+
46+
m_sendTransport.SetTransportMonitor(&m_transportMonitor);
47+
m_recvTransport.SetTransportMonitor(&m_transportMonitor);
48+
49+
m_sendTransport.SetRecvTransport(&m_recvTransport);
50+
m_recvTransport.SetSendTransport(&m_sendTransport);
51+
52+
m_dispatcher.SetTransport(&m_sendTransport);
53+
54+
return err;
55+
}
56+
57+
#elif defined(DMQ_TRANSPORT_WIN32_UDP) || defined(DMQ_TRANSPORT_LINUX_UDP)
58+
59+
// --------------------------------------------------------
60+
// UDP Implementation (Windows & Linux)
61+
// --------------------------------------------------------
62+
int NetworkEngine::Initialize(const std::string& sendIp, int sendPort, const std::string& recvIp, int recvPort)
63+
{
64+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
65+
return MakeDelegate(this, &NetworkEngine::Initialize, m_thread, WAIT_INFINITE)(sendIp, sendPort, recvIp, recvPort);
66+
67+
int err = 0;
68+
// UDP typically uses PUB/SUB or generic send/recv
69+
err += m_sendTransport.Create(UdpTransport::Type::PUB, sendIp.c_str(), sendPort);
70+
err += m_recvTransport.Create(UdpTransport::Type::SUB, recvIp.c_str(), recvPort);
71+
72+
m_transportMonitor.SendStatusCb += dmq::MakeDelegate(this, &NetworkEngine::InternalStatusHandler);
73+
74+
m_sendTransport.SetTransportMonitor(&m_transportMonitor);
75+
m_recvTransport.SetTransportMonitor(&m_transportMonitor);
76+
77+
m_sendTransport.SetRecvTransport(&m_recvTransport);
78+
m_recvTransport.SetSendTransport(&m_sendTransport);
79+
80+
m_dispatcher.SetTransport(&m_sendTransport);
81+
82+
return err;
83+
}
84+
85+
#endif
86+
87+
void NetworkEngine::Start()
88+
{
89+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
90+
return MakeDelegate(this, &NetworkEngine::Start, m_thread)();
91+
92+
if (!m_recvThread) {
93+
m_recvThread = new std::thread(&NetworkEngine::RecvThread, this);
94+
}
95+
96+
m_timeoutTimerConn = m_timeoutTimer.OnExpired->Connect(MakeDelegate(this, &NetworkEngine::Timeout, m_thread));
97+
m_timeoutTimer.Start(std::chrono::milliseconds(100));
98+
}
99+
100+
void NetworkEngine::Stop()
101+
{
102+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId()) {
103+
104+
// Close calls are safe for both transport types
105+
m_recvTransport.Close();
106+
m_sendTransport.Close();
107+
108+
m_recvThreadExit = true;
109+
if (m_recvThread && m_recvThread->joinable()) {
110+
m_recvThread->join();
111+
delete m_recvThread;
112+
m_recvThread = nullptr;
113+
}
114+
return MakeDelegate(this, &NetworkEngine::Stop, m_thread, WAIT_INFINITE)();
115+
}
116+
m_timeoutTimer.Stop();
117+
m_timeoutTimerConn.Disconnect();
118+
}
119+
120+
void NetworkEngine::RegisterEndpoint(dmq::DelegateRemoteId id, dmq::IRemoteInvoker* endpoint)
121+
{
122+
// Thread Safety: Ensure this runs on the Network Thread to avoid
123+
// racing with 'Incoming()' which reads this map.
124+
if (Thread::GetCurrentThreadId() != m_thread.GetThreadId())
125+
{
126+
// Marshal the call to the Network Thread
127+
MakeDelegate(this, &NetworkEngine::RegisterEndpoint, m_thread, dmq::WAIT_INFINITE)(id, endpoint);
128+
return;
129+
}
130+
131+
// Actual insertion (Safe because we are now on the correct thread)
132+
m_receiveIdMap[id] = endpoint;
133+
}
134+
135+
//------------------------------------------------------------------------------
136+
// RecvThread
137+
//------------------------------------------------------------------------------
138+
/// @brief The main loop for the background receive thread.
139+
///
140+
/// @details This function continuously polls the underlying transport layer for
141+
/// incoming data. When a packet is successfully received:
142+
/// 1. It extracts the header and payload into a stream.
143+
/// 2. It marshals the data to the internal `NetworkEngine` thread by asynchronously
144+
/// invoking `Incoming()`.
145+
///
146+
/// The `INVOKE_TIMEOUT` ensures that if the main Network Thread is deadlocked or
147+
/// its queue is full, this receive thread won't hang indefinitely trying to
148+
/// enqueue the message.
149+
void NetworkEngine::RecvThread()
150+
{
151+
// Timeout for enqueuing the message to the main thread.
152+
static const std::chrono::milliseconds INVOKE_TIMEOUT(1000);
153+
154+
while (!m_recvThreadExit)
155+
{
156+
DmqHeader header;
157+
// Use a shared_ptr for the stream to efficiently pass data between threads
158+
std::shared_ptr<xstringstream> arg_data(new xstringstream(std::ios::in | std::ios::out | std::ios::binary));
159+
160+
// Block reading from the physical transport
161+
int error = m_recvTransport.Receive(*arg_data, header);
162+
163+
if (!error && !arg_data->str().empty() && !m_recvThreadExit)
164+
{
165+
// Dispatch processing to the main NetworkEngine thread.
166+
// Passes ownership of the data stream via shared_ptr (no deep copy).
167+
MakeDelegate(this, &NetworkEngine::Incoming, m_thread, INVOKE_TIMEOUT).AsyncInvoke(header, arg_data);
168+
}
169+
}
170+
}
171+
172+
//------------------------------------------------------------------------------
173+
// Incoming
174+
//------------------------------------------------------------------------------
175+
/// @brief Handles incoming messages on the main Network Thread.
176+
///
177+
/// @details This function acts as the central dispatcher. It:
178+
/// 1. Ignores ACK messages (which are typically handled by the TransportMonitor
179+
/// or blocking wait logic).
180+
/// 2. Looks up the registered `IRemoteInvoker` endpoint associated with the message ID.
181+
/// 3. Invokes the endpoint to deserialize the arguments and trigger the
182+
/// corresponding application callback.
183+
///
184+
/// @param[in] header The message header containing the Remote ID.
185+
/// @param[in] arg_data The serialized payload stream.
186+
void NetworkEngine::Incoming(DmqHeader& header, std::shared_ptr<xstringstream> arg_data)
187+
{
188+
// Filter out ACKs; we only dispatch application data here.
189+
if (header.GetId() != ACK_REMOTE_ID) {
190+
// Find the registered endpoint for this Message ID
191+
auto it = m_receiveIdMap.find(header.GetId());
192+
193+
// If found and valid, let the endpoint handle deserialization and execution
194+
if (it != m_receiveIdMap.end() && it->second) {
195+
it->second->Invoke(*arg_data);
196+
}
197+
}
198+
}
199+
200+
void NetworkEngine::Timeout() { m_transportMonitor.Process(); }
201+
202+
void NetworkEngine::InternalErrorHandler(DelegateRemoteId id, DelegateError error, DelegateErrorAux aux) {
203+
OnError(id, error, aux);
204+
}
205+
206+
void NetworkEngine::InternalStatusHandler(DelegateRemoteId id, uint16_t seq, TransportMonitor::Status status) {
207+
OnStatus(id, seq, status);
208+
}
209+
210+
// Default virtual implementations
211+
void NetworkEngine::OnError(DelegateRemoteId, DelegateError, DelegateErrorAux) {}
212+
void NetworkEngine::OnStatus(DelegateRemoteId, uint16_t, TransportMonitor::Status) {}
213+
214+
#endif // Defined Transport Check

0 commit comments

Comments
 (0)