diff --git a/src/enhanced_adaptive_controller.hpp b/src/enhanced_adaptive_controller.hpp index 4cfe110..684aa65 100644 --- a/src/enhanced_adaptive_controller.hpp +++ b/src/enhanced_adaptive_controller.hpp @@ -238,7 +238,7 @@ class EnhancedAdaptiveFecController { struct Config { // Block size adaptation uint32_t min_k = 4; // Minimum block size - uint32_t max_k = 32; // Maximum block size + uint32_t max_k = 16; // Maximum block size (reduced from 32) uint32_t initial_k = 8; // Initial block size // FEC rate adaptation diff --git a/src/enhanced_sender.cpp b/src/enhanced_sender.cpp index 18ab0f8..37af666 100644 --- a/src/enhanced_sender.cpp +++ b/src/enhanced_sender.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -63,38 +62,56 @@ static int make_udp(const std::string &host, uint16_t port, return fd; } -static int send_batch_mmsg( - int sock, const sockaddr_in &dst, - const std::vector &pkts, - size_t off, size_t max_batch, uint64_t pacer_bps) { - const size_t N = std::min(max_batch, pkts.size() - off); - if (N == 0) - return 0; - - static thread_local struct mmsghdr msgs[64]; - static thread_local struct iovec iov[64]; - - for (size_t i = 0; i < N; ++i) { - iov[i].iov_base = (void *)pkts[off + i].bytes.data(); - iov[i].iov_len = pkts[off + i].bytes.size(); - std::memset(&msgs[i], 0, sizeof(msgs[i])); - msgs[i].msg_hdr.msg_iov = &iov[i]; - msgs[i].msg_hdr.msg_iovlen = 1; - msgs[i].msg_hdr.msg_name = (void *)&dst; - msgs[i].msg_hdr.msg_namelen = sizeof(dst); - } +static void send_packets(int sock, const sockaddr_in &dst, + const std::vector &pkts, + enhanced_fec::EnhancedNetworkStats *stats, + uint64_t pacer_bps) { + for (const auto &pkt : pkts) { + if (pacer_bps > 0) { + const double us = (pkt.bytes.size() * 8.0) * 1e6 / (double)pacer_bps; + if (us > 1.0) + std::this_thread::sleep_for( + std::chrono::microseconds(static_cast(us))); + } - if (pacer_bps > 0) { - size_t bytes_sum = 0; - for (size_t i = 0; i < N; ++i) - bytes_sum += pkts[off + i].bytes.size(); - const double us = (bytes_sum * 8.0) * 1e6 / (double)pacer_bps; - if (us > 1.0) - std::this_thread::sleep_for(std::chrono::microseconds((int)us)); + ::sendto(sock, pkt.bytes.data(), pkt.bytes.size(), 0, + (const sockaddr *)&dst, sizeof(dst)); + + if (stats) { + bool is_fec = pkt.seq > 1000000; // heuristik + stats->onPacketSent(pkt.bytes.size(), is_fec); + } } +} - int sent = ::sendmmsg(sock, msgs, (unsigned)N, 0); - return (sent < 0) ? -1 : sent; +struct AppData { + enhanced_fec::EnhancedBlockFecSender *fec_sender; + enhanced_fec::EnhancedNetworkStats *net_stats; + int sock; + sockaddr_in dst; + std::atomic *seq_counter; + uint64_t pacer_bps; +}; + +static GstFlowReturn on_new_sample(GstAppSink *sink, gpointer user_data) { + GstSample *sample = gst_app_sink_pull_sample(sink); + if (sample) { + AppData *data = static_cast(user_data); + GstBuffer *buffer = gst_sample_get_buffer(sample); + GstMapInfo map; + if (gst_buffer_map(buffer, &map, GST_MAP_READ)) { + if (!GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { + data->fec_sender->forceStrongProtectionNextBlock(); + } + std::vector pkts; + uint64_t current_seq = data->seq_counter->fetch_add(1); + data->fec_sender->onRtp(current_seq, map.data, map.size, pkts); + send_packets(data->sock, data->dst, pkts, data->net_stats, data->pacer_bps); + gst_buffer_unmap(buffer, &map); + } + gst_sample_unref(sample); + } + return GST_FLOW_OK; } int main(int argc, char **argv) { @@ -280,7 +297,7 @@ int main(int argc, char **argv) { "x264enc name=encoder " "tune=zerolatency " "speed-preset=veryfast " - "threads=0 " + "threads=2 " "bitrate=" + std::to_string(adapt_config.initial_bitrate_kbps * 1000) + " " @@ -294,7 +311,7 @@ int main(int argc, char **argv) { "intra-refresh=false ! " "h264parse ! " "rtph264pay pt=96 config-interval=-1 mtu=1300 ! " - "appsink name=rtpout emit-signals=false sync=false max-buffers=1024 drop=false"; + "appsink name=rtpout emit-signals=true sync=false max-buffers=1024 drop=false"; } else if (enable_deinterlace) { // Deinterlacing without forcing progressive pipe = "v4l2src device=" + device + @@ -318,7 +335,7 @@ int main(int argc, char **argv) { "x264enc name=encoder " "tune=zerolatency " "speed-preset=veryfast " - "threads=0 " + "threads=2 " "bitrate=" + std::to_string(adapt_config.initial_bitrate_kbps * 1000) + " " @@ -332,7 +349,7 @@ int main(int argc, char **argv) { "intra-refresh=false ! " "h264parse ! " "rtph264pay pt=96 config-interval=-1 mtu=1300 ! " - "appsink name=rtpout emit-signals=false sync=false max-buffers=1024 drop=false"; + "appsink name=rtpout emit-signals=true sync=false max-buffers=1024 drop=false"; } else { // Basic pipeline without deinterlacing pipe = "v4l2src device=" + device + @@ -353,7 +370,7 @@ int main(int argc, char **argv) { "x264enc name=encoder " "tune=zerolatency " "speed-preset=veryfast " - "threads=0 " + "threads=2 " "bitrate=" + std::to_string(adapt_config.initial_bitrate_kbps * 1000) + " " @@ -367,7 +384,7 @@ int main(int argc, char **argv) { "intra-refresh=false ! " "h264parse ! " "rtph264pay pt=96 config-interval=-1 mtu=1300 ! " - "appsink name=rtpout emit-signals=false sync=false max-buffers=1024 drop=false"; + "appsink name=rtpout emit-signals=true sync=false max-buffers=1024 drop=false"; } GError *err = nullptr; @@ -391,19 +408,9 @@ int main(int argc, char **argv) { // ARTIK 'set_encoder_bitrate' fonksiyonu çalışır durumda. g_encoder = encoder; - gst_app_sink_set_emit_signals(GST_APP_SINK(appsink), FALSE); + g_object_set(appsink, "emit-signals", TRUE, NULL); gst_app_sink_set_drop(GST_APP_SINK(appsink), FALSE); - gst_element_set_state(pipeline, GST_STATE_PLAYING); - - // Force early IDR frames so receiver doesn't start mid-GOP - for (int i = 0; i < 3; ++i) { - GstEvent *ev = gst_video_event_new_upstream_force_key_unit( - GST_CLOCK_TIME_NONE, TRUE, 0); - gst_element_send_event(encoder, ev); - std::this_thread::sleep_for(std::chrono::milliseconds(150)); - } - // Enhanced Block-based FEC system enhanced_fec::EnhancedBlockFecSender fec_sender; if (!fec_sender.init(fec_config)) { @@ -426,6 +433,20 @@ int main(int argc, char **argv) { std::atomic running{true}; std::atomic rtp_seq_counter{0}; + AppData app_data{&fec_sender, &network_stats, sock, dst, &rtp_seq_counter, + pacer_bps}; + g_signal_connect(appsink, "new-sample", G_CALLBACK(on_new_sample), &app_data); + + gst_element_set_state(pipeline, GST_STATE_PLAYING); + + // Force early IDR frames so receiver doesn't start mid-GOP + for (int i = 0; i < 3; ++i) { + GstEvent *ev = gst_video_event_new_upstream_force_key_unit( + GST_CLOCK_TIME_NONE, TRUE, 0); + gst_element_send_event(encoder, ev); + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + } + int feedback_listen_sock = ::socket(AF_INET, SOCK_DGRAM, 0); sockaddr_in fb_addr{}; std::memset(&fb_addr, 0, sizeof(fb_addr)); @@ -488,81 +509,34 @@ int main(int argc, char **argv) { : "✓ Adaptation: ENABLED\n"); std::cout << "===============================================\n\n"; - uint64_t seq = 0; auto last_adaptation_time = std::chrono::steady_clock::now(); auto last_stats_time = std::chrono::steady_clock::now(); const auto adaptation_interval = std::chrono::milliseconds(1000); // Every 1 second const auto stats_interval = std::chrono::seconds(5); // Every 5 seconds - while (true) { - GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink)); - if (!sample) - break; - - GstBuffer *buffer = gst_sample_get_buffer(sample); - GstMapInfo map{}; - if (gst_buffer_map(buffer, &map, GST_MAP_READ)) { - std::vector pkts; - fec_sender.onRtp(seq++, map.data, map.size, pkts); - rtp_seq_counter.store(seq); - - // Send all generated packets using sendmmsg batching - size_t off = 0; - const size_t BATCH = 32; - while (off < pkts.size()) { - int n = send_batch_mmsg(sock, dst, pkts, off, BATCH, pacer_bps); - if (n < 0) { - perror("sendmmsg"); - break; - } - for (int i = 0; i < n; ++i) { - const auto &pkt = pkts[off + i]; - bool is_fec = pkt.seq > 1000000; // heuristik - network_stats.onPacketSent(pkt.bytes.size(), is_fec); - } - off += (size_t)n; - } - - gst_buffer_unmap(buffer, &map); - } - gst_sample_unref(sample); - + std::cout << "Sender is running... (Press Ctrl-C to exit)" << std::endl; + while (running) { auto now = std::chrono::steady_clock::now(); - // Adaptive control (disabled in fixed mode) if (!no_adapt && now - last_adaptation_time >= adaptation_interval) { - // 1. GÜVENİLİR BİLGİYİ AL auto stats = network_stats.getStats(); - - // 2. TEK KAPTANA (CONTROLLER'A) SORARAK KARAR AL auto adaptation = adaptive_controller.adapt(stats); - - // 3. KARARLARI UYGULA (FEC) if (adaptation.k_changed || adaptation.fec_rate_changed) { fec_sender.updateFecParameters(adaptation.fec_rate, adaptation.k); } - - // (Bitrate uygulama işlemi zaten Controller içindeki callback ile - // yapılıyor) - - // 4. SONUCU RAPORLA if (adaptation.k_changed || adaptation.fec_rate_changed || adaptation.bitrate_changed) { - std::cout << "[ADAPTATION] " << adaptation.reason << "\n"; + std::cout << "[ADAPTATION] " << adaptation.reason << std::endl; } - last_adaptation_time = now; } - // Detailed statistics if (now - last_stats_time >= stats_interval) { auto net_stats = network_stats.getStats(); auto fec_stats = fec_sender.getStatistics(); std::cout << "\n--- STATISTICS ---\n"; - - // Network stats if (net_stats.network_data_valid) { std::cout << "Network: Loss=" << std::fixed << std::setprecision(2) << (net_stats.packet_loss_rate * 100) << "%, " @@ -573,14 +547,12 @@ int main(int argc, char **argv) { std::cout << "Network: Probing network conditions...\n"; } - // Throughput and efficiency std::cout << "Throughput: " << std::setprecision(1) << net_stats.throughput_kbps << " kbps, " << "FEC overhead=" << net_stats.fec_overhead_percent << "%, " << "Efficiency=" << net_stats.effective_bandwidth_utilization << "%\n"; - // Block-based FEC stats std::cout << "Blocks: Sent=" << fec_stats.blocks_sent << ", Avg size=" << std::setprecision(1) << net_stats.avg_block_size @@ -588,15 +560,15 @@ int main(int argc, char **argv) { << ", r=" << fec_stats.current_r << ", Pending=" << fec_stats.pending_packets << "\n"; - // Packet counts std::cout << "Packets: Total=" << net_stats.total_packets_sent << ", Original=" << fec_stats.originals_sent << ", Recovery=" << fec_stats.recovery_sent << "\n"; std::cout << "--- END STATS ---\n\n"; - last_stats_time = now; } + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } // Cleanup