ouisync/network/
connection.rs

1use super::{
2    peer_addr::PeerAddr,
3    peer_info::PeerInfo,
4    peer_source::PeerSource,
5    peer_state::PeerState,
6    runtime_id::PublicRuntimeId,
7    stats::{ByteCounters, StatsTracker},
8};
9use crate::{
10    collections::{hash_map::Entry, HashMap},
11    sync::{AwaitDrop, DropAwaitable, WatchSenderExt},
12};
13use serde::Serialize;
14use std::{
15    fmt,
16    sync::{
17        atomic::{AtomicU64, Ordering},
18        Arc,
19    },
20    time::SystemTime,
21};
22use tokio::sync::watch;
23
24/// Container for known connections.
25pub(super) struct ConnectionSet {
26    connections: watch::Sender<HashMap<ConnectionKey, ConnectionData>>,
27}
28
29impl ConnectionSet {
30    pub fn new() -> Self {
31        Self {
32            connections: watch::Sender::new(HashMap::default()),
33        }
34    }
35
36    /// Attempt to reserve an connection to the given peer. If the connection hasn't been reserved
37    /// yet, it returns a `ConnectionPermit` which keeps the connection reserved as long as it
38    /// lives. Otherwise it returns `None`. To release a connection the permit needs to be dropped.
39    /// Also returns a notification object that can be used to wait until the permit gets released.
40    pub fn reserve(&self, addr: PeerAddr, source: PeerSource) -> ReserveResult {
41        let key = ConnectionKey {
42            addr,
43            dir: ConnectionDirection::from_source(source),
44        };
45
46        self.connections
47            .send_if_modified_return(|connections| match connections.entry(key) {
48                Entry::Vacant(entry) => {
49                    let id = ConnectionId::next();
50
51                    entry.insert(ConnectionData {
52                        id,
53                        state: PeerState::Known,
54                        source,
55                        stats_tracker: StatsTracker::default(),
56                        on_release: DropAwaitable::new(),
57                    });
58
59                    (
60                        true,
61                        ReserveResult::Permit(ConnectionPermit {
62                            connections: self.connections.clone(),
63                            key,
64                            id,
65                        }),
66                    )
67                }
68                Entry::Occupied(entry) => {
69                    let peer_permit = entry.get();
70
71                    (
72                        false,
73                        ReserveResult::Occupied(
74                            peer_permit.on_release.subscribe(),
75                            peer_permit.source,
76                            peer_permit.id,
77                        ),
78                    )
79                }
80            })
81    }
82
83    pub fn peer_info_collector(&self) -> PeerInfoCollector {
84        PeerInfoCollector(self.connections.clone())
85    }
86
87    pub fn get_peer_info(&self, addr: PeerAddr) -> Option<PeerInfo> {
88        let connections = self.connections.borrow();
89
90        connections
91            .get(&ConnectionKey {
92                addr,
93                dir: ConnectionDirection::Incoming,
94            })
95            .or_else(|| {
96                connections.get(&ConnectionKey {
97                    addr,
98                    dir: ConnectionDirection::Outgoing,
99                })
100            })
101            .map(|data| data.peer_info(addr))
102    }
103
104    pub fn subscribe(&self) -> watch::Receiver<HashMap<ConnectionKey, ConnectionData>> {
105        self.connections.subscribe()
106    }
107}
108
109/// Unique identifier of a connection. Connections are mostly already identified by the peer address
110/// and direction (incoming / outgoing), but this type allows to distinguish even connections with
111/// the same address/direction but that were established in two separate occasions.
112#[derive(Clone, Copy, Eq, PartialEq, Debug)]
113#[repr(transparent)]
114pub(super) struct ConnectionId(u64);
115
116impl ConnectionId {
117    pub fn next() -> Self {
118        static NEXT: AtomicU64 = AtomicU64::new(0);
119        Self(NEXT.fetch_add(1, Ordering::Relaxed))
120    }
121}
122
123impl fmt::Display for ConnectionId {
124    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125        write!(f, "{}", self.0)
126    }
127}
128
129pub(super) enum ReserveResult {
130    Permit(ConnectionPermit),
131    // Use the receiver to get notified when the existing permit is destroyed.
132    Occupied(AwaitDrop, PeerSource, ConnectionId),
133}
134
135#[derive(Clone)]
136pub struct PeerInfoCollector(watch::Sender<HashMap<ConnectionKey, ConnectionData>>);
137
138impl PeerInfoCollector {
139    pub fn collect(&self) -> Vec<PeerInfo> {
140        self.0
141            .borrow()
142            .iter()
143            .map(|(key, data)| data.peer_info(key.addr))
144            .collect()
145    }
146}
147
148#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize)]
149pub(super) enum ConnectionDirection {
150    Incoming,
151    Outgoing,
152}
153
154impl ConnectionDirection {
155    pub fn from_source(source: PeerSource) -> Self {
156        match source {
157            PeerSource::Listener => Self::Incoming,
158            PeerSource::UserProvided
159            | PeerSource::LocalDiscovery
160            | PeerSource::Dht
161            | PeerSource::PeerExchange => Self::Outgoing,
162        }
163    }
164}
165
166/// Connection permit that prevents another connection to the same peer (socket address) to be
167/// established as long as it remains in scope.
168pub(super) struct ConnectionPermit {
169    connections: watch::Sender<HashMap<ConnectionKey, ConnectionData>>,
170    key: ConnectionKey,
171    id: ConnectionId,
172}
173
174impl ConnectionPermit {
175    pub fn mark_as_connecting(&self) {
176        self.set_state(PeerState::Connecting);
177    }
178
179    pub fn mark_as_handshaking(&self) {
180        self.set_state(PeerState::Handshaking);
181    }
182
183    pub fn mark_as_active(&self, runtime_id: PublicRuntimeId) {
184        self.set_state(PeerState::Active {
185            id: runtime_id,
186            since: SystemTime::now(),
187        });
188    }
189
190    fn set_state(&self, new_state: PeerState) {
191        self.connections.send_if_modified(|connections| {
192            // unwrap is ok because if `self` exists then the entry should exists as well.
193            let peer = connections.get_mut(&self.key).unwrap();
194
195            if peer.state != new_state {
196                peer.state = new_state;
197                true
198            } else {
199                false
200            }
201        });
202    }
203
204    /// Returns a `AwaitDrop` that gets notified when this permit gets released.
205    pub fn released(&self) -> AwaitDrop {
206        // We can't use unwrap here because this method is used in `ConnectionPermitHalf` which can
207        // outlive the entry if the other half gets dropped.
208        self.with(|data| data.on_release.subscribe())
209            .unwrap_or_else(|| DropAwaitable::new().subscribe())
210    }
211
212    pub fn addr(&self) -> PeerAddr {
213        self.key.addr
214    }
215
216    pub fn id(&self) -> ConnectionId {
217        self.id
218    }
219
220    pub fn source(&self) -> PeerSource {
221        // unwrap is ok because if `self` exists then the entry should exists as well.
222        self.with(|data| data.source).unwrap()
223    }
224
225    pub fn byte_counters(&self) -> Arc<ByteCounters> {
226        self.with(|data| data.stats_tracker.bytes.clone())
227            .unwrap_or_default()
228    }
229
230    fn with<F, R>(&self, f: F) -> Option<R>
231    where
232        F: FnOnce(&ConnectionData) -> R,
233    {
234        self.connections.borrow().get(&self.key).map(f)
235    }
236}
237
238impl fmt::Debug for ConnectionPermit {
239    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
240        f.debug_struct("ConnectionPermit")
241            .field("key", &self.key)
242            .field("id", &self.id)
243            .finish_non_exhaustive()
244    }
245}
246
247impl Drop for ConnectionPermit {
248    fn drop(&mut self) {
249        self.connections.send_if_modified(|connections| {
250            let Entry::Occupied(entry) = connections.entry(self.key) else {
251                return false;
252            };
253
254            if entry.get().id != self.id {
255                return false;
256            }
257
258            entry.remove();
259            true
260        });
261    }
262}
263
264#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
265pub(super) struct ConnectionKey {
266    addr: PeerAddr,
267    dir: ConnectionDirection,
268}
269
270pub(super) struct ConnectionData {
271    id: ConnectionId,
272    state: PeerState,
273    source: PeerSource,
274    stats_tracker: StatsTracker,
275    on_release: DropAwaitable,
276}
277
278impl ConnectionData {
279    fn peer_info(&self, addr: PeerAddr) -> PeerInfo {
280        let stats = self.stats_tracker.read();
281
282        PeerInfo {
283            addr,
284            source: self.source,
285            state: self.state,
286            stats,
287        }
288    }
289}