ouisync/
event.rs

1// Probably false positive triggered by `task_local`
2#![allow(clippy::declare_interior_mutable_const)]
3
4use crate::{crypto::sign::PublicKey, protocol::BlockId};
5use core::fmt;
6use futures_util::{stream, Stream};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use tokio::sync::broadcast;
9
10#[derive(Copy, Clone, Debug)]
11#[non_exhaustive]
12pub enum Payload {
13    /// A new snapshot was approved in the specified branch.
14    SnapshotApproved(PublicKey),
15    /// A new snapshot was rejected in the specified branch.
16    SnapshotRejected(PublicKey),
17    /// A block with the specified id was received from a remote replica.
18    BlockReceived(BlockId),
19    /// The `maintain` worker job successfully completed. It won't perform any more work until
20    /// triggered again by any of the above events.
21    /// This event is useful mostly for diagnostics or testing and can be safely ignored in other
22    /// contexts.
23    MaintenanceCompleted,
24}
25
26/// Notification event
27#[derive(Copy, Clone, Debug)]
28pub struct Event {
29    /// Event payload.
30    pub payload: Payload,
31    /// Event scope. Can be used to distinguish which part of the code the event was emitted from.
32    /// Scope can be set by running the event-emitting task with `EventScope::apply`. If no scope
33    /// is set, uses `EventScope::DEFAULT`.
34    pub(crate) scope: EventScope,
35}
36
37impl Event {
38    pub(crate) fn new(payload: Payload) -> Self {
39        Self {
40            payload,
41            scope: EventScope::DEFAULT,
42        }
43    }
44
45    pub(crate) fn with_scope(self, scope: EventScope) -> Self {
46        Self { scope, ..self }
47    }
48}
49
50#[derive(Copy, Clone, Eq, PartialEq, Debug)]
51pub(crate) struct EventScope(usize);
52
53impl EventScope {
54    pub const DEFAULT: Self = Self(0);
55
56    /// Creates new scope.
57    pub fn new() -> Self {
58        static NEXT: AtomicUsize = AtomicUsize::new(1);
59        Self(NEXT.fetch_add(1, Ordering::Relaxed))
60    }
61}
62
63#[derive(Clone)]
64pub(crate) struct EventSender {
65    inner: broadcast::Sender<Event>,
66    scope: EventScope,
67}
68
69impl EventSender {
70    pub fn new(capacity: usize) -> Self {
71        Self {
72            inner: broadcast::channel(capacity).0,
73            scope: EventScope::DEFAULT,
74        }
75    }
76
77    pub fn with_scope(self, scope: EventScope) -> Self {
78        Self { scope, ..self }
79    }
80
81    pub fn send(&self, payload: Payload) {
82        self.inner
83            .send(Event::new(payload).with_scope(self.scope))
84            .unwrap_or(0);
85    }
86
87    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
88        self.inner.subscribe()
89    }
90}
91
92#[derive(Debug)]
93pub(crate) struct Lagged;
94
95impl fmt::Display for Lagged {
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97        write!(f, "event channel lagged")
98    }
99}
100
101impl std::error::Error for Lagged {}
102
103/// Converts event receiver into a `Stream`.
104pub(crate) fn into_stream(
105    rx: broadcast::Receiver<Event>,
106) -> impl Stream<Item = Result<Event, Lagged>> {
107    stream::unfold(rx, |mut rx| async move {
108        match rx.recv().await {
109            Ok(event) => Some((Ok(event), rx)),
110            Err(broadcast::error::RecvError::Lagged(_)) => Some((Err(Lagged), rx)),
111            Err(broadcast::error::RecvError::Closed) => None,
112        }
113    })
114}