1use super::{
2 peer_addr::PeerAddr,
3 peer_info::PeerInfo,
4 peer_source::PeerSource,
5 peer_state::PeerState,
6 runtime_id::PublicRuntimeId,
7 stats::{ByteCounters, StatsTracker},
8};
9use crate::{
10 collections::{hash_map::Entry, HashMap},
11 sync::{AwaitDrop, DropAwaitable, WatchSenderExt},
12};
13use serde::Serialize;
14use std::{
15 fmt,
16 sync::{
17 atomic::{AtomicU64, Ordering},
18 Arc,
19 },
20 time::SystemTime,
21};
22use tokio::sync::watch;
23
24pub(super) struct ConnectionSet {
26 connections: watch::Sender<HashMap<ConnectionKey, ConnectionData>>,
27}
28
29impl ConnectionSet {
30 pub fn new() -> Self {
31 Self {
32 connections: watch::Sender::new(HashMap::default()),
33 }
34 }
35
36 pub fn reserve(&self, addr: PeerAddr, source: PeerSource) -> ReserveResult {
41 let key = ConnectionKey {
42 addr,
43 dir: ConnectionDirection::from_source(source),
44 };
45
46 self.connections
47 .send_if_modified_return(|connections| match connections.entry(key) {
48 Entry::Vacant(entry) => {
49 let id = ConnectionId::next();
50
51 entry.insert(ConnectionData {
52 id,
53 state: PeerState::Known,
54 source,
55 stats_tracker: StatsTracker::default(),
56 on_release: DropAwaitable::new(),
57 });
58
59 (
60 true,
61 ReserveResult::Permit(ConnectionPermit {
62 connections: self.connections.clone(),
63 key,
64 id,
65 }),
66 )
67 }
68 Entry::Occupied(entry) => {
69 let peer_permit = entry.get();
70
71 (
72 false,
73 ReserveResult::Occupied(
74 peer_permit.on_release.subscribe(),
75 peer_permit.source,
76 peer_permit.id,
77 ),
78 )
79 }
80 })
81 }
82
83 pub fn peer_info_collector(&self) -> PeerInfoCollector {
84 PeerInfoCollector(self.connections.clone())
85 }
86
87 pub fn get_peer_info(&self, addr: PeerAddr) -> Option<PeerInfo> {
88 let connections = self.connections.borrow();
89
90 connections
91 .get(&ConnectionKey {
92 addr,
93 dir: ConnectionDirection::Incoming,
94 })
95 .or_else(|| {
96 connections.get(&ConnectionKey {
97 addr,
98 dir: ConnectionDirection::Outgoing,
99 })
100 })
101 .map(|data| data.peer_info(addr))
102 }
103
104 pub fn subscribe(&self) -> watch::Receiver<HashMap<ConnectionKey, ConnectionData>> {
105 self.connections.subscribe()
106 }
107}
108
109#[derive(Clone, Copy, Eq, PartialEq, Debug)]
113#[repr(transparent)]
114pub(super) struct ConnectionId(u64);
115
116impl ConnectionId {
117 pub fn next() -> Self {
118 static NEXT: AtomicU64 = AtomicU64::new(0);
119 Self(NEXT.fetch_add(1, Ordering::Relaxed))
120 }
121}
122
123impl fmt::Display for ConnectionId {
124 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
125 write!(f, "{}", self.0)
126 }
127}
128
129pub(super) enum ReserveResult {
130 Permit(ConnectionPermit),
131 Occupied(AwaitDrop, PeerSource, ConnectionId),
133}
134
135#[derive(Clone)]
136pub struct PeerInfoCollector(watch::Sender<HashMap<ConnectionKey, ConnectionData>>);
137
138impl PeerInfoCollector {
139 pub fn collect(&self) -> Vec<PeerInfo> {
140 self.0
141 .borrow()
142 .iter()
143 .map(|(key, data)| data.peer_info(key.addr))
144 .collect()
145 }
146}
147
148#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize)]
149pub(super) enum ConnectionDirection {
150 Incoming,
151 Outgoing,
152}
153
154impl ConnectionDirection {
155 pub fn from_source(source: PeerSource) -> Self {
156 match source {
157 PeerSource::Listener => Self::Incoming,
158 PeerSource::UserProvided
159 | PeerSource::LocalDiscovery
160 | PeerSource::Dht
161 | PeerSource::PeerExchange => Self::Outgoing,
162 }
163 }
164}
165
166pub(super) struct ConnectionPermit {
169 connections: watch::Sender<HashMap<ConnectionKey, ConnectionData>>,
170 key: ConnectionKey,
171 id: ConnectionId,
172}
173
174impl ConnectionPermit {
175 pub fn mark_as_connecting(&self) {
176 self.set_state(PeerState::Connecting);
177 }
178
179 pub fn mark_as_handshaking(&self) {
180 self.set_state(PeerState::Handshaking);
181 }
182
183 pub fn mark_as_active(&self, runtime_id: PublicRuntimeId) {
184 self.set_state(PeerState::Active {
185 id: runtime_id,
186 since: SystemTime::now(),
187 });
188 }
189
190 fn set_state(&self, new_state: PeerState) {
191 self.connections.send_if_modified(|connections| {
192 let peer = connections.get_mut(&self.key).unwrap();
194
195 if peer.state != new_state {
196 peer.state = new_state;
197 true
198 } else {
199 false
200 }
201 });
202 }
203
204 pub fn released(&self) -> AwaitDrop {
206 self.with(|data| data.on_release.subscribe())
209 .unwrap_or_else(|| DropAwaitable::new().subscribe())
210 }
211
212 pub fn addr(&self) -> PeerAddr {
213 self.key.addr
214 }
215
216 pub fn id(&self) -> ConnectionId {
217 self.id
218 }
219
220 pub fn source(&self) -> PeerSource {
221 self.with(|data| data.source).unwrap()
223 }
224
225 pub fn byte_counters(&self) -> Arc<ByteCounters> {
226 self.with(|data| data.stats_tracker.bytes.clone())
227 .unwrap_or_default()
228 }
229
230 fn with<F, R>(&self, f: F) -> Option<R>
231 where
232 F: FnOnce(&ConnectionData) -> R,
233 {
234 self.connections.borrow().get(&self.key).map(f)
235 }
236}
237
238impl fmt::Debug for ConnectionPermit {
239 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
240 f.debug_struct("ConnectionPermit")
241 .field("key", &self.key)
242 .field("id", &self.id)
243 .finish_non_exhaustive()
244 }
245}
246
247impl Drop for ConnectionPermit {
248 fn drop(&mut self) {
249 self.connections.send_if_modified(|connections| {
250 let Entry::Occupied(entry) = connections.entry(self.key) else {
251 return false;
252 };
253
254 if entry.get().id != self.id {
255 return false;
256 }
257
258 entry.remove();
259 true
260 });
261 }
262}
263
264#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
265pub(super) struct ConnectionKey {
266 addr: PeerAddr,
267 dir: ConnectionDirection,
268}
269
270pub(super) struct ConnectionData {
271 id: ConnectionId,
272 state: PeerState,
273 source: PeerSource,
274 stats_tracker: StatsTracker,
275 on_release: DropAwaitable,
276}
277
278impl ConnectionData {
279 fn peer_info(&self, addr: PeerAddr) -> PeerInfo {
280 let stats = self.stats_tracker.read();
281
282 PeerInfo {
283 addr,
284 source: self.source,
285 state: self.state,
286 stats,
287 }
288 }
289}