//! Interface for the `NOTIFY_SOCKET` readiness protocol //! //! The entry point of this module is the [`Notifier`] struct. //! //! # Example //! //! ```no_run //! # main() -> Result<(), Box> { //! // Do whatever you need for your service to be ready. //! if let Some(notifier) = Notifier::new(true)? { //! notifier.notify(&[NotifyState::Ready])? //! } //! # Ok(()) //! # } //! ``` use core::fmt::Display; use std::env; use std::ffi::OsString; use std::io::IoSlice; use std::os::unix::net::UnixDatagram; use rustix::{ fd::{AsFd, BorrowedFd}, pipe::PipeFlags, }; use self::error::{BarrierError, NewNotifierError, NotifyError}; pub mod error; pub mod types; /// A wrapper around the socket specified by `$NOTIFY_SOCKET` #[derive(Debug)] pub struct Notifier { sock_addr: rustix::net::SocketAddrUnix, socket: UnixDatagram, } impl Notifier { /// Create a new [`Notifier`]. /// /// If `unset_env` is set to `true` this will unset `NOTIFY_SOCKET` resulting in further call to this function to return `Ok(None)`. /// /// Returns `Ok(None)` if notification isn't supported (`NOTIFY_SOCKET` isn't defined). /// /// # Errors /// /// This function error out if the socket couldn't be opened. pub fn new(unset_env: bool) -> Result, NewNotifierError> { let span = tracing::info_span!("new", f_unset_env = ?unset_env); let _enter = span.enter(); tracing::info!("Opening NOTIFY_SOCKET if available."); let env_sock = match env::var_os("NOTIFY_SOCKET") { None => return Ok(None), Some(v) => v, }; tracing::debug!("NOTIFY_SOCKET = {env_sock:?}"); if unset_env { tracing::trace!("Removing NOTIFY_SOCKET from environment."); env::remove_var("NOTIFY_SOCKET"); } // False positive #[allow(clippy::single_match_else)] // If the first character of `$NOTIFY_SOCKET` is '@', the string // is understood as Linux abstract namespace socket. let socket_addr = match env_sock.as_encoded_bytes().strip_prefix(b"@").map(|v| { // SAFETY: // - Only strip ASCII '@' which is a non-empty UTF-8 substring unsafe { OsString::from_encoded_bytes_unchecked(v.to_vec()) } }) { Some(stripped_addr) => { tracing::trace!("Opening abstract socket {stripped_addr:?}."); rustix::net::SocketAddrUnix::new_abstract_name(stripped_addr.as_encoded_bytes()) .map_err(NewNotifierError::InvalidAbstractSocket)? } None => { tracing::trace!("Opening named socket {env_sock:?}."); rustix::net::SocketAddrUnix::new(env_sock) .map_err(NewNotifierError::InvalidSocketPath)? } }; let socket = UnixDatagram::unbound().map_err(NewNotifierError::CouldntOpenSocket)?; let ret = Self { sock_addr: socket_addr, socket, }; Ok(Some(ret)) } /// Notify service manager about status change and send file descriptors. /// /// Use this together with [`NotifyState::FdStore`]. Otherwise works like [`Notifier::notify()`]. /// /// # Errors /// /// This function will error out if the passed [`NotifyState`] do not follow the rules set by systemd or if they couldn't be fully sent. pub fn notify_with_fds( &self, state: &[NotifyState<'_>], fds: &[BorrowedFd], ) -> Result<(), NotifyError> { let span = tracing::info_span!("notify_with_fds", f_self = ?self, f_state = ?state, f_fds = ?fds); let _enter = span.enter(); let msg = state .iter() .fold(String::new(), |acc, state| format!("{acc}{state}\n")) .into_bytes(); let msg_len = msg.len(); let msg_iov = IoSlice::new(&msg); let mut ancillary = if fds.is_empty() { tracing::trace!("No file descriptors provided, not sending ancillary messages."); rustix::net::SendAncillaryBuffer::default() } else { tracing::trace!( "{} file descriptors provided, sending through ancillary messages", fds.len() ); let mut ancillary = rustix::net::SendAncillaryBuffer::default(); let tmp = rustix::net::SendAncillaryMessage::ScmRights(fds); if !ancillary.push(tmp) { return Err(NotifyError::PushAncillaryMessage); } ancillary }; span.record("expected_length", msg_len); tracing::debug!("Sending notification messages."); let sent_len = rustix::net::sendmsg_unix( self.socket.as_fd(), &self.sock_addr, &[msg_iov], &mut ancillary, rustix::net::SendFlags::empty(), ) .map_err(NotifyError::SendMsg)?; span.record("sent_length", sent_len); tracing::debug!("Notification message sent. {sent_len} bytes sent."); if sent_len != msg_len { tracing::error!("The notification message couldn't be completely sent!"); return Err(NotifyError::PartialSend); } Ok(()) } /// Notify service manager about status changes. /// /// Send a notification to the manager about service status changes. Also see [`Notifier::notify_with_fds()`] to send file descriptors. /// /// # Errors /// /// This function will error out if the passed [`NotifyState`] couldn't be fully sent. pub fn notify(&self, state: &[NotifyState<'_>]) -> Result<(), NotifyError> { self.notify_with_fds(state, &[]) } /// Ensure that all previous notifications have been treated by the service manager. /// /// **This is a blocking call. If you are using it in an async function you might want to use an equivalent of [`tokio::task::spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).** /// /// # Errors /// /// This function will error out if the synchronisation mechanism couldn't be created, the synchronising notification failed or the synchronisation timed out. pub fn barrier(&self, timeout: types::BarrierTimeout) -> Result<(), BarrierError> { let (to_poll, sent) = rustix::pipe::pipe_with(PipeFlags::CLOEXEC) .map_err(BarrierError::FailedPipeCreation)?; self.notify_with_fds( &[NotifyState::Other(types::OtherState::barrier())], &[sent.as_fd()], )?; core::mem::drop(sent); let to_poll = rustix::event::PollFd::new(&to_poll, rustix::event::PollFlags::HUP); rustix::event::poll(&mut [to_poll], timeout.to_raw()) .map_err(BarrierError::FailedPolling) .and_then(|events| { if events == 0_usize { return Err(BarrierError::TimedOut); } Ok(()) }) } /// Create a synchronizing RAII guard. /// /// This create an RAII guard that automatically call [`barrier()`](Self::barrier) with the provided timeout when dropped. /// /// Do note that this guard's [`Drop`] implementation will block for the provided timeout and ignore all errors returned by [`barrier()`](Self::barrier). pub fn guard(&self, timeout: types::BarrierTimeout) -> NotifyBarrierGuard<'_> { NotifyBarrierGuard { notifier: self, timeout, } } /// Create a scope at the end of which all notifications sent inside should have been treated by the service manager. pub fn with_guard(&self, timeout: types::BarrierTimeout, f: F) -> T where F: FnOnce(NotifyBarrierGuard) -> T, { f(self.guard(timeout)) } } /// RAII guard automatically synchronizing the notifications with the service manager. /// /// This is created by [`Notifier::guard`]. /// /// Do note that this guard's [`Drop`] implementation will block for the provided timeout and ignore all errors returned by [`barrier()`](Self::barrier). pub struct NotifyBarrierGuard<'a> { notifier: &'a Notifier, timeout: types::BarrierTimeout, } impl<'a> NotifyBarrierGuard<'a> { /// Notify service manager about status changes. /// /// Send a notification to the manager about service status changes. Also see [`notify_with_fds()`](Self::notify_with_fds) to send file descriptors. /// /// # Errors /// /// This function will error out if the passed [`NotifyState`] do not follow the rules set by systemd or if they couldn't be fully sent. #[inline] pub fn notify(&self, state: &[NotifyState<'_>]) -> Result<(), NotifyError> { self.notifier.notify(state) } /// Notify service manager about status change and send file descriptors. /// /// Use this together with [`NotifyState::FdStore`]. Otherwise works like [`notify()`](Self::notify). /// /// # Errors /// /// This function will error out if the passed [`NotifyState`] do not follow the rules set by systemd or if they couldn't be fully sent. #[inline] pub fn notify_with_fds( &self, state: &[NotifyState<'_>], fds: &[BorrowedFd], ) -> Result<(), NotifyError> { self.notifier.notify_with_fds(state, fds) } /// Ensure that all previous notifications have been treated by the service manager. /// /// **This is a blocking call. If you are using it in an async function you might want to use an equivalent of [`tokio::task::spawn_blocking`](https://docs.rs/tokio/latest/tokio/task/fn.spawn_blocking.html).** /// /// # Errors /// /// This function will error out if the synchronisation mechanism couldn't be created, the synchronising notification failed or the synchronisation timed out. #[inline] pub fn barrier(&self, timeout: types::BarrierTimeout) -> Result<(), BarrierError> { self.notifier.barrier(timeout) } /// Create a scope at the end of which all notifications sent inside should have been treated by the service manager. #[inline] pub fn with_guard(&self, timeout: types::BarrierTimeout, f: F) -> T where F: FnOnce(Self) -> T, { f(self.notifier.guard(timeout)) } } impl Drop for NotifyBarrierGuard<'_> { fn drop(&mut self) { self.barrier(self.timeout).unwrap_or_default(); } } /// Check for watchdog support at runtime /// /// If `unset_env` is true, the environment variables related to watchdog support will be cleared. /// /// # Return /// /// * [`None`] if watchdog support is not enabled. /// * The timeout before which the watchdog expects a response from the process otherwise. pub fn is_watchdog_enabled(unset_env: bool) -> Option { let timeout = std::env::var("WATCHDOG_USEC").ok(); let watchdog_pid = std::env::var("WATCHDOG_PID").ok(); if unset_env { std::env::remove_var("WATCHDOG_USEC"); std::env::remove_var("WATCHDOG_PID"); } let timeout = timeout .and_then(|timeout| timeout.parse::().ok()) .map(std::time::Duration::from_micros)?; let watchdog_pid = if let Some(pid) = watchdog_pid { pid.parse::().ok()? } else { return Some(timeout); }; if watchdog_pid == std::process::id() { Some(timeout) } else { None } } /// Status changes, see `sd_notify(3)`. #[allow(clippy::module_name_repetitions)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum NotifyState<'a> { /// D-Bus error-style error code. BusError(types::BusError<'a>), /// errno-style error code. Errno(u8), /// A name for the submitted file descriptors. FdName(types::FdName<'a>), /// Stores additional file descriptors in the service manager. Use [`Notifier::notify_with_fds()`] with this. FdStore, /// Remove stored file descriptors. Must be used together with [`NotifyState::FdName`]. FdStoreRemove, /// Tell the service manager to not poll the filedescriptors for errors. This causes /// systemd to hold on to broken file descriptors which must be removed manually. /// Must be used together with [`NotifyState::FdStore`]. FdpollDisable, /// The main process ID of the service, in case of forking applications. Mainpid(libc::pid_t), /// Custom state change, as a `KEY=VALUE` string. Other(types::OtherState<'a>), /// Service startup is finished. Ready, /// Service is reloading. Reloading, /// Custom status change. Status(types::StatusLine<'a>), /// Service is beginning to shutdown. Stopping, /// Tell the service manager to update the watchdog timestamp. Watchdog, /// Tell the service manager to execute the configured watchdog option. WatchdogTrigger, /// Reset watchdog timeout value during runtime. /// Minimal precision is microseconds, not nanoseconds. WatchdogUsec(types::Microseconds), /// Tells the service manager to extend the startup, runtime or shutdown service timeout corresponding the current state. /// Minimal precision is microseconds, not nanoseconds. ExtendTimeoutUsec(types::Microseconds), } impl<'a> Display for NotifyState<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self { NotifyState::BusError(s) => write!(f, "BUSERROR={s}"), NotifyState::Errno(e) => write!(f, "ERRNO={e}"), NotifyState::FdName(name) => write!(f, "FDNAME={name}"), NotifyState::FdStore => f.write_str("FDSTORE=1"), NotifyState::FdStoreRemove => f.write_str("FDSTOREREMOVE=1"), NotifyState::FdpollDisable => f.write_str("FDPOLL=0"), NotifyState::Mainpid(pid) => write!(f, "MAINPID={pid}"), NotifyState::Other(message) => f.write_str(message.as_ref()), NotifyState::Ready => f.write_str("READY=1"), NotifyState::Reloading => f.write_str("RELOADING=1"), NotifyState::Status(status) => write!(f, "STATUS={status}"), NotifyState::Stopping => f.write_str("STOPPING=1"), NotifyState::Watchdog => f.write_str("WATCHDOG=1"), NotifyState::WatchdogTrigger => f.write_str("WATCHDOG=trigger"), NotifyState::WatchdogUsec(duration) => { write!(f, "WATCHDOG_USEC={duration}") } NotifyState::ExtendTimeoutUsec(duration) => { write!(f, "EXTEND_TIMEOUT_USEC={duration}") } } } }