This is only true if those notifications are different; if they are identical, such as when the notification merely alerts listeners that some table has new data (for cache invalidation), they are sent out as one notification only. See the source code comment in async.c:
* Duplicate notifications from the same transaction are sent out as one
* notification only. This is done to save work when for example a trigger
* on a 2 million row table fires a notification for each row that has been
* changed. If the application needs to receive every single notification
* that has been sent, it can easily add some unique string into the extra
* payload parameter.

It allows publishing all changes from the DB to Kafka.
Naturally, if the consumer is down, the WAL retained for that replication slot continues to grow until it comes back up again, hence monitoring is key (or the slot gets invalidated at a certain threshold, and replication has to restart with a new initial snapshot).
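For reference, a minimal sketch of the kind of slot monitoring I mean (the Go wrapper, names, and threshold are illustrative; the query itself just reads pg_replication_slots), using the jackc/pgx driver:

// Sketch: report replication slots holding back more WAL than a threshold.
package monitor

import (
    "context"
    "fmt"

    "github.com/jackc/pgx/v5"
)

// reportRetainedWAL prints every slot retaining more than maxBytes of WAL.
func reportRetainedWAL(ctx context.Context, conn *pgx.Conn, maxBytes int64) error {
    rows, err := conn.Query(ctx, `
        SELECT slot_name, active,
               pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)::bigint AS retained_bytes
          FROM pg_replication_slots`)
    if err != nil {
        return err
    }
    defer rows.Close()

    for rows.Next() {
        var name string
        var active bool
        var retained int64
        if err := rows.Scan(&name, &active, &retained); err != nil {
            return err
        }
        if retained > maxBytes {
            // Alert, page, or export a metric here; printing is just for the sketch.
            fmt.Printf("slot %q (active=%v) retains %d bytes of WAL\n", name, active, retained)
        }
    }
    return rows.Err()
}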
Disclaimer: I used to lead the Debezium project
[1] https://www.morling.dev/blog/mastering-postgres-replication-...
It sounds silly, but it caused enormous headaches and problems for the project I was working on (Materialize), one of whose main use cases is creating incrementally maintained live materialized views on top of replicated Postgres (or MySQL) data.
(Disclaimer: I used to lead the Debezium project)
In particular for Postgres, consumers can detect and reject duplicates really easily though, by tracking a watermark for the {Commit LSN / Event LSN} tuple, which is monotonically increasing. So a consumer just needs to compare the value of that tuple from the incoming event to the highest value it has seen before. If the incoming value is lower, the event must be a duplicate. We added support for exposing this via the `source.sequence` field a while back upon request by the Materialize team btw.
[1] https://debezium.io/documentation/reference/stable/configura....
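A minimal sketch of what that consumer-side check can look like (struct and field names here are illustrative, not Debezium's or Materialize's actual code):

// Sketch: reject duplicates by tracking a monotonically increasing
// (commit LSN, event LSN) watermark on the consumer side.
package dedup

// Position identifies a change event by its transaction's commit LSN and
// the LSN of the individual event.
type Position struct {
    CommitLSN uint64
    EventLSN  uint64
}

// Less reports whether p is ordered strictly before q.
func (p Position) Less(q Position) bool {
    if p.CommitLSN != q.CommitLSN {
        return p.CommitLSN < q.CommitLSN
    }
    return p.EventLSN < q.EventLSN
}

// Dedup remembers the highest position seen so far.
type Dedup struct {
    watermark Position
    seenAny   bool
}

// Accept returns true if the event at pos is new, and false if its position
// is not strictly greater than the watermark (i.e. it must be a duplicate).
func (d *Dedup) Accept(pos Position) bool {
    if d.seenAny && !d.watermark.Less(pos) {
        return false
    }
    d.watermark = pos
    d.seenAny = true
    return true
}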
For our use case, it didn't matter if it was rare or not: the fact that it could happen at all meant we needed to be robust to it, which basically meant storing the entire database in memory.
> We added support for exposing this via the `source.sequence` field a while back upon request by the Materialize team btw.
Yes, I helped work on this! I'm not sure whether Materialize is still using it (it's been years since I've thought about MZ/Debezium integration) but it was helpful, thanks.
https://debezium.io/documentation/reference/stable/configura...
You could probably achieve something similar with the NATS JetStream sink as well, which has similar capabilities - though I think it doesn't have quite the same guarantees.
I switched to using Debezium a few months ago, after a Golang alternative to Debezium + Kafka Connect - ConduitIO - went defunct. I should have been using Debezium all along, as it is clearly the most mature, most stable option in the space, with the best prospects for long-term survival. Highly recommended, even if it is JVM-based (though they're currently doing some interesting stuff with Quarkus and GraalVM that might lead to a JVM-free binary at some point).
> Even if you have 20 database connections making 20 transactions in parallel, all of them need to wait for their turn to lock the notification queue, add their notification, and unlock the queue again. This creates a bottleneck especially in high-throughput databases.
We're currently working hard on optimizing LISTEN/NOTIFY: https://www.postgresql.org/message-id/flat/6899c044-4a82-49b...
If you have an actual workload where you are currently experiencing performance/scalability problems, I would be interested in hearing from you, to better understand the workload.

In some workloads, you might only listen to a single channel. For such single-channel workloads, the current implementation seems hard to tweak further, given the semantics and in-commit-order guarantees. However, for multi-channel workloads, we could do a lot better, which is what the linked patch is about.

The main problem with the current implementation for multi-channel workloads is that we currently signal and wake all listening backends (a backend is the PostgreSQL process your client is connected to), even if they are not interested in the specific channels being notified in the current commit. This means that if you have 100 connections open, in which each connected client has made a LISTEN on a different channel, then when someone does a NOTIFY on one of those channels, instead of just signaling the backend that listens on that channel, all 100 backends will be signaled. For multi-channel workloads, this could mean an enormous extra cost coming from the context switching due to the signaling.
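To make the multi-channel scenario concrete, here is a toy reproducer (DSN and channel names are made up), using the jackc/pgx driver:

// Toy reproducer: 100 connections each LISTEN on their own channel, then a
// single NOTIFY on one of them. With the current implementation, all 100
// listening backends are signaled, not just the one subscribed to ch_0.
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/jackc/pgx/v5"
)

func main() {
    ctx := context.Background()
    const n = 100

    listeners := make([]*pgx.Conn, 0, n)
    for i := 0; i < n; i++ {
        conn, err := pgx.Connect(ctx, "postgres://localhost/db") // placeholder DSN
        if err != nil {
            log.Fatal(err)
        }
        listeners = append(listeners, conn)
        // Each connection listens on its own distinct channel.
        if _, err := conn.Exec(ctx, fmt.Sprintf("LISTEN ch_%d", i)); err != nil {
            log.Fatal(err)
        }
    }

    notifier, err := pgx.Connect(ctx, "postgres://localhost/db")
    if err != nil {
        log.Fatal(err)
    }
    // Only ch_0's listener cares about this notification, but every
    // listening backend is woken up to check the queue.
    if _, err := notifier.Exec(ctx, "NOTIFY ch_0, 'hello'"); err != nil {
        log.Fatal(err)
    }

    notifier.Close(ctx)
    for _, c := range listeners {
        c.Close(ctx)
    }
}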
I would greatly appreciate it if you could reply to this comment and share the workloads where you've had problems with LISTEN/NOTIFY, to better understand approximately how many listening backends you had, how many channels you had, and the mix of volume on those channels. Anything that could help us run more realistic simulations of such workloads would improve the benchmark tests we're working on. Thank you.
CREATE TRIGGER notify_events_trg
AFTER INSERT ON xxx.events
FOR EACH ROW EXECUTE PROCEDURE public.notify_events();

CREATE FUNCTION public.notify_events() RETURNS trigger
    LANGUAGE plpgsql
    AS $$
BEGIN
    PERFORM pg_notify('events', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$;
And then we have a bunch of triggers like this on many tables:

CREATE TRIGGER create_category_event_trg
AFTER INSERT OR DELETE OR UPDATE ON public.categories
FOR EACH ROW EXECUTE PROCEDURE public.create_category_event();

CREATE FUNCTION public.create_category_event() RETURNS trigger
    LANGUAGE plpgsql SECURITY DEFINER
    AS $$
DECLARE
    category RECORD;
    payload JSONB;
BEGIN
    category := COALESCE(NEW, OLD);
    payload := jsonb_build_object('id', category.id);

    IF NEW IS NULL OR NEW.deleted_at IS NOT NULL THEN
        payload := jsonb_set(payload, '{deleted}', 'true');
    END IF;

    INSERT INTO xxx.events (channel, inserted_at, payload)
    VALUES ('category', NOW() AT TIME ZONE 'utc', payload);

    RETURN NULL;
END;
$$;
We found no notable performance issues. We have a single LISTEN in another application. We did some stress testing and found that it performs way better than we would ever need.

The work-around solution we used at Trustly (a company I co-founded) is a component named `allas` that a colleague of mine at the time, Marko Tikkaja, created to solve our problems; it massively reduced the load on our servers. Marko has open-sourced and published this work here: https://github.com/johto/allas
Basically, `allas` opens up a single connection to PostgreSQL, on which it LISTENs on all the channels it needs to listen on. Then clients connect to `allas` over the PostgreSQL protocol, so it's basically faking a PostgreSQL server, and when clients do LISTEN on a channel with allas, allas will then LISTEN on that channel on the real PostgreSQL server over the single connection it maintains. Thanks to `allas` being implemented in Go, using Go's efficient goroutines for concurrency, it scales efficiently to lots and lots of connections. I'm not a Go expert myself, but I've understood Go is quite well suited for this type of application.
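Stripped of the PostgreSQL-protocol frontend, the core idea looks roughly like this (a sketch using the jackc/pgx driver, not allas's actual code):

// Sketch of the allas idea: one shared connection LISTENs on all channels
// of interest, and notifications are fanned out in-process to subscribers.
package fanout

import (
    "context"
    "sync"

    "github.com/jackc/pgx/v5"
)

type Hub struct {
    conn *pgx.Conn
    mu   sync.Mutex
    subs map[string][]chan string // notification channel -> subscriber queues
}

// NewHub issues LISTEN for every channel on the single shared connection.
func NewHub(ctx context.Context, conn *pgx.Conn, channels []string) (*Hub, error) {
    h := &Hub{conn: conn, subs: make(map[string][]chan string)}
    for _, c := range channels {
        // Quote the identifier so arbitrary channel names are safe.
        if _, err := conn.Exec(ctx, "LISTEN "+pgx.Identifier{c}.Sanitize()); err != nil {
            return nil, err
        }
    }
    return h, nil
}

// Subscribe registers an in-process queue for one notification channel.
func (h *Hub) Subscribe(channel string) <-chan string {
    h.mu.Lock()
    defer h.mu.Unlock()
    ch := make(chan string, 16)
    h.subs[channel] = append(h.subs[channel], ch)
    return ch
}

// Run blocks on the shared connection and fans notifications out.
func (h *Hub) Run(ctx context.Context) error {
    for {
        n, err := h.conn.WaitForNotification(ctx)
        if err != nil {
            return err
        }
        h.mu.Lock()
        for _, ch := range h.subs[n.Channel] {
            select {
            case ch <- n.Payload:
            default: // drop for slow subscribers; a real implementation handles backpressure
            }
        }
        h.mu.Unlock()
    }
}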
This component is still being used at Trustly, and is battle-tested and production grade.
That said, it would of course be much better to avoid the need for a separate component, and fix the scalability issues in core PostgreSQL, so that's what I'm currently working on.
https://danielmangum.com/posts/k8s-asa-watching-and-caching/
In my case I have an IoT setting, where my devices can change their "DesiredState", and I want to listen on this to push some message to MQTT... but then there might be also other cases where I want to listen to some messages elsewhere (eg do something when there is an alert on a device, or listen to some unrelated object, eg users, etc)
I'm not clear right now what would be the best setting to do this, the tradeoffs, etc
Imagine I have e.g. 100k to 10M devices, and sometimes these are updated in bulk, changing their DesiredState 10k at a time. Would NOTIFY work in that case? Should I use the WAL/Debezium/etc?
Can you try to "dumb down" in which cases we can use NOTIFY/LISTEN and in which cases it's best not to? You're saying something about single-channel/multi-channel/etc, but as a newcomer I'm not clear on what all these are.
I'm only sharing this should it be helpful:
def up do
  whitelist = Enum.join(@user_columns ++ ["tick"], "', '")

  execute """
  CREATE OR REPLACE FUNCTION notify_phrasing() RETURNS trigger AS $$
  DECLARE
    notif jsonb;
    col_name text;
    col_value text;
    uuids jsonb := '{}'::jsonb;
    user_columns text[] := ARRAY['#{whitelist}'];
  BEGIN
    -- First, add all UUID columns
    FOR col_name IN
      SELECT column_name
      FROM information_schema.columns
      WHERE table_name = TG_TABLE_NAME AND data_type = 'uuid'
    LOOP
      EXECUTE format('SELECT ($1).%I::text', col_name)
        INTO col_value
        USING CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
      IF col_value IS NOT NULL THEN
        uuids := uuids || jsonb_build_object(col_name, col_value);
      END IF;
    END LOOP;

    -- Then, add user columns if they exist in the table
    FOREACH col_name IN ARRAY user_columns
    LOOP
      IF EXISTS (
        SELECT 1
        FROM information_schema.columns
        WHERE table_name = TG_TABLE_NAME AND column_name = col_name
      ) THEN
        EXECUTE format('SELECT ($1).%I::text', col_name)
          INTO col_value
          USING CASE WHEN TG_OP = 'DELETE' THEN OLD ELSE NEW END;
        IF col_value IS NOT NULL THEN
          uuids := uuids || jsonb_build_object(col_name, col_value);
        END IF;
      END IF;
    END LOOP;

    notif := jsonb_build_object(
      'table', TG_TABLE_NAME,
      'event', TG_OP,
      'uuids', uuids
    );
    PERFORM pg_notify('phrasing', notif::text);
    RETURN NULL;
  END;
  $$ LANGUAGE plpgsql;
  """

  # Create trigger for each table
  Enum.each(@tables, fn table ->
    execute """
    CREATE TRIGGER notify_phrasing__#{table}
    AFTER INSERT OR UPDATE OR DELETE ON #{table}
    FOR EACH ROW EXECUTE FUNCTION notify_phrasing();
    """
  end)
end
I do react to most (70%) of my database changes in some way, shape, or form, and post them to a PubSub topic with the uuids. All of my dispatching can be done off of uuids.

The listeners were scattered between replicas, so we took advantage of Advisory Locks (https://www.postgresql.org/docs/current/explicit-locking.htm...) by choosing a unique key (unique to the data being sent); before performing any task on the JSON payload that the notification would send, we would first check the lock and only then continue the routine.
The NOTIFY routine was triggered after an insert in an Outbox table, so we could replay it if it failed for some reason.
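A rough sketch of that kind of advisory-lock gate (names and key derivation here are illustrative, using the jackc/pgx driver):

// Sketch: only the replica that wins the advisory lock for this payload's
// unique key handles the notification; the others skip it.
package outbox

import (
    "context"
    "hash/fnv"

    "github.com/jackc/pgx/v5"
)

// lockKey derives a 64-bit advisory-lock key from a string that is unique
// to the data being sent.
func lockKey(unique string) int64 {
    h := fnv.New64a()
    h.Write([]byte(unique))
    return int64(h.Sum64())
}

// handleIfOwner takes a session-level advisory lock for the payload's key;
// if another replica already holds it, the work is skipped.
func handleIfOwner(ctx context.Context, conn *pgx.Conn, unique string, work func() error) error {
    key := lockKey(unique)

    var got bool
    if err := conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", key).Scan(&got); err != nil {
        return err
    }
    if !got {
        return nil // another replica is handling this notification
    }
    defer func() {
        _, _ = conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", key)
    }()

    return work()
}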
Unfortunately I don't remember the exact reason we didn't use it, but I was a bit sceptical for 2 reasons:
- I had a feeling that I was using this feature in the wrong way; and
- I read both the article and comments from this HN entry: https://news.ycombinator.com/item?id=44490510, and my first point felt validated;
but to this day I'm still unsure. Anyway, since it was a complementary system, it didn't hurt to leave it out; we had another background job that would process the outbox table regardless, but I felt it could/would give something closer to "real time" in our system.
In general, a single-queue design doesn’t make throughput collapse when you add more parallelism; it just gives you a fixed ceiling. With a well-designed queue, throughput goes up with concurrency, then flattens when the serialized section (the queue) saturates, maybe sagging a bit from context switching.
If instead you see performance severely degrade as you add workers, that typically means there’s an additional problem beyond “we have one queue” — things like broadcast wakeups (“every event wakes every listener”), global scans on each event, or other O(N) work per operation. That’s a very different, and more serious, scalability bug than simply relying on a single queue.
Ever since I saw Martin Kleppmann’s “Turning the database inside out” talk I’ve wanted an easy way to hook into a transaction log. Apache Samza/Kafka is very cool but I’m not going to set it up for personal projects. It’d be VERY cool to make materialized views straight from the log!!
This is what Materialize does. You point it at some PG (or MySQL, or... probably lots more by now) sources, and then you can define arbitrary views on top of those sources using SQL (with the full power of relational logic: fully precise joins, etc.) The views are maintained incrementally, so they update almost instantly when the underlying data updates; you don't have to manually refresh them.
Disclaimer: I worked on Materialize for five years, and it is a commercial, proprietary project. But it is source-available (https://github.com/materializeinc/materialize) under a BSL-style license, and it has fairly generous terms for free usage.
I recently had the opportunity to play with the PostgreSQL WAL in the scope of implementing an OpenSearch CDC pipeline, and it was really exciting to see what is possible to achieve with it.
Be cautious with idle replication slots though; I got bitten by inactive slots filling up the production database storage.
PostgreSQL 18 introduces idle_replication_slot_timeout to mitigate this.
Some examples of difficulties we've run into:
1. LSNs for transactions (commits) are strictly increasing, but not for individual operations across transactions. You may not pick this up during basic testing, but it starts showing up when you have concurrent transactions.
2. You cannot resume logical replication in the middle of a transaction (you have to restart the transaction), which becomes relevant when you have large transactions.
3. In most cases, replication slots cannot be preserved when upgrading Postgres major versions.
4. When you have multiple Postgres clusters in a HA setup, you _can_ use logical replication, but it becomes more tricky (better in recent Postgres versions, but you're still responsible for making sure the slots are synced).
5. Replication slots can break in many different ways, and there's no good way to know all the possible failure modes until you've run into them. Especially fun when your server ran out of disk space at some point. It's a little better with Postgres 17+ exposing wal_status and invalidation_reason on pg_replication_slots.
6. You need to make sure to acknowledge keepalive messages and not only data messages, otherwise the WAL can keep growing indefinitely when you don't have incoming changes (depending on the hosting provider).
7. Common drivers often either don't implement the replication protocol at all, or attempt to abstract away low-level details that you actually need. Here it's great that the article actually explains the low-level protocol details.
At the end of the day the simplicity of L/N made it well worth the performance degradation. Still making thousands-to-millions of writes per second, so when the original article said they were 'exaggerating' I think they may have gone too far.
I've been hoping WAL gets some more documentation love in the years/decades L/N will serve me should I ever need to upgrade, so please share more! :D
I never worked with WalEx but have experience with Supabase Realtime and it is a joy to work with and fits great into the Supa ecosystem. So many layers are disappearing when you rely more on Postgres!
[0] https://github.com/supabase/realtime [1] https://github.com/cpursley/walex
I'm building Kafka Connect (i.e. Debezium) compatible replication:
https://github.com/turbolytics/librarian/pull/41
-----
I really like the Mongo change stream API. I think it elegantly hides a lot of the complexity of replication. As a side effect of building Postgres -> Kafka replication, I'm also trying to build a nicer replication primitive for Postgres, one that hides some of the underlying replication protocol complexity!
Please stop doing this via the database. Scaling issues can develop, and we in Ops catch flak for your bad design choices. Use Kafka/RabbitMQ/Redis, PLEASE.
Sure, if you are tiny, will forever remain tiny, then whatever, ignore this Ops person.
It is probably best to use that unless there is a strong reason against.
Even with Debezium I have run into myriad issues. It blows my mind that someone would want to roll their own logic to set up replicas. IMHO this post should be advertised more as a fun intellectual exercise.
Specifically: If the server requests a reply on a heartbeat, the status update should include the heartbeat's LSN on the next loop. But a standby status update includes the LSN values + 1.
I was able to get it working and properly disconnecting to a fast shutdown, but when you get into the internals of the logical WAL receiver loop, it can get nuanced.
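For anyone running into the same thing, the keepalive-handling piece looks roughly like this with the jackc/pglogrepl library (a sketch; the LSN bookkeeping is simplified):

// Sketch: handle a primary keepalive inside the logical-replication receive
// loop and, if the server requested a reply, send a standby status update.
package walreceiver

import (
    "context"

    "github.com/jackc/pglogrepl"
    "github.com/jackc/pgx/v5/pgconn"
)

// onKeepalive parses a primary keepalive message (the body of a CopyData
// message whose first byte is pglogrepl.PrimaryKeepaliveMessageByteID).
func onKeepalive(ctx context.Context, conn *pgconn.PgConn, body []byte, flushedLSN pglogrepl.LSN) error {
    ka, err := pglogrepl.ParsePrimaryKeepaliveMessage(body)
    if err != nil {
        return err
    }
    if !ka.ReplyRequested {
        return nil
    }
    // Report how far we have written/flushed/applied. Getting this value
    // right (relative to the keepalive's ServerWALEnd) is exactly the
    // nuance described above.
    return pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{
        WALWritePosition: flushedLSN,
        WALFlushPosition: flushedLSN,
        WALApplyPosition: flushedLSN,
    })
}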
And my largest compliment is that the Postgres discord is filled with some extremely knowledgeable and helpful people. I was able to figure out some really specific and nuanced behavior around the different status messages being sent to the primary server, thanks to the in-depth responses there.
- PG did logical logging natively instead of via WAL decoding
- PG logged commit records in the WAL so that any code replaying transactions by following the logical replication stream can isolate each transaction's changes and serialize everything correctly