1#![allow(clippy::declare_interior_mutable_const)]
3
4use crate::{crypto::sign::PublicKey, protocol::BlockId};
5use core::fmt;
6use futures_util::{stream, Stream};
7use std::sync::atomic::{AtomicUsize, Ordering};
8use tokio::sync::broadcast;
9
10#[derive(Copy, Clone, Debug)]
11#[non_exhaustive]
12pub enum Payload {
13 SnapshotApproved(PublicKey),
15 SnapshotRejected(PublicKey),
17 BlockReceived(BlockId),
19 MaintenanceCompleted,
24}
25
26#[derive(Copy, Clone, Debug)]
28pub struct Event {
29 pub payload: Payload,
31 pub(crate) scope: EventScope,
35}
36
37impl Event {
38 pub(crate) fn new(payload: Payload) -> Self {
39 Self {
40 payload,
41 scope: EventScope::DEFAULT,
42 }
43 }
44
45 pub(crate) fn with_scope(self, scope: EventScope) -> Self {
46 Self { scope, ..self }
47 }
48}
49
/// Opaque tag attached to every [`Event`].
///
/// Scopes compare by their numeric id.
/// NOTE(review): presumably used by receivers to tell apart events from different senders;
/// confirm against the consumers of `Event::scope`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct EventScope(usize);

impl EventScope {
    /// The scope used when none is set explicitly. Its id is `0`.
    pub const DEFAULT: Self = Self(0);

    /// Allocates a fresh scope.
    ///
    /// Ids come from a process-wide counter that starts at `1`, so a fresh scope is distinct
    /// from [`Self::DEFAULT`] and from every scope handed out before it (as long as the counter
    /// has not wrapped around).
    pub fn new() -> Self {
        // Shared by all callers. `Relaxed` suffices because only the uniqueness of the returned
        // value matters, not its ordering relative to other memory operations.
        static NEXT_ID: AtomicUsize = AtomicUsize::new(1);

        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        Self(id)
    }
}
62
63#[derive(Clone)]
64pub(crate) struct EventSender {
65 inner: broadcast::Sender<Event>,
66 scope: EventScope,
67}
68
69impl EventSender {
70 pub fn new(capacity: usize) -> Self {
71 Self {
72 inner: broadcast::channel(capacity).0,
73 scope: EventScope::DEFAULT,
74 }
75 }
76
77 pub fn with_scope(self, scope: EventScope) -> Self {
78 Self { scope, ..self }
79 }
80
81 pub fn send(&self, payload: Payload) {
82 self.inner
83 .send(Event::new(payload).with_scope(self.scope))
84 .unwrap_or(0);
85 }
86
87 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
88 self.inner.subscribe()
89 }
90}
91
/// Error yielded by the stream from `into_stream` when the receiver fell behind the channel.
#[derive(Debug)]
pub(crate) struct Lagged;

impl fmt::Display for Lagged {
    /// Writes the fixed message `event channel lagged`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("event channel lagged")
    }
}

/// `Lagged` has no underlying source error, so the default trait methods are enough.
impl std::error::Error for Lagged {}
102
103pub(crate) fn into_stream(
105 rx: broadcast::Receiver<Event>,
106) -> impl Stream<Item = Result<Event, Lagged>> {
107 stream::unfold(rx, |mut rx| async move {
108 match rx.recv().await {
109 Ok(event) => Some((Ok(event), rx)),
110 Err(broadcast::error::RecvError::Lagged(_)) => Some((Err(Lagged), rx)),
111 Err(broadcast::error::RecvError::Closed) => None,
112 }
113 })
114}