#![allow(clippy::declare_interior_mutable_const)]
use crate::{crypto::sign::PublicKey, protocol::BlockId};
use core::fmt;
use futures_util::{stream, Stream};
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::broadcast;
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Payload {
SnapshotApproved(PublicKey),
SnapshotRejected(PublicKey),
BlockReceived(BlockId),
MaintenanceCompleted,
}
#[derive(Copy, Clone, Debug)]
pub struct Event {
pub payload: Payload,
pub(crate) scope: EventScope,
}
impl Event {
pub(crate) fn new(payload: Payload) -> Self {
Self {
payload,
scope: EventScope::DEFAULT,
}
}
pub(crate) fn with_scope(self, scope: EventScope) -> Self {
Self { scope, ..self }
}
}
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub(crate) struct EventScope(usize);
impl EventScope {
pub const DEFAULT: Self = Self(0);
pub fn new() -> Self {
static NEXT: AtomicUsize = AtomicUsize::new(1);
Self(NEXT.fetch_add(1, Ordering::Relaxed))
}
}
#[derive(Clone)]
pub(crate) struct EventSender {
inner: broadcast::Sender<Event>,
scope: EventScope,
}
impl EventSender {
pub fn new(capacity: usize) -> Self {
Self {
inner: broadcast::channel(capacity).0,
scope: EventScope::DEFAULT,
}
}
pub fn with_scope(self, scope: EventScope) -> Self {
Self { scope, ..self }
}
pub fn send(&self, payload: Payload) {
self.inner
.send(Event::new(payload).with_scope(self.scope))
.unwrap_or(0);
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.inner.subscribe()
}
}
#[derive(Debug)]
pub(crate) struct Lagged;
impl fmt::Display for Lagged {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "event channel lagged")
}
}
impl std::error::Error for Lagged {}
pub(crate) fn into_stream(
rx: broadcast::Receiver<Event>,
) -> impl Stream<Item = Result<Event, Lagged>> {
stream::unfold(rx, |mut rx| async move {
match rx.recv().await {
Ok(event) => Some((Ok(event), rx)),
Err(broadcast::error::RecvError::Lagged(_)) => Some((Err(Lagged), rx)),
Err(broadcast::error::RecvError::Closed) => None,
}
})
}