Skip to main content

ouisync/network/
connection.rs

1use super::{
2    peer_addr::PeerAddr,
3    peer_info::PeerInfo,
4    peer_source::{ConnectionDirection, PeerSource},
5    peer_state::PeerState,
6    runtime_id::PublicRuntimeId,
7    stats::{ByteCounters, StatsTracker},
8};
9use crate::sync::{AwaitDrop, DropAwaitable, WatchSenderExt};
10use std::{
11    collections::{HashMap, hash_map::Entry},
12    fmt,
13    sync::{
14        Arc,
15        atomic::{AtomicU64, Ordering},
16    },
17    time::SystemTime,
18};
19use tokio::sync::watch;
20
21/// Container for known connections.
22pub(super) struct ConnectionSet {
23    connections: watch::Sender<HashMap<ConnectionKey, ConnectionData>>,
24}
25
26impl ConnectionSet {
27    pub fn new() -> Self {
28        Self {
29            connections: watch::Sender::new(HashMap::default()),
30        }
31    }
32
33    /// Attempt to reserve an connection to the given peer. If the connection hasn't been reserved
34    /// yet, it returns a `ConnectionPermit` which keeps the connection reserved as long as it
35    /// lives. Otherwise it returns `None`. To release a connection the permit needs to be dropped.
36    /// Also returns a notification object that can be used to wait until the permit gets released.
37    pub fn reserve(&self, addr: PeerAddr, source: PeerSource) -> ReserveResult {
38        let key = ConnectionKey {
39            addr,
40            dir: source.direction(),
41        };
42
43        self.connections
44            .send_if_modified_return(|connections| match connections.entry(key) {
45                Entry::Vacant(entry) => {
46                    let id = ConnectionId::next();
47
48                    entry.insert(ConnectionData {
49                        id,
50                        state: InnerPeerState::Known,
51                        source,
52                        stats_tracker: StatsTracker::default(),
53                        on_release: DropAwaitable::new(),
54                    });
55
56                    (
57                        true,
58                        ReserveResult::Permit(ConnectionPermit {
59                            connections: self.connections.clone(),
60                            key,
61                            id,
62                        }),
63                    )
64                }
65                Entry::Occupied(entry) => {
66                    let data = entry.get();
67
68                    (
69                        false,
70                        ReserveResult::Occupied(data.on_release.subscribe(), data.source, data.id),
71                    )
72                }
73            })
74    }
75
76    pub fn peer_info_collector(&self) -> PeerInfoCollector {
77        PeerInfoCollector(self.connections.clone())
78    }
79
80    pub fn get_peer_info(&self, addr: PeerAddr) -> Option<PeerInfo> {
81        self.with(addr, |data| data.peer_info(addr))
82    }
83
84    pub fn get_peer_key(&self, addr: PeerAddr) -> Option<usize> {
85        self.with(addr, |data| match data.state {
86            InnerPeerState::Active { peer_key, .. } => Some(peer_key),
87            InnerPeerState::Known | InnerPeerState::Connecting | InnerPeerState::Handshaking => {
88                None
89            }
90        })
91        .flatten()
92    }
93
94    fn with<F, R>(&self, addr: PeerAddr, f: F) -> Option<R>
95    where
96        F: FnOnce(&ConnectionData) -> R,
97    {
98        let connections = self.connections.borrow();
99
100        connections
101            .get(&ConnectionKey {
102                addr,
103                dir: ConnectionDirection::Incoming,
104            })
105            .or_else(|| {
106                connections.get(&ConnectionKey {
107                    addr,
108                    dir: ConnectionDirection::Outgoing,
109                })
110            })
111            .map(f)
112    }
113
114    pub fn subscribe(&self) -> watch::Receiver<HashMap<ConnectionKey, ConnectionData>> {
115        self.connections.subscribe()
116    }
117}
118
119/// Unique identifier of a connection. Connections are mostly already identified by the peer address
120/// and direction (incoming / outgoing), but this type allows to distinguish even connections with
121/// the same address/direction but that were established in two separate occasions.
122#[derive(Clone, Copy, Eq, PartialEq, Debug)]
123#[repr(transparent)]
124pub(super) struct ConnectionId(u64);
125
126impl ConnectionId {
127    pub fn next() -> Self {
128        static NEXT: AtomicU64 = AtomicU64::new(0);
129        Self(NEXT.fetch_add(1, Ordering::Relaxed))
130    }
131}
132
133impl fmt::Display for ConnectionId {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        write!(f, "{}", self.0)
136    }
137}
138
139pub(super) enum ReserveResult {
140    Permit(ConnectionPermit),
141    // Use the receiver to get notified when the existing permit is destroyed.
142    Occupied(AwaitDrop, PeerSource, ConnectionId),
143}
144
145#[derive(Clone)]
146pub struct PeerInfoCollector(watch::Sender<HashMap<ConnectionKey, ConnectionData>>);
147
148impl PeerInfoCollector {
149    pub fn collect(&self) -> Vec<PeerInfo> {
150        self.0
151            .borrow()
152            .iter()
153            .map(|(key, data)| data.peer_info(key.addr))
154            .collect()
155    }
156}
157
158/// Connection permit that prevents another connection to the same peer (socket address) to be
159/// established as long as it remains in scope.
160pub(super) struct ConnectionPermit {
161    connections: watch::Sender<HashMap<ConnectionKey, ConnectionData>>,
162    key: ConnectionKey,
163    id: ConnectionId,
164}
165
166impl ConnectionPermit {
167    pub fn mark_as_connecting(&self) {
168        self.set_state(InnerPeerState::Connecting);
169    }
170
171    pub fn mark_as_handshaking(&self) {
172        self.set_state(InnerPeerState::Handshaking);
173    }
174
175    pub fn mark_as_active(&self, runtime_id: PublicRuntimeId, peer_key: usize) {
176        self.set_state(InnerPeerState::Active {
177            id: runtime_id,
178            since: SystemTime::now(),
179            peer_key,
180        });
181    }
182
183    fn set_state(&self, new_state: InnerPeerState) {
184        self.connections.send_if_modified(|connections| {
185            // unwrap is ok because if `self` exists then the entry should exists as well.
186            let peer = connections.get_mut(&self.key).unwrap();
187
188            if peer.state != new_state {
189                peer.state = new_state;
190                true
191            } else {
192                false
193            }
194        });
195    }
196
197    /// Returns a `AwaitDrop` that gets notified when this permit gets released.
198    pub fn released(&self) -> AwaitDrop {
199        self.with(|data| data.on_release.subscribe())
200    }
201
202    pub fn addr(&self) -> PeerAddr {
203        self.key.addr
204    }
205
206    pub fn id(&self) -> ConnectionId {
207        self.id
208    }
209
210    pub fn source(&self) -> PeerSource {
211        self.with(|data| data.source)
212    }
213
214    pub fn byte_counters(&self) -> Arc<ByteCounters> {
215        self.with(|data| data.stats_tracker.bytes.clone())
216    }
217
218    fn with<F, R>(&self, f: F) -> R
219    where
220        F: FnOnce(&ConnectionData) -> R,
221    {
222        // unwrap is ok because if `self` exists then the entry exists as well.
223        f(self.connections.borrow().get(&self.key).unwrap())
224    }
225}
226
227impl fmt::Debug for ConnectionPermit {
228    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
229        f.debug_struct("ConnectionPermit")
230            .field("key", &self.key)
231            .field("id", &self.id)
232            .finish_non_exhaustive()
233    }
234}
235
236impl Drop for ConnectionPermit {
237    fn drop(&mut self) {
238        self.connections.send_if_modified(|connections| {
239            let Entry::Occupied(entry) = connections.entry(self.key) else {
240                return false;
241            };
242
243            if entry.get().id != self.id {
244                return false;
245            }
246
247            entry.remove();
248            true
249        });
250    }
251}
252
253#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
254pub(super) struct ConnectionKey {
255    addr: PeerAddr,
256    dir: ConnectionDirection,
257}
258
259pub(super) struct ConnectionData {
260    id: ConnectionId,
261    state: InnerPeerState,
262    source: PeerSource,
263    stats_tracker: StatsTracker,
264    on_release: DropAwaitable,
265}
266
267impl ConnectionData {
268    fn peer_info(&self, addr: PeerAddr) -> PeerInfo {
269        let stats = self.stats_tracker.read();
270
271        PeerInfo {
272            addr,
273            source: self.source,
274            state: self.state.to_peer_state(),
275            stats,
276        }
277    }
278}
279
280#[derive(Eq, PartialEq)]
281enum InnerPeerState {
282    Known,
283    Connecting,
284    Handshaking,
285    Active {
286        id: PublicRuntimeId,
287        since: SystemTime,
288        peer_key: usize,
289    },
290}
291
292impl InnerPeerState {
293    fn to_peer_state(&self) -> PeerState {
294        match self {
295            Self::Known => PeerState::Known,
296            Self::Connecting => PeerState::Connecting,
297            Self::Handshaking => PeerState::Handshaking,
298            Self::Active { id, since, .. } => PeerState::Active {
299                id: *id,
300                since: *since,
301            },
302        }
303    }
304}