Skip to content

Commit 1852d43

Browse files
authored
Improve our Citus support for modern Citus compatibility. (#939)
* Improve our Citus support for modern Citus compatibility.
  1. Allow rebalance operations with wal_level = 'logical'.
  2. Automatically register the coordinator node to itself.
* Skip coordinator registration when --skip-pg-hba is used. There is a chicken-and-egg problem that prevents connecting to the coordinator this early in the setup of the database. Also, when manual HBA editing is wanted, it seems reasonable to ask the user to also take care of manually running `pg_autoctl activate` on the primary coordinator.
1 parent d580c0d commit 1852d43

File tree

6 files changed

+100
-0
lines changed

6 files changed

+100
-0
lines changed

src/bin/pg_autoctl/fsm.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,10 @@ KeeperFSMTransition KeeperFSM[] = {
201201
/*
202202
* Started as a single, no nothing
203203
*/
204+
{ INIT_STATE, SINGLE_STATE, NODE_KIND_CITUS_COORDINATOR,
205+
COMMENT_INIT_TO_SINGLE,
206+
&fsm_citus_coordinator_init_primary },
207+
204208
{ INIT_STATE, SINGLE_STATE, NODE_KIND_CITUS_WORKER,
205209
COMMENT_INIT_TO_SINGLE,
206210
&fsm_citus_worker_init_primary },

src/bin/pg_autoctl/fsm.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ bool fsm_init_from_standby(Keeper *keeper);
7777
bool fsm_drop_node(Keeper *keeper);
7878

7979
/* src/bin/pg_autoctl/fsm_transition_citus.c */
80+
bool fsm_citus_coordinator_init_primary(Keeper *keeper);
8081
bool fsm_citus_worker_init_primary(Keeper *keeper);
8182
bool fsm_citus_worker_resume_as_primary(Keeper *keeper);
8283
bool fsm_citus_coordinator_promote_standby_to_single(Keeper *keeper);

src/bin/pg_autoctl/fsm_transition_citus.c

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,91 @@
4141
static bool ensure_hostname_is_current_on_coordinator(Keeper *keeper);
4242

4343

44+
/*
45+
* fsm_citus_coordinator_init_primary initializes a primary coordinator node in
46+
* a Citus formation. After doing the usual initialization steps as per the
47+
* non-citus version of the FSM, the coordinator node registers itself to the
48+
* Citus nodes metadata (skipped with --skip-pg-hba). Returns false on failure.
49+
*/
50+
bool
51+
fsm_citus_coordinator_init_primary(Keeper *keeper)
52+
{
53+
KeeperConfig *config = &(keeper->config);
54+
55+
CoordinatorNodeAddress coordinatorNodeAddress = { 0 };
56+
Coordinator coordinator = { 0 };
57+
int nodeid = -1;
58+
59+
if (!fsm_init_primary(keeper))
60+
{
61+
/* errors have already been logged */
62+
return false;
63+
}
64+
65+
/*
66+
* This transition is meant for Citus coordinator nodes only, so reaching
67+
* this point with any other node kind is a programming error: report
68+
* it as a bug and fail the transition.
69+
*/
70+
if (keeper->postgres.pgKind != NODE_KIND_CITUS_COORDINATOR)
71+
{
72+
log_error("BUG: fsm_citus_coordinator_init_primary called for "
73+
"node kind %s",
74+
nodeKindToString(keeper->postgres.pgKind));
75+
return false;
76+
}
77+
78+
PostgresSetup *pgSetup = &(config->pgSetup);
79+
80+
if (pgSetup->hbaLevel <= HBA_EDIT_SKIP)
81+
{
82+
log_info(
83+
"Skipping coordinator registration to itself when --skip-pg-hba "
84+
"is used, because we can't connect at pg_autoctl create node time");
85+
return true;
86+
}
87+
88+
/*
89+
* The coordinator to talk to is this very node: use our own address.
90+
*/
91+
coordinatorNodeAddress.node.port = keeper->config.pgSetup.pgport;
92+
93+
strlcpy(coordinatorNodeAddress.node.name,
94+
keeper->config.name,
95+
sizeof(coordinatorNodeAddress.node.name));
96+
97+
strlcpy(coordinatorNodeAddress.node.host,
98+
keeper->config.hostname,
99+
sizeof(coordinatorNodeAddress.node.host));
100+
101+
if (!coordinator_init(&coordinator, &(coordinatorNodeAddress.node), keeper))
102+
{
103+
log_fatal("Failed to contact the coordinator because its URL is invalid, "
104+
"see above for details");
105+
return false;
106+
}
107+
108+
if (!coordinator_add_node(&coordinator, keeper, &nodeid))
109+
{
110+
/*
111+
* master_add_inactive_node() is idempotent: if the node has already
112+
* been added, nothing changes; in particular, if the node is already
113+
* active then the function happily leaves it active.
114+
*/
115+
log_fatal("Failed to add current node to the Citus coordinator, "
116+
"see above for details");
117+
return false;
118+
}
119+
120+
log_info("Added coordinator node %s:%d in formation \"%s\" to itself",
121+
keeper->config.hostname,
122+
keeper->config.pgSetup.pgport,
123+
config->formation);
124+
125+
return true;
126+
}
127+
128+
44129
/*
45130
* fsm_citus_worker_init_primary initializes a primary worker node in a Citus
46131
* formation. After doing the usual initialization steps as per the non-citus

src/bin/pg_autoctl/primary_standby.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ GUC postgres_default_settings_13[] = {
8686
GUC citus_default_settings_pre_13[] = {
8787
DEFAULT_GUC_SETTINGS_FOR_PG_AUTO_FAILOVER_PRE_13,
8888
{ "shared_preload_libraries", "'citus,pg_stat_statements'" },
89+
{ "wal_level", "logical" },
8990
{ "citus.node_conninfo", "'sslmode=prefer'" },
9091
{ "citus.cluster_name", "'default'" },
9192
{ "citus.use_secondary_nodes", "'never'" },
@@ -96,6 +97,7 @@ GUC citus_default_settings_pre_13[] = {
9697
GUC citus_default_settings_13[] = {
9798
DEFAULT_GUC_SETTINGS_FOR_PG_AUTO_FAILOVER_13,
9899
{ "shared_preload_libraries", "'citus,pg_stat_statements'" },
100+
{ "wal_level", "logical" },
99101
{ "citus.node_conninfo", "'sslmode=prefer'" },
100102
{ "citus.cluster_name", "'default'" },
101103
{ "citus.use_secondary_nodes", "'never'" },

tests/test_basic_citus_operation.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os.path
55
import time
66
import subprocess
7+
import pprint
78

89
cluster = None
910
monitor = None
@@ -65,6 +66,12 @@ def test_001_init_coordinator():
6566
assert coordinator1a.wait_until_state(target_state="primary")
6667
assert coordinator1b.wait_until_state(target_state="secondary")
6768

69+
sql = "select nodename, nodeport from pg_dist_node"
70+
nodes = coordinator1a.run_sql_query(sql)
71+
72+
assert nodes[0][0] == str(coordinator1a.vnode.address)
73+
assert nodes[0][1] == coordinator1a.port
74+
6875

6976
def test_002_init_workers():
7077
global worker1a

tests/test_citus_skip_pg_hba.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ def test_001a_init_coordinator():
5252
# run_sql_query() will need
5353
# host "172.27.1.1", user "docker", database "postgres"
5454
hba.write("host postgres docker %s trust\n" % cluster.networkSubnet)
55+
hba.write("host citus docker %s trust\n" % cluster.networkSubnet)
5556
hba.write("host all all %s trust\n" % cluster.networkSubnet)
5657
hba.write("host replication all %s trust\n" % cluster.networkSubnet)
5758

0 commit comments

Comments
 (0)