78ef0f6ada
The service now supports the `streaming` state. Since we dont check for lag or timeline in this service, a healthy node is : * leader : in a running state * standby_leader : running (pre Patroni 3.0.4), streaming otherwise * standby & sync_standby : running (pre Patroni 3.0.4), streaming otherwise Updated the tests for this service.
66 lines
2.3 KiB
Python
66 lines
2.3 KiB
Python
import json
|
|
import logging
|
|
import shutil
|
|
from contextlib import contextmanager
|
|
from functools import partial
|
|
from http.server import HTTPServer, SimpleHTTPRequestHandler
|
|
from pathlib import Path
|
|
from typing import Any, Iterator, Mapping, Union
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class PatroniAPI(HTTPServer):
|
|
def __init__(self, directory: Path, *, datadir: Path) -> None:
|
|
self.directory = directory
|
|
self.datadir = datadir
|
|
handler_cls = partial(SimpleHTTPRequestHandler, directory=str(directory))
|
|
super().__init__(("", 0), handler_cls)
|
|
|
|
def serve_forever(self, *args: Any) -> None:
|
|
logger.info(
|
|
"starting fake Patroni API at %s (directory=%s)",
|
|
self.endpoint,
|
|
self.directory,
|
|
)
|
|
return super().serve_forever(*args)
|
|
|
|
@property
|
|
def endpoint(self) -> str:
|
|
return f"http://{self.server_name}:{self.server_port}"
|
|
|
|
@contextmanager
|
|
def routes(self, mapping: Mapping[str, Union[Path, str]]) -> Iterator[None]:
|
|
"""Temporarily install specified files in served directory, thus
|
|
building "routes" from given mapping.
|
|
|
|
The 'mapping' defines target route paths as keys and files to be
|
|
installed in served directory as values. Mapping values of type 'str'
|
|
are assumed be relative file path to the 'datadir'.
|
|
"""
|
|
for route_path, fpath in mapping.items():
|
|
if isinstance(fpath, str):
|
|
fpath = self.datadir / fpath
|
|
shutil.copy(fpath, self.directory / route_path)
|
|
try:
|
|
yield None
|
|
finally:
|
|
for fname in mapping:
|
|
(self.directory / fname).unlink()
|
|
|
|
|
|
def cluster_api_set_replica_running(in_json: Path, target_dir: Path) -> Path:
|
|
# starting from 3.0.4 the state of replicas is streaming or in archive recovery
|
|
# instead of running
|
|
with in_json.open() as f:
|
|
js = json.load(f)
|
|
for node in js["members"]:
|
|
if node["role"] in ["replica", "sync_standby", "standby_leader"]:
|
|
if node["state"] in ["streaming", "in archive recovery"]:
|
|
node["state"] = "running"
|
|
assert target_dir.is_dir()
|
|
out_json = target_dir / in_json.name
|
|
with out_json.open("w") as f:
|
|
json.dump(js, f)
|
|
return out_json
|