check-patroni/tests/__init__.py
benoit 8d6b8502b6 cluster_has_replica: fix the way a healthy replica is detected
For patroni >= version 3.0.4:
* the role is `replica` or `sync_standby`
* the state is `streaming` or `in archive recovery`
* the timeline is the same as the leader
* the lag is lower or equal to `max_lag`

For prior versions of patroni:
* the role is `replica` or `sync_standby`
* the state is `running`
* the timeline is the same as the leader
* the lag is lower or equal to `max_lag`

Additionally, we now display the timeline in the perfstats. We also try
to display the perf stats of unhealthy replicas as much as possible.

Update tests for cluster_has_replica:
* Fix the tests to make them work with the new algorithm
* Add a specific test for tl divergences
2023-11-11 10:50:35 +01:00

66 lines
2.2 KiB
Python

import json
import logging
import shutil
from contextlib import contextmanager
from functools import partial
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from typing import Any, Iterator, Mapping, Union
# Module-level logger; PatroniAPI.serve_forever() uses it to report the
# endpoint and served directory of the fake API.
logger = logging.getLogger(__name__)
class PatroniAPI(HTTPServer):
    """Fake Patroni API: an HTTP server exposing the files of a directory.

    Requests are handled by a SimpleHTTPRequestHandler rooted at
    'directory'. Use 'routes()' to temporarily install files there.
    """

    def __init__(self, directory: Path, *, datadir: Path) -> None:
        """Create the server, bound to host "" and port 0.

        'directory' is the directory served over HTTP. 'datadir' is the
        directory against which 'str' values given to 'routes()' are
        resolved.
        """
        self.directory = directory
        self.datadir = datadir
        handler_cls = partial(SimpleHTTPRequestHandler, directory=str(directory))
        super().__init__(("", 0), handler_cls)

    def serve_forever(self, *args: Any) -> None:
        """Log the endpoint and served directory, then serve requests."""
        logger.info(
            "starting fake Patroni API at %s (directory=%s)",
            self.endpoint,
            self.directory,
        )
        return super().serve_forever(*args)

    @property
    def endpoint(self) -> str:
        """Base URL of the server, built from its name and port."""
        return f"http://{self.server_name}:{self.server_port}"

    @contextmanager
    def routes(self, mapping: Mapping[str, Union[Path, str]]) -> Iterator[None]:
        """Temporarily install specified files in served directory, thus
        building "routes" from given mapping.

        The 'mapping' defines target route paths as keys and files to be
        installed in served directory as values. Mapping values of type 'str'
        are assumed to be relative file paths to the 'datadir'.

        Installed files are removed when the context exits. If installing
        one of the files fails, the files installed before the failure are
        removed as well and the error is propagated.
        """
        installed = []
        try:
            # Installation happens inside the 'try' so that a failure half-way
            # through the mapping does not leave stale routes behind.
            for route_path, fpath in mapping.items():
                if isinstance(fpath, str):
                    fpath = self.datadir / fpath
                target = self.directory / route_path
                shutil.copy(fpath, target)
                installed.append(target)
            yield None
        finally:
            # Only remove what was actually installed by this call.
            for target in installed:
                target.unlink()
def cluster_api_set_replica_running(in_json: Path, target_dir: Path) -> Path:
    """Write to 'target_dir' a copy of 'in_json' using the old replica state.

    Every member whose role is 'replica' or 'sync_standby' and whose state
    is 'streaming' or 'in archive recovery' gets its state replaced by
    'running'. Other members are left untouched. The result is written to a
    file named like 'in_json' inside 'target_dir', and its path is returned.
    """
    # Since Patroni 3.0.4, replicas report "streaming" or "in archive
    # recovery" where older versions reported "running".
    replica_roles = ("replica", "sync_standby")
    recent_states = ("streaming", "in archive recovery")

    cluster = json.loads(in_json.read_text())
    for member in cluster["members"]:
        if member["role"] not in replica_roles:
            continue
        if member["state"] in recent_states:
            member["state"] = "running"

    assert target_dir.is_dir()
    destination = target_dir / in_json.name
    destination.write_text(json.dumps(cluster))
    return destination