Compare commits

...

7 commits

Author SHA1 Message Date
Mathieu Trossevin 099a22fb98
Merge branch 'main' into notify_barrier 2024-01-11 14:44:10 +01:00
Mathieu Trossevin eb5dc84ca8 Merge branch 'main' into notify_barrier 2024-01-06 15:04:15 +01:00
Mathieu Trossevin 31d681ba2a Proper case for TimedOut variant 2024-01-05 22:51:28 +01:00
Mathieu Trossevin 6dae26212d notify: Even better documentation 2024-01-05 22:49:21 +01:00
Mathieu Trossevin cfac28272b Merge branch 'main' into notify_barrier 2024-01-05 22:41:13 +01:00
Mathieu Trossevin 035f354b04 notify: improve documentation 2024-01-05 22:28:57 +01:00
Mathieu Trossevin d60d906483 notify: Add an equivalent to sd_notify_barrier
This also add an RAII guard for that can be used for the same purpose.
(Mostly for use in a closure.)
2024-01-05 21:27:09 +01:00
4 changed files with 260 additions and 12 deletions

View file

@ -13,7 +13,7 @@ rust-version = "1.74.0"
[features]
default = []
listenfd = ["dep:rustix", "rustix/fs", "dep:libc"]
notify = ["dep:libc", "dep:rustix"]
notify = ["dep:libc", "dep:rustix", "rustix/pipe", "rustix/event"]
[dependencies]
libc = { version = "0.2.150", optional = true }

View file

@ -122,6 +122,60 @@ impl Display for OtherStateError {
impl Error for OtherStateError {}
#[derive(Debug)]
pub enum BarrierError {
Notify(NotifyError),
FailedPipeCreation(Errno),
FailedPolling(Errno),
TimedOut,
}
impl From<NotifyError> for BarrierError {
fn from(value: NotifyError) -> Self {
Self::Notify(value)
}
}
impl Display for BarrierError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Notify(_) => write!(f, "Couldn't notify of barrier."),
Self::FailedPipeCreation(errno) => write!(
f,
"Couldn't create pipe to serve as a notify syncronisation barrier : error {errno}."
),
Self::FailedPolling(errno) => write!(f, "poll failed with error : {errno}."),
Self::TimedOut => write!(f, "Notification synchronisation timed out."),
}
}
}
impl Error for BarrierError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Notify(source) => Some(source),
Self::FailedPipeCreation(source) => Some(source),
Self::FailedPolling(source) => Some(source),
Self::TimedOut => None,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PollTimeoutFromIntError {
Negative,
}
impl Display for PollTimeoutFromIntError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Negative => write!(f, "This variant of BarrierTimeout cannot be negative but a negative number was passed.")
}
}
}
impl Error for PollTimeoutFromIntError {}
#[derive(Debug)]
pub enum MicrosecondsFromDurationError {
NoMicroseconds,

View file

@ -20,9 +20,12 @@ use std::ffi::OsString;
use std::io::IoSlice;
use std::os::unix::net::UnixDatagram;
use rustix::fd::{AsFd, BorrowedFd};
use rustix::{
fd::{AsFd, BorrowedFd},
pipe::PipeFlags,
};
use self::error::{NewNotifierError, NotifyError};
use self::error::{BarrierError, NewNotifierError, NotifyError};
pub mod error;
pub mod types;
@ -146,10 +149,126 @@ impl Notifier {
///
/// # Errors
///
/// This function will error out if the passed [`NotifyState`] do not follow the rules set by systemd or if they couldn't be fully sent.
/// This function will error out if the passed [`NotifyState`] couldn't be fully sent.
pub fn notify(&self, state: &[NotifyState<'_>]) -> Result<(), NotifyError> {
self.notify_with_fds(state, &[])
}
/// Ensure that all previous notifications have been treated by the service manager.
///
/// **This is a blocking call. If you are using it in an async function you might want to use an equivalent of [`tokio::task::spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).**
///
/// # Errors
///
/// This function will error out if the synchronisation mechanism couldn't be created, the synchronising notification failed or the synchronisation timed out.
pub fn barrier(&self, timeout: types::BarrierTimeout) -> Result<(), BarrierError> {
let (to_poll, sent) = rustix::pipe::pipe_with(PipeFlags::CLOEXEC)
.map_err(BarrierError::FailedPipeCreation)?;
self.notify_with_fds(
&[NotifyState::Other(types::OtherState::barrier())],
&[sent.as_fd()],
)?;
core::mem::drop(sent);
let to_poll = rustix::event::PollFd::new(&to_poll, rustix::event::PollFlags::HUP);
rustix::event::poll(&mut [to_poll], timeout.to_raw())
.map_err(BarrierError::FailedPolling)
.and_then(|events| {
if events == 0_usize {
return Err(BarrierError::TimedOut);
}
Ok(())
})
}
/// Create a synchronizing RAII guard.
///
/// This create an RAII guard that automatically call [`barrier()`](Self::barrier) with the provided timeout when dropped.
///
/// Do note that this guard's [`Drop`] implementation will block for the provided timeout and ignore all errors returned by [`barrier()`](Self::barrier).
pub fn guard(&self, timeout: types::BarrierTimeout) -> NotifyBarrierGuard<'_> {
NotifyBarrierGuard {
notifier: self,
timeout,
}
}
/// Create a scope at the end of which all notifications sent inside should have been treated by the service manager.
pub fn with_guard<F, T>(&self, timeout: types::BarrierTimeout, f: F) -> T
where
F: FnOnce(NotifyBarrierGuard) -> T,
{
f(self.guard(timeout))
}
}
/// RAII guard automatically synchronizing the notifications with the service manager.
///
/// This is created by [`Notifier::guard`].
///
/// Do note that this guard's [`Drop`] implementation will block for the provided timeout and ignore all errors returned by [`barrier()`](Self::barrier).
pub struct NotifyBarrierGuard<'a> {
notifier: &'a Notifier,
timeout: types::BarrierTimeout,
}
impl<'a> NotifyBarrierGuard<'a> {
/// Notify service manager about status changes.
///
/// Send a notification to the manager about service status changes. Also see [`notify_with_fds()`](Self::notify_with_fds) to send file descriptors.
///
/// # Errors
///
/// This function will error out if the passed [`NotifyState`] do not follow the rules set by systemd or if they couldn't be fully sent.
#[inline]
pub fn notify(&self, state: &[NotifyState<'_>]) -> Result<(), NotifyError> {
self.notifier.notify(state)
}
/// Notify service manager about status change and send file descriptors.
///
/// Use this together with [`NotifyState::FdStore`]. Otherwise works like [`notify()`](Self::notify).
///
/// # Errors
///
/// This function will error out if the passed [`NotifyState`] do not follow the rules set by systemd or if they couldn't be fully sent.
#[inline]
pub fn notify_with_fds(
&self,
state: &[NotifyState<'_>],
fds: &[BorrowedFd],
) -> Result<(), NotifyError> {
self.notifier.notify_with_fds(state, fds)
}
/// Ensure that all previous notifications have been treated by the service manager.
///
/// **This is a blocking call. If you are using it in an async function you might want to use an equivalent of [`tokio::task::spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).**
///
/// # Errors
///
/// This function will error out if the synchronisation mechanism couldn't be created, the synchronising notification failed or the synchronisation timed out.
#[inline]
pub fn barrier(&self, timeout: types::BarrierTimeout) -> Result<(), BarrierError> {
self.notifier.barrier(timeout)
}
/// Create a scope at the end of which all notifications sent inside should have been treated by the service manager.
#[inline]
pub fn with_guard<F, T>(&self, timeout: types::BarrierTimeout, f: F) -> T
where
F: FnOnce(Self) -> T,
{
f(self.notifier.guard(timeout))
}
}
impl Drop for NotifyBarrierGuard<'_> {
fn drop(&mut self) {
self.barrier(self.timeout).unwrap_or_default();
}
}
/// Check for watchdog support at runtime
@ -231,17 +350,17 @@ pub enum NotifyState<'a> {
impl<'a> Display for NotifyState<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match *self {
NotifyState::BusError(types::BusError(s)) => write!(f, "BUSERROR={s}"),
NotifyState::BusError(s) => write!(f, "BUSERROR={s}"),
NotifyState::Errno(e) => write!(f, "ERRNO={e}"),
NotifyState::FdName(types::FdName(name)) => write!(f, "FDNAME={name}"),
NotifyState::FdName(name) => write!(f, "FDNAME={name}"),
NotifyState::FdStore => f.write_str("FDSTORE=1"),
NotifyState::FdStoreRemove => f.write_str("FDSTOREREMOVE=1"),
NotifyState::FdpollDisable => f.write_str("FDPOLL=0"),
NotifyState::Mainpid(pid) => write!(f, "MAINPID={pid}"),
NotifyState::Other(types::OtherState(message)) => f.write_str(message),
NotifyState::Other(message) => f.write_str(message.as_ref()),
NotifyState::Ready => f.write_str("READY=1"),
NotifyState::Reloading => f.write_str("RELOADING=1"),
NotifyState::Status(types::StatusLine(status)) => write!(f, "STATUS={status}"),
NotifyState::Status(status) => write!(f, "STATUS={status}"),
NotifyState::Stopping => f.write_str("STOPPING=1"),
NotifyState::Watchdog => f.write_str("WATCHDOG=1"),
NotifyState::WatchdogTrigger => f.write_str("WATCHDOG=trigger"),

View file

@ -13,7 +13,7 @@ use super::error;
/// * Doesn't contains control characters.
/// * Doesn't contains a colon (`:`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FdName<'a>(pub(super) &'a str);
pub struct FdName<'a>(&'a str);
impl<'a> TryFrom<&'a str> for FdName<'a> {
type Error = error::FdNameError;
@ -48,11 +48,18 @@ impl AsRef<str> for FdName<'_> {
}
}
impl Display for FdName<'_> {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(self.0, f)
}
}
/// A status line for [`NotifyState::Status`](super::NotifyState::Status).
///
/// As the name explains it needs to be a single line.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct StatusLine<'a>(pub(super) &'a str);
pub struct StatusLine<'a>(&'a str);
impl<'a> TryFrom<&'a str> for StatusLine<'a> {
type Error = error::StatusLineError;
@ -72,6 +79,13 @@ impl AsRef<str> for StatusLine<'_> {
}
}
impl Display for StatusLine<'_> {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(self.0, f)
}
}
/// Semantic type representing a number of microseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Microseconds(Duration);
@ -93,11 +107,54 @@ impl Display for Microseconds {
}
}
/// Timeout for the [`Notifier::barrier()`](super::Notifier::barrier).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BarrierTimeout {
/// Will block indefinitely.
Infinite,
/// Will not block at all and return immediately even if no event has happened.
Immediate,
/// Will block for a number of milliseconds.
NonZero(PollTimeout),
}
impl BarrierTimeout {
pub(crate) const fn to_raw(self) -> i32 {
match self {
Self::Immediate => 0_i32,
Self::Infinite => -1_i32,
Self::NonZero(PollTimeout(timeout)) => timeout.get(),
}
}
}
/// Variant of [`BarrierTimeout`] for positive timeout.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PollTimeout(core::num::NonZeroI32);
impl TryFrom<core::num::NonZeroI32> for PollTimeout {
type Error = error::PollTimeoutFromIntError;
fn try_from(milliseconds: core::num::NonZeroI32) -> Result<Self, Self::Error> {
if milliseconds.is_negative() {
return Err(Self::Error::Negative);
}
Ok(Self(milliseconds))
}
}
impl From<PollTimeout> for core::num::NonZeroI32 {
#[inline]
fn from(value: PollTimeout) -> Self {
value.0
}
}
/// A D-Bus error-style error code.
///
/// Right now it doesn't impose any additional constraint on [`str`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BusError<'a>(pub(super) &'a str);
pub struct BusError<'a>(&'a str);
impl<'a> From<&'a str> for BusError<'a> {
fn from(value: &'a str) -> Self {
@ -106,13 +163,24 @@ impl<'a> From<&'a str> for BusError<'a> {
}
impl AsRef<str> for BusError<'_> {
#[inline]
fn as_ref(&self) -> &str {
self.0
}
}
impl Display for BusError<'_> {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(self.0, f)
}
}
/// An arbitrary custom state other than `BARRIER=1`.
///
/// `BARRIER=1` is blocked as it result in special expectations by the protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OtherState<'a>(pub(super) &'a str);
pub struct OtherState<'a>(&'a str);
impl<'a> TryFrom<&'a str> for OtherState<'a> {
type Error = error::OtherStateError;
@ -135,7 +203,14 @@ impl<'a> TryFrom<&'a str> for OtherState<'a> {
}
impl AsRef<str> for OtherState<'_> {
#[inline]
fn as_ref(&self) -> &str {
self.0
}
}
impl OtherState<'_> {
pub(super) const fn barrier() -> Self {
Self("BARRIER=1")
}
}