287 lines
15 KiB
Diff
287 lines
15 KiB
Diff
|
From 3a509cab07da84e980b3e3f78db5f09f3798031b Mon Sep 17 00:00:00 2001
|
||
|
From: Alexander Kukushkin <alexander.kukushkin@zalando.de>
|
||
|
Date: Fri, 25 Jan 2019 10:41:22 +0100
|
||
|
Subject: [PATCH] Make it possible to automatically reinit the former master
|
||
|
|
||
|
If pg_rewind is disabled or can't be used, the former master could
|
||
|
fail to start as a new replica due to diverged timelines. In this case
|
||
|
the only way to fix it is wiping the data directory and reinitializing.
|
||
|
|
||
|
So far Patroni was able to remove the data directory only after a failed
|
||
|
attempt to run pg_rewind. This commit fixes it.
|
||
|
If `postgresql.remove_data_directory_on_diverged_timelines` is set,
|
||
|
Patroni will wipe the data directory and reinitialize the former master
|
||
|
automatically.
|
||
|
|
||
|
Fixes: https://github.com/zalando/patroni/issues/941
|
||
|
---
|
||
|
patroni/ha.py | 30 +++++++++++++++++++-----------
|
||
|
patroni/postgresql.py | 27 +++++++++++++++++++--------
|
||
|
tests/test_ha.py | 18 ++++++++++++++----
|
||
|
tests/test_postgresql.py | 18 ++++++++++--------
|
||
|
4 files changed, 62 insertions(+), 31 deletions(-)
|
||
|
|
||
|
diff --git a/patroni/ha.py b/patroni/ha.py
|
||
|
index b7ac3494..3989c71b 100644
|
||
|
--- a/patroni/ha.py
|
||
|
+++ b/patroni/ha.py
|
||
|
@@ -265,13 +265,22 @@ def bootstrap_standby_leader(self):
|
||
|
|
||
|
return result
|
||
|
|
||
|
- def _handle_rewind(self):
|
||
|
+ def _handle_rewind_or_reinitialize(self):
|
||
|
leader = self.get_remote_master() if self.is_standby_cluster() else self.cluster.leader
|
||
|
- if self.state_handler.rewind_needed_and_possible(leader):
|
||
|
+ if not self.state_handler.rewind_or_reinitialize_needed_and_possible(leader):
|
||
|
+ return None
|
||
|
+
|
||
|
+ if self.state_handler.can_rewind:
|
||
|
self._async_executor.schedule('running pg_rewind from ' + leader.name)
|
||
|
self._async_executor.run_async(self.state_handler.rewind, (leader,))
|
||
|
return True
|
||
|
|
||
|
+ # remove_data_directory_on_diverged_timelines is set
|
||
|
+ if not self.is_standby_cluster():
|
||
|
+ self._async_executor.schedule('reinitializing due to diverged timelines')
|
||
|
+ self._async_executor.run_async(self._do_reinitialize, args=(self.cluster, ))
|
||
|
+ return True
|
||
|
+
|
||
|
def recover(self):
|
||
|
# Postgres is not running and we will restart in standby mode. Watchdog is not needed until we promote.
|
||
|
self.watchdog.disable()
|
||
|
@@ -304,7 +313,7 @@ def recover(self):
|
||
|
if self.is_standby_cluster() or not self.has_lock():
|
||
|
if not self.state_handler.rewind_executed:
|
||
|
self.state_handler.trigger_check_diverged_lsn()
|
||
|
- if self._handle_rewind():
|
||
|
+ if self._handle_rewind_or_reinitialize():
|
||
|
return self._async_executor.scheduled_action
|
||
|
|
||
|
if self.has_lock(): # in standby cluster
|
||
|
@@ -338,9 +347,7 @@ def _get_node_to_follow(self, cluster):
|
||
|
else:
|
||
|
node_to_follow = cluster.leader
|
||
|
|
||
|
- return (node_to_follow if
|
||
|
- node_to_follow and
|
||
|
- node_to_follow.name != self.state_handler.name else None)
|
||
|
+ return node_to_follow if node_to_follow and node_to_follow.name != self.state_handler.name else None
|
||
|
|
||
|
def follow(self, demote_reason, follow_reason, refresh=True):
|
||
|
if refresh:
|
||
|
@@ -351,7 +358,8 @@ def follow(self, demote_reason, follow_reason, refresh=True):
|
||
|
node_to_follow = self._get_node_to_follow(self.cluster)
|
||
|
|
||
|
if self.is_paused():
|
||
|
- if not (self.state_handler.need_rewind and self.state_handler.can_rewind) or self.cluster.is_unlocked():
|
||
|
+ if not (self.state_handler.need_rewind and self.state_handler.can_rewind_or_reinitialize_allowed)\
|
||
|
+ or self.cluster.is_unlocked():
|
||
|
self.state_handler.set_role('master' if is_leader else 'replica')
|
||
|
if is_leader:
|
||
|
return 'continue to run as master without lock'
|
||
|
@@ -361,7 +369,7 @@ def follow(self, demote_reason, follow_reason, refresh=True):
|
||
|
self.demote('immediate-nolock')
|
||
|
return demote_reason
|
||
|
|
||
|
- if self._handle_rewind():
|
||
|
+ if self._handle_rewind_or_reinitialize():
|
||
|
return self._async_executor.scheduled_action
|
||
|
|
||
|
if not self.state_handler.check_recovery_conf(node_to_follow):
|
||
|
@@ -733,7 +741,7 @@ def demote(self, mode):
|
||
|
else:
|
||
|
if self.is_synchronous_mode():
|
||
|
self.state_handler.set_synchronous_standby(None)
|
||
|
- if self.state_handler.rewind_needed_and_possible(leader):
|
||
|
+ if self.state_handler.rewind_or_reinitialize_needed_and_possible(leader):
|
||
|
return False # do not start postgres, but run pg_rewind on the next iteration
|
||
|
self.state_handler.follow(node_to_follow)
|
||
|
|
||
|
@@ -1253,8 +1261,8 @@ def _run_cycle(self):
|
||
|
# the demote code follows through to starting Postgres right away, however, in the rewind case
|
||
|
# it returns from demote and reaches this point to start PostgreSQL again after rewind. In that
|
||
|
# case it makes no sense to continue to recover() unless rewind has finished successfully.
|
||
|
- elif (self.state_handler.rewind_failed or
|
||
|
- not (self.state_handler.need_rewind and self.state_handler.can_rewind)):
|
||
|
+ elif self.state_handler.rewind_failed or not self.state_handler.need_rewind \
|
||
|
+ or not self.state_handler.can_rewind_or_reinitialize_allowed:
|
||
|
return 'postgres is not running'
|
||
|
|
||
|
# try to start dead postgres
|
||
|
diff --git a/patroni/postgresql.py b/patroni/postgresql.py
|
||
|
index 546b9982..bd057f91 100644
|
||
|
--- a/patroni/postgresql.py
|
||
|
+++ b/patroni/postgresql.py
|
||
|
@@ -16,7 +16,7 @@
|
||
|
from patroni.exceptions import PostgresConnectionException, PostgresException
|
||
|
from patroni.utils import compare_values, parse_bool, parse_int, Retry, RetryFailedError, polling_loop, split_host_port
|
||
|
from patroni.postmaster import PostmasterProcess
|
||
|
-from patroni.dcs import slot_name_from_member_name, RemoteMember
|
||
|
+from patroni.dcs import slot_name_from_member_name, RemoteMember, Leader
|
||
|
from requests.structures import CaseInsensitiveDict
|
||
|
from six import string_types
|
||
|
from six.moves.urllib.parse import quote_plus
|
||
|
@@ -404,6 +404,10 @@ def can_rewind(self):
|
||
|
return False
|
||
|
return self.configuration_allows_rewind(self.controldata())
|
||
|
|
||
|
+ @property
|
||
|
+ def can_rewind_or_reinitialize_allowed(self):
|
||
|
+ return self.config.get('remove_data_directory_on_diverged_timelines') or self.can_rewind
|
||
|
+
|
||
|
@property
|
||
|
def sysid(self):
|
||
|
if not self._sysid and not self.bootstrapping:
|
||
|
@@ -1323,7 +1327,11 @@ def _check_timeline_and_lsn(self, leader):
|
||
|
if local_timeline is None or local_lsn is None:
|
||
|
return
|
||
|
|
||
|
- if not self.check_leader_is_not_in_recovery(**leader.conn_kwargs(self._superuser)):
|
||
|
+ if isinstance(leader, Leader):
|
||
|
+ if leader.member.data.get('role') != 'master':
|
||
|
+ return
|
||
|
+ # standby cluster
|
||
|
+ elif not self.check_leader_is_not_in_recovery(**leader.conn_kwargs(self._superuser)):
|
||
|
return
|
||
|
|
||
|
history = need_rewind = None
|
||
|
@@ -1403,19 +1411,21 @@ def rewind(self, leader):
|
||
|
else:
|
||
|
logger.error('Failed to rewind from healty master: %s', leader.name)
|
||
|
|
||
|
- if self.config.get('remove_data_directory_on_rewind_failure', False):
|
||
|
- logger.warning('remove_data_directory_on_rewind_failure is set. removing...')
|
||
|
- self.remove_data_directory()
|
||
|
- self._rewind_state = REWIND_STATUS.INITIAL
|
||
|
+ for name in ('remove_data_directory_on_rewind_failure', 'remove_data_directory_on_diverged_timelines'):
|
||
|
+ if self.config.get(name):
|
||
|
+ logger.warning('%s is set. removing...', name)
|
||
|
+ self.remove_data_directory()
|
||
|
+ self._rewind_state = REWIND_STATUS.INITIAL
|
||
|
+ break
|
||
|
else:
|
||
|
self._rewind_state = REWIND_STATUS.FAILED
|
||
|
return False
|
||
|
|
||
|
def trigger_check_diverged_lsn(self):
|
||
|
- if self.can_rewind and self._rewind_state != REWIND_STATUS.NEED:
|
||
|
+ if self.can_rewind_or_reinitialize_allowed and self._rewind_state != REWIND_STATUS.NEED:
|
||
|
self._rewind_state = REWIND_STATUS.CHECK
|
||
|
|
||
|
- def rewind_needed_and_possible(self, leader):
|
||
|
+ def rewind_or_reinitialize_needed_and_possible(self, leader):
|
||
|
if leader and leader.name != self.name and leader.conn_url and self._rewind_state == REWIND_STATUS.CHECK:
|
||
|
self._check_timeline_and_lsn(leader)
|
||
|
return leader and leader.conn_url and self._rewind_state == REWIND_STATUS.NEED
|
||
|
@@ -1652,6 +1662,7 @@ def clone(self, clone_member):
|
||
|
base backup)
|
||
|
"""
|
||
|
|
||
|
+ self._rewind_state = REWIND_STATUS.INITIAL
|
||
|
ret = self.create_replica(clone_member) == 0
|
||
|
if ret:
|
||
|
self._post_restore()
|
||
|
diff --git a/tests/test_ha.py b/tests/test_ha.py
|
||
|
index faf49034..758c6b17 100644
|
||
|
--- a/tests/test_ha.py
|
||
|
+++ b/tests/test_ha.py
|
||
|
@@ -241,12 +241,20 @@ def test_crash_recovery(self):
|
||
|
self.p.controldata = lambda: {'Database cluster state': 'in production', 'Database system identifier': SYSID}
|
||
|
self.assertEqual(self.ha.run_cycle(), 'doing crash recovery in a single user mode')
|
||
|
|
||
|
- @patch.object(Postgresql, 'rewind_needed_and_possible', Mock(return_value=True))
|
||
|
+ @patch.object(Postgresql, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
|
||
|
+ @patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
|
||
|
def test_recover_with_rewind(self):
|
||
|
self.p.is_running = false
|
||
|
self.ha.cluster = get_cluster_initialized_with_leader()
|
||
|
self.assertEqual(self.ha.run_cycle(), 'running pg_rewind from leader')
|
||
|
|
||
|
+ @patch.object(Postgresql, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
|
||
|
+ @patch.object(Postgresql, 'create_replica', Mock(return_value=1))
|
||
|
+ def test_recover_with_reinitialize(self):
|
||
|
+ self.p.is_running = false
|
||
|
+ self.ha.cluster = get_cluster_initialized_with_leader()
|
||
|
+ self.assertEqual(self.ha.run_cycle(), 'reinitializing due to diverged timelines')
|
||
|
+
|
||
|
@patch('sys.exit', return_value=1)
|
||
|
@patch('patroni.ha.Ha.sysid_valid', MagicMock(return_value=True))
|
||
|
def test_sysid_no_match(self, exit_mock):
|
||
|
@@ -345,7 +353,8 @@ def test_follow_in_pause(self):
|
||
|
self.p.is_leader = false
|
||
|
self.assertEqual(self.ha.run_cycle(), 'PAUSE: no action')
|
||
|
|
||
|
- @patch.object(Postgresql, 'rewind_needed_and_possible', Mock(return_value=True))
|
||
|
+ @patch.object(Postgresql, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
|
||
|
+ @patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
|
||
|
def test_follow_triggers_rewind(self):
|
||
|
self.p.is_leader = false
|
||
|
self.p.trigger_check_diverged_lsn()
|
||
|
@@ -459,7 +468,7 @@ def test_manual_failover_from_leader(self):
|
||
|
f = Failover(0, self.p.name, '', None)
|
||
|
self.ha.cluster = get_cluster_initialized_with_leader(f)
|
||
|
self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')
|
||
|
- self.p.rewind_needed_and_possible = true
|
||
|
+ self.p.rewind_or_reinitialize_needed_and_possible = true
|
||
|
self.assertEqual(self.ha.run_cycle(), 'manual failover: demoting myself')
|
||
|
self.ha.fetch_node_status = get_node_status(nofailover=True)
|
||
|
self.assertEqual(self.ha.run_cycle(), 'no action. i am the leader with the lock')
|
||
|
@@ -681,7 +690,8 @@ def test_process_unhealthy_standby_cluster_as_standby_leader(self):
|
||
|
msg = 'promoted self to a standby leader because i had the session lock'
|
||
|
self.assertEqual(self.ha.run_cycle(), msg)
|
||
|
|
||
|
- @patch.object(Postgresql, 'rewind_needed_and_possible', Mock(return_value=True))
|
||
|
+ @patch.object(Postgresql, 'rewind_or_reinitialize_needed_and_possible', Mock(return_value=True))
|
||
|
+ @patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
|
||
|
def test_process_unhealthy_standby_cluster_as_cascade_replica(self):
|
||
|
self.p.is_leader = false
|
||
|
self.p.name = 'replica'
|
||
|
diff --git a/tests/test_postgresql.py b/tests/test_postgresql.py
|
||
|
index 9b7bf754..339f962e 100644
|
||
|
--- a/tests/test_postgresql.py
|
||
|
+++ b/tests/test_postgresql.py
|
||
|
@@ -345,10 +345,10 @@ def test__get_local_timeline_lsn(self):
|
||
|
Mock(return_value={'Database cluster state': 'shut down in recovery',
|
||
|
'Minimum recovery ending location': '0/0',
|
||
|
"Min recovery ending loc's timeline": '0'})):
|
||
|
- self.p.rewind_needed_and_possible(self.leader)
|
||
|
+ self.p.rewind_or_reinitialize_needed_and_possible(self.leader)
|
||
|
with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
|
||
|
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[(False, ), Exception])):
|
||
|
- self.p.rewind_needed_and_possible(self.leader)
|
||
|
+ self.p.rewind_or_reinitialize_needed_and_possible(self.leader)
|
||
|
|
||
|
@patch.object(Postgresql, 'start', Mock())
|
||
|
@patch.object(Postgresql, 'can_rewind', PropertyMock(return_value=True))
|
||
|
@@ -357,21 +357,23 @@ def test__get_local_timeline_lsn(self):
|
||
|
def test__check_timeline_and_lsn(self, mock_check_leader_is_not_in_recovery):
|
||
|
mock_check_leader_is_not_in_recovery.return_value = False
|
||
|
self.p.trigger_check_diverged_lsn()
|
||
|
- self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
|
||
|
+ self.assertFalse(self.p.rewind_or_reinitialize_needed_and_possible(self.leader))
|
||
|
+ self.leader = self.leader.member
|
||
|
+ self.assertFalse(self.p.rewind_or_reinitialize_needed_and_possible(self.leader))
|
||
|
mock_check_leader_is_not_in_recovery.return_value = True
|
||
|
- self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
|
||
|
+ self.assertFalse(self.p.rewind_or_reinitialize_needed_and_possible(self.leader))
|
||
|
self.p.trigger_check_diverged_lsn()
|
||
|
with patch('psycopg2.connect', Mock(side_effect=Exception)):
|
||
|
- self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
|
||
|
+ self.assertFalse(self.p.rewind_or_reinitialize_needed_and_possible(self.leader))
|
||
|
self.p.trigger_check_diverged_lsn()
|
||
|
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[('', 2, '0/0'), ('', b'3\t0/40159C0\tn\n')])):
|
||
|
- self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
|
||
|
+ self.assertFalse(self.p.rewind_or_reinitialize_needed_and_possible(self.leader))
|
||
|
self.p.trigger_check_diverged_lsn()
|
||
|
with patch.object(MockCursor, 'fetchone', Mock(return_value=('', 1, '0/0'))):
|
||
|
with patch.object(Postgresql, '_get_local_timeline_lsn', Mock(return_value=(1, '0/0'))):
|
||
|
- self.assertFalse(self.p.rewind_needed_and_possible(self.leader))
|
||
|
+ self.assertFalse(self.p.rewind_or_reinitialize_needed_and_possible(self.leader))
|
||
|
self.p.trigger_check_diverged_lsn()
|
||
|
- self.assertTrue(self.p.rewind_needed_and_possible(self.leader))
|
||
|
+ self.assertTrue(self.p.rewind_or_reinitialize_needed_and_possible(self.leader))
|
||
|
|
||
|
@patch.object(MockCursor, 'fetchone', Mock(side_effect=[(True,), Exception]))
|
||
|
def test_check_leader_is_not_in_recovery(self):
|