From 1e2b7f7c00fbded9e9ebb83b10ce302155ba444f Mon Sep 17 00:00:00 2001 Message-ID: <1e2b7f7c00fbded9e9ebb83b10ce302155ba444f.1737052666.git.sam@gentoo.org> In-Reply-To: <1993383ddf67e296334c7916d6afc699ee6300c7.1737052666.git.sam@gentoo.org> References: <1993383ddf67e296334c7916d6afc699ee6300c7.1737052666.git.sam@gentoo.org> From: Wim Taymans Date: Tue, 26 Nov 2024 17:45:41 +0100 Subject: [PATCH 4/8] gst: add rate control to the sink Track the elapsed time between buffers and try to keep the buffer fill level constant by changing the rate of the stream. See #4374 --- src/gst/gstpipewiresink.c | 76 ++++++++++++++++++++++++++++++++++--- src/gst/gstpipewiresink.h | 4 ++ src/gst/gstpipewirestream.c | 1 + src/gst/gstpipewirestream.h | 8 ++++ src/gst/meson.build | 2 +- 5 files changed, 84 insertions(+), 7 deletions(-) diff --git a/src/gst/gstpipewiresink.c b/src/gst/gstpipewiresink.c index d79ceaa66..33f2322e9 100644 --- a/src/gst/gstpipewiresink.c +++ b/src/gst/gstpipewiresink.c @@ -26,6 +26,7 @@ #include #include +#include #include @@ -481,14 +482,13 @@ static void do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) { GstPipeWirePoolData *data; + GstPipeWireStream *stream = pwsink->stream; gboolean res; guint i; struct spa_buffer *b; data = gst_pipewire_pool_get_data(buffer); - GST_LOG_OBJECT (pwsink, "queue buffer %p, pw_buffer %p", buffer, data->b); - b = data->b->buffer; if (data->header) { @@ -508,12 +508,15 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) data->crop->region.size.height = meta->width; } } + data->b->size = 0; for (i = 0; i < b->n_datas; i++) { struct spa_data *d = &b->datas[i]; GstMemory *mem = gst_buffer_peek_memory (buffer, i); d->chunk->offset = mem->offset; d->chunk->size = mem->size; - d->chunk->stride = pwsink->stream->pool->video_info.stride[i]; + d->chunk->stride = stream->pool->video_info.stride[i]; + + data->b->size += mem->size / 4; } GstVideoMeta *meta = gst_buffer_get_video_meta (buffer); @@ -532,9 +535,50 @@ do_send_buffer (GstPipeWireSink *pwsink, GstBuffer *buffer) } } - if ((res = pw_stream_queue_buffer (pwsink->stream->pwstream, data->b)) < 0) { + if ((res = pw_stream_queue_buffer (stream->pwstream, data->b)) < 0) { g_warning ("can't send buffer %s", spa_strerror(res)); } + + if (pwsink->rate_match) { + double err, corr; + struct pw_time ts; + guint64 queued, now, elapsed, target; + + pw_stream_get_time_n(stream->pwstream, &ts, sizeof(ts)); + now = pw_stream_get_nsec(stream->pwstream); + if (ts.now != 0) + elapsed = gst_util_uint64_scale_int (now - ts.now, ts.rate.denom, GST_SECOND * ts.rate.num); + else + elapsed = 0; + + queued = ts.queued - ts.size; + target = 2 * elapsed; + err = ((gint64)queued - ((gint64)target)); + + corr = spa_dll_update(&stream->dll, SPA_CLAMPD(err, -128.0, 128.0)); + + stream->err_wdw = (double)ts.rate.denom/ts.size; + + double avg = (stream->err_avg * stream->err_wdw + (err - stream->err_avg)) / (stream->err_wdw + 1.0); + stream->err_var = (stream->err_var * stream->err_wdw + + (err - stream->err_avg) * (err - avg)) / (stream->err_wdw + 1.0); + stream->err_avg = avg; + + if (stream->last_ts == 0 || stream->last_ts + SPA_NSEC_PER_SEC < now) { + stream->last_ts = now; + spa_dll_set_bw(&stream->dll, SPA_CLAMPD(fabs(stream->err_avg) / sqrt(fabs(stream->err_var)), 0.001, SPA_DLL_BW_MAX), + ts.size, ts.rate.denom); + GST_INFO_OBJECT (pwsink, "queue buffer %p, pw_buffer %p q:%"PRIi64"/%"PRIi64" e:%"PRIu64 + " err:%+03f corr:%f %f %f %f", + buffer, data->b, ts.queued, ts.size, elapsed, err, corr, + stream->err_avg, stream->err_var, stream->dll.bw); + } + + if (pwsink->match) { + pwsink->match->rate = corr; + SPA_FLAG_UPDATE(pwsink->match->flags, SPA_IO_RATE_MATCH_FLAG_ACTIVE, true); + } + } } @@ -576,6 +620,18 @@ on_state_changed (void *data, enum pw_stream_state old, enum pw_stream_state sta pw_thread_loop_signal (pwsink->stream->core->loop, FALSE); } +static void +on_io_changed (void *obj, uint32_t id, void *data, uint32_t size) +{ + GstPipeWireSink *pwsink = obj; + + switch (id) { + case SPA_IO_RateMatch: + pwsink->match = data; + break; + } +} + static void on_param_changed (void *data, uint32_t id, const struct spa_pod *param) { @@ -613,9 +669,16 @@ gst_pipewire_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) pwsink = GST_PIPEWIRE_SINK (bsink); s = gst_caps_get_structure (caps, 0); - rate = 0; - if (gst_structure_has_name (s, "audio/x-raw")) + if (gst_structure_has_name (s, "audio/x-raw")) { gst_structure_get_int (s, "rate", &rate); + pwsink->rate = rate; + pwsink->rate_match = true; + } else { + pwsink->rate = rate = 0; + pwsink->rate_match = false; + } + + spa_dll_set_bw(&pwsink->stream->dll, SPA_DLL_BW_MIN, 4096, rate); possible = gst_caps_to_format_all (caps); @@ -791,6 +854,7 @@ not_negotiated: static const struct pw_stream_events stream_events = { PW_VERSION_STREAM_EVENTS, .state_changed = on_state_changed, + .io_changed = on_io_changed, .param_changed = on_param_changed, .add_buffer = on_add_buffer, .remove_buffer = on_remove_buffer, diff --git a/src/gst/gstpipewiresink.h b/src/gst/gstpipewiresink.h index 74e6667e6..33d7b5b4f 100644 --- a/src/gst/gstpipewiresink.h +++ b/src/gst/gstpipewiresink.h @@ -50,8 +50,12 @@ struct _GstPipeWireSink { /* video state */ gboolean negotiated; + gboolean rate_match; + gint rate; GstPipeWireSinkMode mode; + + struct spa_io_rate_match *match; }; GType gst_pipewire_sink_mode_get_type (void); diff --git a/src/gst/gstpipewirestream.c b/src/gst/gstpipewirestream.c index bf7641548..68cb9be21 100644 --- a/src/gst/gstpipewirestream.c +++ b/src/gst/gstpipewirestream.c @@ -19,6 +19,7 @@ gst_pipewire_stream_init (GstPipeWireStream * self) self->fd = -1; self->client_name = g_strdup (pw_get_client_name()); self->pool = gst_pipewire_pool_new (self); + spa_dll_init(&self->dll); } static void diff --git a/src/gst/gstpipewirestream.h b/src/gst/gstpipewirestream.h index ff8c8e2e6..a301375c7 100644 --- a/src/gst/gstpipewirestream.h +++ b/src/gst/gstpipewirestream.h @@ -11,6 +11,7 @@ #include "gstpipewirecore.h" #include +#include #include G_BEGIN_DECLS @@ -29,6 +30,13 @@ struct _GstPipeWireStream { GstPipeWirePool *pool; GstClock *clock; + guint64 position; + struct spa_dll dll; + double err_avg, err_var, err_wdw; + guint64 last_ts; + guint64 base_buffer_ts; + guint64 base_ts; + /* the actual pw stream */ struct pw_stream *pwstream; struct spa_hook pwstream_listener; diff --git a/src/gst/meson.build b/src/gst/meson.build index ba1f6d558..1e39bcf89 100644 --- a/src/gst/meson.build +++ b/src/gst/meson.build @@ -27,7 +27,7 @@ pipewire_gst_headers = [ pipewire_gst = shared_library('gstpipewire', pipewire_gst_sources, include_directories : [ configinc ], - dependencies : [ spa_dep, gst_dep, pipewire_dep ], + dependencies : [ spa_dep, gst_dep, pipewire_dep, mathlib ], install : true, install_dir : '@0@/gstreamer-1.0'.format(get_option('libdir')), ) -- 2.48.0