Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/enhanced_adaptive_controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ class EnhancedAdaptiveFecController {
struct Config {
// Block size adaptation
uint32_t min_k = 4; // Minimum block size
uint32_t max_k = 32; // Maximum block size
uint32_t max_k = 16; // Maximum block size (reduced from 32)
uint32_t initial_k = 8; // Initial block size

// FEC rate adaptation
Expand Down
174 changes: 73 additions & 101 deletions src/enhanced_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

#include <algorithm>
Expand Down Expand Up @@ -63,38 +62,56 @@ static int make_udp(const std::string &host, uint16_t port,
return fd;
}

static int send_batch_mmsg(
int sock, const sockaddr_in &dst,
const std::vector<enhanced_fec::EnhancedBlockFecSender::NetPacket> &pkts,
size_t off, size_t max_batch, uint64_t pacer_bps) {
const size_t N = std::min(max_batch, pkts.size() - off);
if (N == 0)
return 0;

static thread_local struct mmsghdr msgs[64];
static thread_local struct iovec iov[64];

for (size_t i = 0; i < N; ++i) {
iov[i].iov_base = (void *)pkts[off + i].bytes.data();
iov[i].iov_len = pkts[off + i].bytes.size();
std::memset(&msgs[i], 0, sizeof(msgs[i]));
msgs[i].msg_hdr.msg_iov = &iov[i];
msgs[i].msg_hdr.msg_iovlen = 1;
msgs[i].msg_hdr.msg_name = (void *)&dst;
msgs[i].msg_hdr.msg_namelen = sizeof(dst);
}
static void send_packets(int sock, const sockaddr_in &dst,
const std::vector<enhanced_fec::EnhancedBlockFecSender::NetPacket> &pkts,
enhanced_fec::EnhancedNetworkStats *stats,
uint64_t pacer_bps) {
for (const auto &pkt : pkts) {
if (pacer_bps > 0) {
const double us = (pkt.bytes.size() * 8.0) * 1e6 / (double)pacer_bps;
if (us > 1.0)
std::this_thread::sleep_for(
std::chrono::microseconds(static_cast<int>(us)));
}

if (pacer_bps > 0) {
size_t bytes_sum = 0;
for (size_t i = 0; i < N; ++i)
bytes_sum += pkts[off + i].bytes.size();
const double us = (bytes_sum * 8.0) * 1e6 / (double)pacer_bps;
if (us > 1.0)
std::this_thread::sleep_for(std::chrono::microseconds((int)us));
::sendto(sock, pkt.bytes.data(), pkt.bytes.size(), 0,
(const sockaddr *)&dst, sizeof(dst));

if (stats) {
bool is_fec = pkt.seq > 1000000; // heuristik
stats->onPacketSent(pkt.bytes.size(), is_fec);
}
}
}

int sent = ::sendmmsg(sock, msgs, (unsigned)N, 0);
return (sent < 0) ? -1 : sent;
struct AppData {
enhanced_fec::EnhancedBlockFecSender *fec_sender;
enhanced_fec::EnhancedNetworkStats *net_stats;
int sock;
sockaddr_in dst;
std::atomic<uint64_t> *seq_counter;
uint64_t pacer_bps;
};

static GstFlowReturn on_new_sample(GstAppSink *sink, gpointer user_data) {
GstSample *sample = gst_app_sink_pull_sample(sink);
if (sample) {
AppData *data = static_cast<AppData *>(user_data);
GstBuffer *buffer = gst_sample_get_buffer(sample);
GstMapInfo map;
if (gst_buffer_map(buffer, &map, GST_MAP_READ)) {
if (!GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)) {
data->fec_sender->forceStrongProtectionNextBlock();
}
std::vector<enhanced_fec::EnhancedBlockFecSender::NetPacket> pkts;
uint64_t current_seq = data->seq_counter->fetch_add(1);
data->fec_sender->onRtp(current_seq, map.data, map.size, pkts);
send_packets(data->sock, data->dst, pkts, data->net_stats, data->pacer_bps);
gst_buffer_unmap(buffer, &map);
}
gst_sample_unref(sample);
}
return GST_FLOW_OK;
}

int main(int argc, char **argv) {
Expand Down Expand Up @@ -280,7 +297,7 @@ int main(int argc, char **argv) {
"x264enc name=encoder "
"tune=zerolatency "
"speed-preset=veryfast "
"threads=0 "
"threads=2 "
"bitrate=" +
std::to_string(adapt_config.initial_bitrate_kbps * 1000) +
" "
Expand All @@ -294,7 +311,7 @@ int main(int argc, char **argv) {
"intra-refresh=false ! "
"h264parse ! "
"rtph264pay pt=96 config-interval=-1 mtu=1300 ! "
"appsink name=rtpout emit-signals=false sync=false max-buffers=1024 drop=false";
"appsink name=rtpout emit-signals=true sync=false max-buffers=1024 drop=false";
} else if (enable_deinterlace) {
// Deinterlacing without forcing progressive
pipe = "v4l2src device=" + device +
Expand All @@ -318,7 +335,7 @@ int main(int argc, char **argv) {
"x264enc name=encoder "
"tune=zerolatency "
"speed-preset=veryfast "
"threads=0 "
"threads=2 "
"bitrate=" +
std::to_string(adapt_config.initial_bitrate_kbps * 1000) +
" "
Expand All @@ -332,7 +349,7 @@ int main(int argc, char **argv) {
"intra-refresh=false ! "
"h264parse ! "
"rtph264pay pt=96 config-interval=-1 mtu=1300 ! "
"appsink name=rtpout emit-signals=false sync=false max-buffers=1024 drop=false";
"appsink name=rtpout emit-signals=true sync=false max-buffers=1024 drop=false";
} else {
// Basic pipeline without deinterlacing
pipe = "v4l2src device=" + device +
Expand All @@ -353,7 +370,7 @@ int main(int argc, char **argv) {
"x264enc name=encoder "
"tune=zerolatency "
"speed-preset=veryfast "
"threads=0 "
"threads=2 "
"bitrate=" +
std::to_string(adapt_config.initial_bitrate_kbps * 1000) +
" "
Expand All @@ -367,7 +384,7 @@ int main(int argc, char **argv) {
"intra-refresh=false ! "
"h264parse ! "
"rtph264pay pt=96 config-interval=-1 mtu=1300 ! "
"appsink name=rtpout emit-signals=false sync=false max-buffers=1024 drop=false";
"appsink name=rtpout emit-signals=true sync=false max-buffers=1024 drop=false";
}

GError *err = nullptr;
Expand All @@ -391,19 +408,9 @@ int main(int argc, char **argv) {
// ARTIK 'set_encoder_bitrate' fonksiyonu çalışır durumda.
g_encoder = encoder;

gst_app_sink_set_emit_signals(GST_APP_SINK(appsink), FALSE);
g_object_set(appsink, "emit-signals", TRUE, NULL);
gst_app_sink_set_drop(GST_APP_SINK(appsink), FALSE);

gst_element_set_state(pipeline, GST_STATE_PLAYING);

// Force early IDR frames so receiver doesn't start mid-GOP
for (int i = 0; i < 3; ++i) {
GstEvent *ev = gst_video_event_new_upstream_force_key_unit(
GST_CLOCK_TIME_NONE, TRUE, 0);
gst_element_send_event(encoder, ev);
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}

// Enhanced Block-based FEC system
enhanced_fec::EnhancedBlockFecSender fec_sender;
if (!fec_sender.init(fec_config)) {
Expand All @@ -426,6 +433,20 @@ int main(int argc, char **argv) {
std::atomic<bool> running{true};
std::atomic<uint64_t> rtp_seq_counter{0};

AppData app_data{&fec_sender, &network_stats, sock, dst, &rtp_seq_counter,
pacer_bps};
g_signal_connect(appsink, "new-sample", G_CALLBACK(on_new_sample), &app_data);

gst_element_set_state(pipeline, GST_STATE_PLAYING);

// Force early IDR frames so receiver doesn't start mid-GOP
for (int i = 0; i < 3; ++i) {
GstEvent *ev = gst_video_event_new_upstream_force_key_unit(
GST_CLOCK_TIME_NONE, TRUE, 0);
gst_element_send_event(encoder, ev);
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}

int feedback_listen_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
sockaddr_in fb_addr{};
std::memset(&fb_addr, 0, sizeof(fb_addr));
Expand Down Expand Up @@ -488,81 +509,34 @@ int main(int argc, char **argv) {
: "✓ Adaptation: ENABLED\n");
std::cout << "===============================================\n\n";

uint64_t seq = 0;
auto last_adaptation_time = std::chrono::steady_clock::now();
auto last_stats_time = std::chrono::steady_clock::now();
const auto adaptation_interval =
std::chrono::milliseconds(1000); // Every 1 second
const auto stats_interval = std::chrono::seconds(5); // Every 5 seconds

while (true) {
GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink));
if (!sample)
break;

GstBuffer *buffer = gst_sample_get_buffer(sample);
GstMapInfo map{};
if (gst_buffer_map(buffer, &map, GST_MAP_READ)) {
std::vector<enhanced_fec::EnhancedBlockFecSender::NetPacket> pkts;
fec_sender.onRtp(seq++, map.data, map.size, pkts);
rtp_seq_counter.store(seq);

// Send all generated packets using sendmmsg batching
size_t off = 0;
const size_t BATCH = 32;
while (off < pkts.size()) {
int n = send_batch_mmsg(sock, dst, pkts, off, BATCH, pacer_bps);
if (n < 0) {
perror("sendmmsg");
break;
}
for (int i = 0; i < n; ++i) {
const auto &pkt = pkts[off + i];
bool is_fec = pkt.seq > 1000000; // heuristik
network_stats.onPacketSent(pkt.bytes.size(), is_fec);
}
off += (size_t)n;
}

gst_buffer_unmap(buffer, &map);
}
gst_sample_unref(sample);

std::cout << "Sender is running... (Press Ctrl-C to exit)" << std::endl;
while (running) {
auto now = std::chrono::steady_clock::now();

// Adaptive control (disabled in fixed mode)
if (!no_adapt && now - last_adaptation_time >= adaptation_interval) {
// 1. GÜVENİLİR BİLGİYİ AL
auto stats = network_stats.getStats();

// 2. TEK KAPTANA (CONTROLLER'A) SORARAK KARAR AL
auto adaptation = adaptive_controller.adapt(stats);

// 3. KARARLARI UYGULA (FEC)
if (adaptation.k_changed || adaptation.fec_rate_changed) {
fec_sender.updateFecParameters(adaptation.fec_rate, adaptation.k);
}

// (Bitrate uygulama işlemi zaten Controller içindeki callback ile
// yapılıyor)

// 4. SONUCU RAPORLA
if (adaptation.k_changed || adaptation.fec_rate_changed ||
adaptation.bitrate_changed) {
std::cout << "[ADAPTATION] " << adaptation.reason << "\n";
std::cout << "[ADAPTATION] " << adaptation.reason << std::endl;
}

last_adaptation_time = now;
}

// Detailed statistics
if (now - last_stats_time >= stats_interval) {
auto net_stats = network_stats.getStats();
auto fec_stats = fec_sender.getStatistics();

std::cout << "\n--- STATISTICS ---\n";

// Network stats
if (net_stats.network_data_valid) {
std::cout << "Network: Loss=" << std::fixed << std::setprecision(2)
<< (net_stats.packet_loss_rate * 100) << "%, "
Expand All @@ -573,30 +547,28 @@ int main(int argc, char **argv) {
std::cout << "Network: Probing network conditions...\n";
}

// Throughput and efficiency
std::cout << "Throughput: " << std::setprecision(1)
<< net_stats.throughput_kbps << " kbps, "
<< "FEC overhead=" << net_stats.fec_overhead_percent << "%, "
<< "Efficiency=" << net_stats.effective_bandwidth_utilization
<< "%\n";

// Block-based FEC stats
std::cout << "Blocks: Sent=" << fec_stats.blocks_sent
<< ", Avg size=" << std::setprecision(1)
<< net_stats.avg_block_size
<< ", Current k=" << fec_stats.current_k
<< ", r=" << fec_stats.current_r
<< ", Pending=" << fec_stats.pending_packets << "\n";

// Packet counts
std::cout << "Packets: Total=" << net_stats.total_packets_sent
<< ", Original=" << fec_stats.originals_sent
<< ", Recovery=" << fec_stats.recovery_sent << "\n";

std::cout << "--- END STATS ---\n\n";

last_stats_time = now;
}

std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

// Cleanup
Expand Down