Skip to content

Commit 1083b83

Browse files
authored
Track nodes current timeline on the monitor. (#730)
This allows comparing the LSN with the added notion of the current time line for the nodes, avoiding some advanced data loss hazard if a node makes progress while being in a network split with the application nodes, and when rejoining later we should detect if a failover happened on the other nodes on the system.
1 parent 57d5ac0 commit 1083b83

23 files changed

Lines changed: 329 additions & 105 deletions

src/bin/pg_autoctl/cli_do_monitor.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,7 @@ cli_do_monitor_node_active(int argc, char **argv)
572572
keeper.state.current_group,
573573
keeper.state.current_role,
574574
keeper.postgres.pgIsRunning,
575+
keeper.postgres.postgresSetup.control.timeline_id,
575576
keeper.postgres.currentLSN,
576577
keeper.postgres.pgsrSyncState,
577578
&assignedState))

src/bin/pg_autoctl/cli_show.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,9 @@ cli_show_local_state()
557557
strlcpy(nodeState.node.lsn, "0/0", PG_LSN_MAXLENGTH);
558558
}
559559

560+
nodeState.node.tli =
561+
keeper.postgres.postgresSetup.control.timeline_id;
562+
560563
strlcpy(nodeState.node.lsn,
561564
keeper.postgres.currentLSN,
562565
PG_LSN_MAXLENGTH);
@@ -569,6 +572,7 @@ cli_show_local_state()
569572
/* errors have already been logged, just continue */
570573
}
571574

575+
nodeState.node.tli = config.pgSetup.control.timeline_id;
572576
strlcpy(nodeState.node.lsn,
573577
config.pgSetup.control.latestCheckpointLSN,
574578
PG_LSN_MAXLENGTH);

src/bin/pg_autoctl/defaults.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
#define PG_AUTOCTL_STATE_VERSION 1
1515

1616
/* additional version information for printing version on CLI */
17-
#define PG_AUTOCTL_VERSION "1.6.0.1"
17+
#define PG_AUTOCTL_VERSION "1.6.0.2"
1818

1919
/* version of the extension that we requite to talk to on the monitor */
2020
#define PG_AUTOCTL_EXTENSION_VERSION "1.6"

src/bin/pg_autoctl/fsm.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ keeper_fsm_step(Keeper *keeper)
428428
keeperState->current_group,
429429
keeperState->current_role,
430430
postgres->pgIsRunning,
431+
postgres->postgresSetup.control.timeline_id,
431432
postgres->currentLSN,
432433
postgres->pgsrSyncState,
433434
&assignedState))

src/bin/pg_autoctl/keeper.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,12 +1608,15 @@ keeper_register_again(Keeper *keeper)
16081608
return false;
16091609
}
16101610

1611+
int currentTLI = keeper->postgres.postgresSetup.control.timeline_id;
1612+
16111613
if (!monitor_node_active(monitor,
16121614
config->formation,
16131615
assignedState.nodeId,
16141616
assignedState.groupId,
16151617
assignedState.state,
16161618
ReportPgIsRunning(keeper),
1619+
currentTLI,
16171620
keeper->postgres.currentLSN,
16181621
keeper->postgres.pgsrSyncState,
16191622
&assignedState))

src/bin/pg_autoctl/keeper_pg_init.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,7 @@ wait_until_primary_is_ready(Keeper *keeper,
551551
MonitorAssignedState *assignedState)
552552
{
553553
bool pgIsRunning = false;
554+
int currentTLI = 1;
554555
char currrentLSN[PG_LSN_MAXLENGTH] = "0/0";
555556
char *pgsrSyncState = "";
556557
int errors = 0, tries = 0;
@@ -593,6 +594,7 @@ wait_until_primary_is_ready(Keeper *keeper,
593594
keeper->state.current_group,
594595
keeper->state.current_role,
595596
pgIsRunning,
597+
currentTLI,
596598
currrentLSN,
597599
pgsrSyncState,
598600
assignedState))
@@ -1068,6 +1070,7 @@ keeper_pg_init_node_active(Keeper *keeper)
10681070
keeper->state.current_group,
10691071
keeper->state.current_role,
10701072
ReportPgIsRunning(keeper),
1073+
keeper->postgres.postgresSetup.control.timeline_id,
10711074
keeper->postgres.currentLSN,
10721075
keeper->postgres.pgsrSyncState,
10731076
&assignedState))

src/bin/pg_autoctl/monitor.c

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -898,19 +898,20 @@ bool
898898
monitor_node_active(Monitor *monitor,
899899
char *formation, int64_t nodeId,
900900
int groupId, NodeState currentState,
901-
bool pgIsRunning,
901+
bool pgIsRunning, int currentTLI,
902902
char *currentLSN, char *pgsrSyncState,
903903
MonitorAssignedState *assignedState)
904904
{
905905
PGSQL *pgsql = &monitor->pgsql;
906906
const char *sql =
907907
"SELECT * FROM pgautofailover.node_active($1, $2, $3, "
908-
"$4::pgautofailover.replication_state, $5, $6, $7)";
909-
int paramCount = 7;
910-
Oid paramTypes[7] = {
911-
TEXTOID, INT8OID, INT4OID, TEXTOID, BOOLOID, LSNOID, TEXTOID
908+
"$4::pgautofailover.replication_state, $5, $6, $7, $8)";
909+
int paramCount = 8;
910+
Oid paramTypes[8] = {
911+
TEXTOID, INT8OID, INT4OID, TEXTOID,
912+
BOOLOID, INT4OID, LSNOID, TEXTOID
912913
};
913-
const char *paramValues[7];
914+
const char *paramValues[8];
914915
MonitorAssignedStateParseContext parseContext =
915916
{ { 0 }, assignedState, false };
916917
const char *nodeStateString = NodeStateToString(currentState);
@@ -920,8 +921,9 @@ monitor_node_active(Monitor *monitor,
920921
paramValues[2] = intToString(groupId).strValue;
921922
paramValues[3] = nodeStateString;
922923
paramValues[4] = pgIsRunning ? "true" : "false";
923-
paramValues[5] = currentLSN;
924-
paramValues[6] = pgsrSyncState;
924+
paramValues[5] = intToString(currentTLI).strValue;
925+
paramValues[6] = currentLSN;
926+
paramValues[7] = pgsrSyncState;
925927

926928
if (!pgsql_execute_with_params(pgsql, sql,
927929
paramCount, paramTypes, paramValues,
@@ -1755,7 +1757,7 @@ parseCurrentNodeState(PGresult *result, int rowNumber,
17551757
int errors = 0;
17561758

17571759
/* we don't expect any of the column to be NULL */
1758-
for (colNumber = 0; colNumber < 12; colNumber++)
1760+
for (colNumber = 0; colNumber < 13; colNumber++)
17591761
{
17601762
if (PQgetisnull(result, rowNumber, 0))
17611763
{
@@ -1776,8 +1778,9 @@ parseCurrentNodeState(PGresult *result, int rowNumber,
17761778
* 7 - OUT assigned_group_state pgautofailover.replication_state,
17771779
* 8 - OUT candidate_priority int,
17781780
* 9 - OUT replication_quorum bool,
1779-
* 10 - OUT reported_lsn pg_lsn,
1780-
* 11 - OUT health integer
1781+
* 10 - OUT reported_tli int,
1782+
* 11 - OUT reported_lsn pg_lsn,
1783+
* 12 - OUT health integer
17811784
*
17821785
* We need the groupId to parse the formation kind into a nodeKind, so we
17831786
* begin at column 1 and get back to column 0 later, after column 4.
@@ -1884,11 +1887,18 @@ parseCurrentNodeState(PGresult *result, int rowNumber,
18841887
nodeState->replicationQuorum = (*value) == 't';
18851888
}
18861889

1887-
/* we trust Postgres pg_lsn data type to fit in our PG_LSN_MAXLENGTH */
18881890
value = PQgetvalue(result, rowNumber, 10);
1889-
strlcpy(nodeState->node.lsn, value, PG_LSN_MAXLENGTH);
1891+
if (!stringToInt(value, &(nodeState->node.tli)))
1892+
{
1893+
log_error("Invalid timeline \"%s\" returned by monitor", value);
1894+
++errors;
1895+
}
18901896

1897+
/* we trust Postgres pg_lsn data type to fit in our PG_LSN_MAXLENGTH */
18911898
value = PQgetvalue(result, rowNumber, 11);
1899+
strlcpy(nodeState->node.lsn, value, PG_LSN_MAXLENGTH);
1900+
1901+
value = PQgetvalue(result, rowNumber, 12);
18921902
if (!stringToInt(value, &(nodeState->health)))
18931903
{
18941904
log_error("Invalid node health \"%s\" returned by monitor", value);
@@ -1921,9 +1931,9 @@ parseCurrentNodeStateArray(CurrentNodeStateArray *nodesArray, PGresult *result)
19211931
}
19221932

19231933
/* pgautofailover.current_state returns 11 columns */
1924-
if (PQnfields(result) != 12)
1934+
if (PQnfields(result) != 13)
19251935
{
1926-
log_error("Query returned %d columns, expected 12", PQnfields(result));
1936+
log_error("Query returned %d columns, expected 13", PQnfields(result));
19271937
return false;
19281938
}
19291939

@@ -3751,6 +3761,7 @@ monitor_check_report_state(void *context, CurrentNodeState *nodeState)
37513761
char timestring[MAXCTIMESIZE] = { 0 };
37523762
char hostport[BUFSIZE] = { 0 };
37533763
char composedId[BUFSIZE] = { 0 };
3764+
char tliLSN[BUFSIZE] = { 0 };
37543765

37553766
/* filter notifications for our own formation */
37563767
if (strcmp(nodeState->formation, ctx->formation) != 0 ||
@@ -3769,7 +3780,8 @@ monitor_check_report_state(void *context, CurrentNodeState *nodeState)
37693780
&(nodeState->node),
37703781
ctx->groupId,
37713782
hostport,
3772-
composedId);
3783+
composedId,
3784+
tliLSN);
37733785

37743786
fformat(stdout, "%8s | %*s | %*s | %*s | %19s | %19s\n",
37753787
timestring + 11,
@@ -3899,6 +3911,7 @@ monitor_check_node_report_state(void *context, CurrentNodeState *nodeState)
38993911
char timestring[MAXCTIMESIZE] = { 0 };
39003912
char hostport[BUFSIZE] = { 0 };
39013913
char composedId[BUFSIZE] = { 0 };
3914+
char tliLSN[BUFSIZE] = { 0 };
39023915

39033916
/* filter notifications for our own formation */
39043917
if (strcmp(nodeState->formation, ctx->formation) != 0 ||
@@ -3917,7 +3930,8 @@ monitor_check_node_report_state(void *context, CurrentNodeState *nodeState)
39173930
&(nodeState->node),
39183931
ctx->groupId,
39193932
hostport,
3920-
composedId);
3933+
composedId,
3934+
tliLSN);
39213935

39223936
fformat(stdout, "%8s | %*s | %*s | %*s | %19s | %19s\n",
39233937
timestring + 11,

src/bin/pg_autoctl/monitor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ bool monitor_register_node(Monitor *monitor,
107107
bool monitor_node_active(Monitor *monitor,
108108
char *formation, int64_t nodeId,
109109
int groupId, NodeState currentState,
110-
bool pgIsRunning,
110+
bool pgIsRunning, int currentTLI,
111111
char *currentLSN, char *pgsrSyncState,
112112
MonitorAssignedState *assignedState);
113113
bool monitor_get_node_replication_settings(Monitor *monitor,

src/bin/pg_autoctl/nodestate_utils.c

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ nodestatePrepareHeaders(CurrentNodeStateArray *nodesArray,
2929
nodesArray->headers.maxNameSize = 4; /* "Name" */
3030
nodesArray->headers.maxHostSize = 10; /* "Host:Port" */
3131
nodesArray->headers.maxNodeSize = 5; /* "Node" */
32-
nodesArray->headers.maxLSNSize = 3; /* "LSN" */
32+
nodesArray->headers.maxLSNSize = 9; /* "TLI: LSN" */
3333
nodesArray->headers.maxStateSize = MAX_NODE_STATE_LEN;
3434
nodesArray->headers.maxHealthSize = strlen("read-write *");
3535

@@ -117,38 +117,21 @@ void
117117
nodestateAdjustHeaders(NodeAddressHeaders *headers,
118118
NodeAddress *node, int groupId)
119119
{
120-
int nameLen = strlen(node->name);
121-
122-
/* compute strlen of host:port */
123-
IntString portString = intToString(node->port);
124-
int hostLen =
125-
strlen(node->host) + strlen(portString.strValue) + 1;
126-
127-
/* compute strlen of groupId/nodeId, as in "0/1" */
128-
IntString nodeIdString = intToString(node->nodeId);
129-
int nodeLen = 0;
130-
131-
int lsnLen = strlen(node->lsn);
132-
133-
switch (headers->nodeKind)
134-
{
135-
case NODE_KIND_STANDALONE:
136-
{
137-
nodeLen = strlen(nodeIdString.strValue);
138-
break;
139-
}
140-
141-
default:
142-
{
143-
IntString groupIdString = intToString(groupId);
120+
char hostport[BUFSIZE] = { 0 };
121+
char composedId[BUFSIZE] = { 0 };
122+
char tliLSN[BUFSIZE] = { 0 };
144123

145-
nodeLen =
146-
strlen(groupIdString.strValue) +
147-
strlen(nodeIdString.strValue) + 1;
124+
(void) nodestatePrepareNode(headers,
125+
node,
126+
groupId,
127+
hostport,
128+
composedId,
129+
tliLSN);
148130

149-
break;
150-
}
151-
}
131+
int nameLen = strlen(node->name);
132+
int hostLen = strlen(hostport);
133+
int nodeLen = strlen(composedId);
134+
int lsnLen = strlen(tliLSN);
152135

153136
/*
154137
* In order to have a static nice table output even when using
@@ -178,8 +161,8 @@ nodestateAdjustHeaders(NodeAddressHeaders *headers,
178161

179162
if (headers->maxLSNSize == 0)
180163
{
181-
/* Unknown LSN is going to be "0/0" */
182-
headers->maxLSNSize = 3;
164+
/* Unknown LSN is going to be " 1: 0/0" */
165+
headers->maxLSNSize = 9;
183166
}
184167

185168
if (headers->maxHealthSize == 0)
@@ -224,7 +207,7 @@ nodestatePrintHeader(NodeAddressHeaders *headers)
224207
headers->maxNameSize, "Name",
225208
headers->maxNodeSize, "Node",
226209
headers->maxHostSize, "Host:Port",
227-
headers->maxLSNSize, "LSN",
210+
headers->maxLSNSize, "TLI: LSN",
228211
headers->maxHealthSize, "Connection",
229212
headers->maxStateSize, "Current State",
230213
headers->maxStateSize, "Assigned State");
@@ -250,14 +233,16 @@ nodestatePrintNodeState(NodeAddressHeaders *headers,
250233
{
251234
char hostport[BUFSIZE] = { 0 };
252235
char composedId[BUFSIZE] = { 0 };
236+
char tliLSN[BUFSIZE] = { 0 };
253237
char connection[BUFSIZE] = { 0 };
254238
char healthChar = nodestateHealthToChar(nodeState->health);
255239

256240
(void) nodestatePrepareNode(headers,
257241
&(nodeState->node),
258242
nodeState->groupId,
259243
hostport,
260-
composedId);
244+
composedId,
245+
tliLSN);
261246

262247
if (healthChar == ' ')
263248
{
@@ -273,7 +258,7 @@ nodestatePrintNodeState(NodeAddressHeaders *headers,
273258
headers->maxNameSize, nodeState->node.name,
274259
headers->maxNodeSize, composedId,
275260
headers->maxHostSize, hostport,
276-
headers->maxLSNSize, nodeState->node.lsn,
261+
headers->maxLSNSize, tliLSN,
277262
headers->maxHealthSize, connection,
278263
headers->maxStateSize, NodeStateToString(nodeState->reportedState),
279264
headers->maxStateSize, NodeStateToString(nodeState->goalState));
@@ -287,9 +272,11 @@ nodestatePrintNodeState(NodeAddressHeaders *headers,
287272
*/
288273
void
289274
nodestatePrepareNode(NodeAddressHeaders *headers, NodeAddress *node,
290-
int groupId, char *hostport, char *composedId)
275+
int groupId, char *hostport,
276+
char *composedId, char *tliLSN)
291277
{
292278
sformat(hostport, BUFSIZE, "%s:%d", node->host, node->port);
279+
sformat(tliLSN, BUFSIZE, "%3d: %s", node->tli, node->lsn);
293280

294281
switch (headers->nodeKind)
295282
{
@@ -353,6 +340,8 @@ nodestateAsJSON(CurrentNodeState *nodeState, JSON_Value *js)
353340
json_object_set_string(jsobj, "assigned_group_state",
354341
NodeStateToString(nodeState->goalState));
355342

343+
json_object_set_number(jsobj, "timeline", (double) nodeState->node.tli);
344+
356345
json_object_set_string(jsobj, "Minimum Recovery Ending LSN",
357346
nodeState->node.lsn);
358347

@@ -551,14 +540,14 @@ printNodeArray(NodeAddressArray *nodesArray)
551540
void
552541
printNodeHeader(NodeAddressHeaders *headers)
553542
{
554-
fformat(stdout, "%*s | %*s | %*s | %18s | %8s\n",
543+
fformat(stdout, "%*s | %*s | %*s | %21s | %8s\n",
555544
headers->maxNameSize, "Name",
556545
headers->maxNodeSize, "Node",
557546
headers->maxHostSize, "Host:Port",
558-
"LSN",
547+
"TLI: LSN",
559548
"Primary?");
560549

561-
fformat(stdout, "%*s-+-%*s-+-%*s-+-%18s-+-%8s\n",
550+
fformat(stdout, "%*s-+-%*s-+-%*s-+-%21s-+-%8s\n",
562551
headers->maxNameSize, headers->nameSeparatorHeader,
563552
headers->maxNodeSize, headers->nodeSeparatorHeader,
564553
headers->maxHostSize, headers->hostSeparatorHeader,
@@ -574,13 +563,14 @@ printNodeEntry(NodeAddressHeaders *headers, NodeAddress *node)
574563
{
575564
char hostport[BUFSIZE] = { 0 };
576565
char composedId[BUFSIZE] = { 0 };
566+
char tliLSN[BUFSIZE] = { 0 };
577567

578-
(void) nodestatePrepareNode(headers, node, 0, hostport, composedId);
568+
(void) nodestatePrepareNode(headers, node, 0, hostport, composedId, tliLSN);
579569

580-
fformat(stdout, "%*s | %*s | %*s | %18s | %8s\n",
570+
fformat(stdout, "%*s | %*s | %*s | %21s | %8s\n",
581571
headers->maxNameSize, node->name,
582572
headers->maxNodeSize, composedId,
583573
headers->maxHostSize, hostport,
584-
node->lsn,
574+
tliLSN,
585575
node->isPrimary ? "yes" : "no");
586576
}

src/bin/pg_autoctl/nodestate_utils.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ void nodestatePrintNodeState(NodeAddressHeaders *headers,
8383
CurrentNodeState *nodeState);
8484

8585
void nodestatePrepareNode(NodeAddressHeaders *headers, NodeAddress *node,
86-
int groupId, char *hostport, char *composedId);
86+
int groupId, char *hostport,
87+
char *composedId, char *tliLSN);
88+
8789
void prepareHostNameSeparator(char nameSeparatorHeader[], int size);
8890

8991
bool nodestateAsJSON(CurrentNodeState *nodeState, JSON_Value *js);

0 commit comments

Comments
 (0)