
# Copyright (c) 2021-2025, PostgreSQL Global Development Group

# Binary mode logical replication test

use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Set up the publisher node.  It is initialized with
# allows_streaming => 'logical' so it can act as a logical replication source.
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start();

# Set up the subscriber node with the default configuration.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init();
$node_subscriber->start();

# Table definitions shared by both sides of the replication: one table with
# scalar numeric columns and one with array columns.
my $ddl = qq(
	CREATE TABLE public.test_numerical (
		a INTEGER PRIMARY KEY,
		b NUMERIC,
		c FLOAT,
		d BIGINT
		);
	CREATE TABLE public.test_arrays (
		a INTEGER[] PRIMARY KEY,
		b NUMERIC[],
		c TEXT[]
		););

# Apply the DDL to the publisher first, then to the subscriber.
foreach my $node ($node_publisher, $node_subscriber)
{
	$node->safe_psql('postgres', $ddl);
}

# Configure logical replication by publishing every table.
$node_publisher->safe_psql('postgres',
	'CREATE PUBLICATION tpub FOR ALL TABLES');

# ------------------------------------------------------
# Ensure binary mode also executes COPY in binary format
# ------------------------------------------------------

# Insert some content before creating a subscription, so that the initial
# table synchronization has rows to copy.
$node_publisher->safe_psql(
	'postgres', qq(
    INSERT INTO public.test_numerical (a, b, c, d) VALUES
		(1, 1.2, 1.3, 10),
        (2, 2.2, 2.3, 20);
	INSERT INTO public.test_arrays (a, b, c) VALUES
		('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
        ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
	));

# Check the publisher log from now on, so that only statements logged after
# the subscription is created can satisfy the COPY check below.
my $copy_log_offset = -s $node_publisher->logfile;

# Create the subscription with binary = true; this starts the initial sync.
my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
$node_subscriber->safe_psql('postgres',
		"CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
	  . "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");

# Ensure the COPY command is executed in binary format on the publisher
$node_publisher->wait_for_log(
	qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT WITH \(FORMAT binary\)/,
	$copy_log_offset);

# Ensure nodes are in sync with each other
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');

# Both queries are sent in one call, so the expected output below is the
# rows of test_numerical followed by the rows of test_arrays.
my $sync_check = qq(
	SELECT a, b, c, d FROM test_numerical ORDER BY a;
	SELECT a, b, c FROM test_arrays ORDER BY a;
);

# Check the synced data on the subscriber
my $result = $node_subscriber->safe_psql('postgres', $sync_check);

is( $result, '1|1.2|1.3|10
2|2.2|2.3|20
{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check synced data on subscriber');

# ----------------------------------
# Ensure apply works in binary mode
# ----------------------------------

# Insert some content and make sure it's replicated across
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO public.test_arrays (a, b, c) VALUES
		('{2,1,3}', '{1.2, 1.1, 1.3}', '{"two", "one", "three"}'),
		('{1,3,2}', '{1.1, 1.3, 1.2}', '{"one", "three", "two"}');

	INSERT INTO public.test_numerical (a, b, c, d) VALUES
		(3, 3.2, 3.3, 30),
		(4, 4.2, 4.3, 40);
	));

$node_publisher->wait_for_catchup('tsub');

# Only test_numerical is checked here; the new test_arrays rows are checked
# after the UPDATE below.
$result = $node_subscriber->safe_psql('postgres',
	"SELECT a, b, c, d FROM test_numerical ORDER BY a");

is( $result, '1|1.2|1.3|10
2|2.2|2.3|20
3|3.2|3.3|30
4|4.2|4.3|40', 'check replicated test_numerical data on subscriber');

# Test updates as well.  Every row of both tables is touched, and column c
# is set to NULL to exercise NULL values too.
$node_publisher->safe_psql(
	'postgres', qq(
	UPDATE public.test_arrays SET b[1] = 42, c = NULL;
	UPDATE public.test_numerical SET b = 42, c = NULL;
	));

$node_publisher->wait_for_catchup('tsub');

# Each check below has its own description so that a failure identifies the
# table whose contents diverged.
$result = $node_subscriber->safe_psql('postgres',
	"SELECT a, b, c FROM test_arrays ORDER BY a");

is( $result, '{1,2,3}|{42,1.2,1.3}|
{1,3,2}|{42,1.3,1.2}|
{2,1,3}|{42,1.1,1.3}|
{3,1,2}|{42,1.1,1.2}|',
	'check updated replicated test_arrays data on subscriber');

$result = $node_subscriber->safe_psql('postgres',
	"SELECT a, b, c, d FROM test_numerical ORDER BY a");

is( $result, '1|42||10
2|42||20
3|42||30
4|42||40', 'check updated replicated test_numerical data on subscriber');

# ------------------------------------------------------------------------------
# Use ALTER SUBSCRIPTION to change to text format and then back to binary format
# ------------------------------------------------------------------------------

# Test to reset back to text formatting, and then to binary again
$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION tsub SET (binary = false);");

# Insert a new row while the subscription is set to binary = false.
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO public.test_numerical (a, b, c, d) VALUES
		(5, 5.2, 5.3, 50);
	));

$node_publisher->wait_for_catchup('tsub');

$result = $node_subscriber->safe_psql('postgres',
	"SELECT a, b, c, d FROM test_numerical ORDER BY a");

# The descriptions of the two checks in this section name the binary setting
# in effect, so that a failure identifies which mode was being exercised.
is( $result, '1|42||10
2|42||20
3|42||30
4|42||40
5|5.2|5.3|50', 'check replicated data on subscriber with binary = false');

# Switch the subscription back to binary = true.
$node_subscriber->safe_psql('postgres',
	"ALTER SUBSCRIPTION tsub SET (binary = true);");

# Insert a new row while the subscription is set to binary = true again.
$node_publisher->safe_psql(
	'postgres', qq(
	INSERT INTO public.test_arrays (a, b, c) VALUES
		('{2,3,1}', '{1.2, 1.3, 1.1}', '{"two", "three", "one"}');
	));

$node_publisher->wait_for_catchup('tsub');

$result = $node_subscriber->safe_psql('postgres',
	"SELECT a, b, c FROM test_arrays ORDER BY a");

is( $result, '{1,2,3}|{42,1.2,1.3}|
{1,3,2}|{42,1.3,1.2}|
{2,1,3}|{42,1.1,1.3}|
{2,3,1}|{1.2,1.3,1.1}|{two,three,one}
{3,1,2}|{42,1.1,1.2}|',
	'check replicated data on subscriber with binary = true');

# ---------------------------------------------------------------
# Test binary replication without and with send/receive functions
# ---------------------------------------------------------------

# Define a custom type that has only input/output functions (no send/receive
# functions), together with a table that uses it.
$ddl = qq(
    CREATE TYPE myvarchar;
    CREATE FUNCTION myvarcharin(cstring, oid, integer) RETURNS myvarchar
        LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharin';
    CREATE FUNCTION myvarcharout(myvarchar) RETURNS cstring
        LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharout';
    CREATE TYPE myvarchar (
        input = myvarcharin,
        output = myvarcharout);
    CREATE TABLE public.test_myvarchar (
        a myvarchar
    ););

# The type and table are needed on both nodes: publisher first, then
# subscriber.
foreach my $node ($node_publisher, $node_subscriber)
{
	$node->safe_psql('postgres', $ddl);
}

# Give the new table a row on the publisher.
$node_publisher->safe_psql(
	'postgres', qq(
    INSERT INTO public.test_myvarchar (a) VALUES
		('a');
    ));

# Only subscriber log output written from this point on is examined.
my $offset = -s $node_subscriber->logfile;

# Refreshing the publication triggers the tablesync of the new table.
$node_subscriber->safe_psql('postgres',
	'ALTER SUBSCRIPTION tsub REFRESH PUBLICATION');

# The sync is expected to fail: wait for the subscriber to report that no
# binary input function is available for the type.
$node_subscriber->wait_for_log(
	qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/,
	$offset);

# Add send/receive functions and attach them to the custom type.
$ddl = qq(
    CREATE FUNCTION myvarcharsend(myvarchar) RETURNS bytea
        LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharsend';
    CREATE FUNCTION myvarcharrecv(internal, oid, integer) RETURNS myvarchar
        LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharrecv';
    ALTER TYPE myvarchar SET (
        send = myvarcharsend,
        receive = myvarcharrecv
    ););

# Again the change is made on the publisher first, then on the subscriber.
foreach my $node ($node_publisher, $node_subscriber)
{
	$node->safe_psql('postgres', $ddl);
}

# With the functions in place, the tablesync should now complete.
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');

# Verify that the row reached the subscriber.
$result =
  $node_subscriber->safe_psql('postgres', 'SELECT a FROM test_myvarchar;');

is($result, 'a', 'check synced data on subscriber with custom type');

# -----------------------------------------------------
# Test mismatched column types with/without binary mode
# -----------------------------------------------------

# On the publisher the column is a bigint ...
$node_publisher->safe_psql(
	'postgres', qq(
    CREATE TABLE public.test_mismatching_types (
        a bigint PRIMARY KEY
    );
    INSERT INTO public.test_mismatching_types (a)
        VALUES (1), (2);
    ));

# Only subscriber log output written from this point on is examined.
$offset = -s $node_subscriber->logfile;

# ... while on the subscriber the same column is an int.  Refreshing the
# publication then starts the sync of the new table.
$node_subscriber->safe_psql(
	'postgres', qq(
    CREATE TABLE public.test_mismatching_types (
        a int PRIMARY KEY
    );
    ALTER SUBSCRIPTION tsub REFRESH PUBLICATION;
    ));

# The sync cannot succeed because of the type mismatch; wait for the
# subscriber to report the error.
$node_subscriber->wait_for_log(
	qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/,
	$offset);

# From here on it is the publisher log that is examined.
$offset = -s $node_publisher->logfile;

# Turning binary mode off should allow the table to be synced.
$node_subscriber->safe_psql(
	'postgres', qq(
    ALTER SUBSCRIPTION tsub SET (binary = false);));

# Wait for the publisher to log a COPY statement with no format option,
# i.e. one executed in text format.
$node_publisher->wait_for_log(
	qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT\n/,
	$offset);

$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');

# Verify that both rows reached the subscriber.
$result = $node_subscriber->safe_psql('postgres',
	'SELECT a FROM test_mismatching_types ORDER BY a;');

is($result, "1\n2", 'check synced data on subscriber with binary = false');

# Shut down both nodes, subscriber first, then finish the test.
foreach my $node ($node_subscriber, $node_publisher)
{
	$node->stop('fast');
}

done_testing();
